2121import java .nio .charset .Charset ;
2222import java .util .ArrayList ;
2323import java .util .List ;
24- import java .util .function .Supplier ;
2524
2625import org .reactivestreams .Publisher ;
2726import reactor .core .publisher .Flux ;
2827import reactor .core .publisher .Mono ;
2928
3029import org .springframework .core .io .buffer .DataBuffer ;
30+ import org .springframework .core .io .buffer .DataBufferFactory ;
3131
3232/**
3333 * A {@link Writer} that can write a {@link Flux} (or {@link Publisher}) to a data buffer.
3737 */
3838class FluxWriter extends Writer {
3939
40- private final Supplier < DataBuffer > factory ;
40+ private final DataBufferFactory factory ;
4141
4242 private final Charset charset ;
4343
44- private List <String > current = new ArrayList <>();
45-
4644 private List <Object > accumulated = new ArrayList <>();
4745
48- FluxWriter (Supplier <DataBuffer > factory ) {
49- this (factory , Charset .defaultCharset ());
50- }
51-
52- FluxWriter (Supplier <DataBuffer > factory , Charset charset ) {
46+ FluxWriter (DataBufferFactory factory , Charset charset ) {
5347 this .factory = factory ;
5448 this .charset = charset ;
5549 }
5650
57- public Publisher <? extends Publisher <? extends DataBuffer >> getBuffers () {
51+ @ SuppressWarnings ("unchecked" )
52+ public Flux <? extends Publisher <? extends DataBuffer >> getBuffers () {
5853 Flux <String > buffers = Flux .empty ();
59- if (!this .current .isEmpty ()) {
60- this .accumulated .add (new ArrayList <>(this .current ));
61- this .current .clear ();
62- }
54+ List <String > chunks = new ArrayList <>();
6355 for (Object thing : this .accumulated ) {
6456 if (thing instanceof Publisher ) {
65- @ SuppressWarnings ("unchecked" )
66- Publisher <String > publisher = (Publisher <String >) thing ;
67- buffers = buffers .concatWith (publisher );
57+ buffers = concatValues (chunks , buffers );
58+ buffers = buffers .concatWith ((Publisher <String >) thing );
6859 }
6960 else {
70- @ SuppressWarnings ("unchecked" )
71- List <String > list = (List <String >) thing ;
72- buffers = buffers .concatWithValues (list .toArray (new String [0 ]));
61+ chunks .add ((String ) thing );
7362 }
7463 }
75- return buffers .map ((string ) -> Mono .just (buffer ().write (string , this .charset )));
64+ buffers = concatValues (chunks , buffers );
65+ return buffers .map ((string ) -> Mono .fromCallable (() ->
66+ this .factory .allocateBuffer ().write (string , this .charset )));
67+ }
68+
69+ private Flux <String > concatValues (List <String > chunks , Flux <String > buffers ) {
70+ if (!chunks .isEmpty ()) {
71+ buffers = buffers .concatWithValues (chunks .toArray (new String [0 ]));
72+ chunks .clear ();
73+ }
74+ return buffers ;
7675 }
7776
7877 @ Override
7978 public void write (char [] cbuf , int off , int len ) throws IOException {
80- this .current .add (new String (cbuf , off , len ));
79+ this .accumulated .add (new String (cbuf , off , len ));
8180 }
8281
8382 @ Override
@@ -92,23 +91,8 @@ public void release() {
9291 // TODO: maybe implement this and call it on error
9392 }
9493
95- private DataBuffer buffer () {
96- return this .factory .get ();
97- }
98-
9994 public void write (Object thing ) {
100- if (thing instanceof Publisher ) {
101- if (!this .current .isEmpty ()) {
102- this .accumulated .add (new ArrayList <>(this .current ));
103- this .current .clear ();
104- }
105- this .accumulated .add (thing );
106- }
107- else {
108- if (thing instanceof String ) {
109- this .current .add ((String ) thing );
110- }
111- }
95+ this .accumulated .add (thing );
11296 }
11397
11498}
0 commit comments