1919import io .netty .buffer .ByteBuf ;
2020import io .netty .buffer .Unpooled ;
2121import io .netty .handler .codec .http .HttpHeaders ;
22+ import io .reactivex .Flowable ;
2223
2324import java .io .ByteArrayOutputStream ;
2425import java .io .File ;
6061import org .testng .annotations .BeforeClass ;
6162import org .testng .annotations .Test ;
6263
63- import rx .Observable ;
64- import rx .RxReactiveStreams ;
65-
6664public class ReactiveStreamsTest {
6765
6866 private static final Logger LOGGER = LoggerFactory .getLogger (ReactiveStreamsTest .class );
6967
7068 public static Publisher <ByteBuf > createPublisher (final byte [] bytes , final int chunkSize ) {
71- Observable <ByteBuf > observable = Observable .from (new ByteBufIterable (bytes , chunkSize ));
72- return RxReactiveStreams .toPublisher (observable );
69+ return Flowable .fromIterable (new ByteBufIterable (bytes , chunkSize ));
7370 }
7471
7572 private Tomcat tomcat ;
@@ -236,11 +233,7 @@ public void testConnectionDoesNotGetClosed() throws Exception {
236233
237234 byte [] responseBody = response .getResponseBodyAsBytes ();
238235 responseBody = response .getResponseBodyAsBytes ();
239- assertEquals (
240- Integer .valueOf (response .getHeader ("X-" + CONTENT_LENGTH )).intValue (),
241- LARGE_IMAGE_BYTES .length ,
242- "Server side payload length invalid"
243- );
236+ assertEquals (Integer .valueOf (response .getHeader ("X-" + CONTENT_LENGTH )).intValue (), LARGE_IMAGE_BYTES .length , "Server side payload length invalid" );
244237 assertEquals (responseBody .length , LARGE_IMAGE_BYTES .length , "Client side payload length invalid" );
245238 assertEquals (response .getHeader (CONTENT_MD5 ), expectedMd5 , "Server side payload MD5 invalid" );
246239 assertEquals (TestUtils .md5 (responseBody ), expectedMd5 , "Client side payload MD5 invalid" );
@@ -249,11 +242,7 @@ public void testConnectionDoesNotGetClosed() throws Exception {
249242 response = requestBuilder .execute ().get ();
250243 assertEquals (response .getStatusCode (), 200 );
251244 responseBody = response .getResponseBodyAsBytes ();
252- assertEquals (
253- Integer .valueOf (response .getHeader ("X-" + CONTENT_LENGTH )).intValue (),
254- LARGE_IMAGE_BYTES .length ,
255- "Server side payload length invalid"
256- );
245+ assertEquals (Integer .valueOf (response .getHeader ("X-" + CONTENT_LENGTH )).intValue (), LARGE_IMAGE_BYTES .length , "Server side payload length invalid" );
257246 assertEquals (responseBody .length , LARGE_IMAGE_BYTES .length , "Client side payload length invalid" );
258247
259248 try {
@@ -285,9 +274,7 @@ public static void main(String[] args) throws Exception {
285274 @ Test (groups = "standalone" , expectedExceptions = ExecutionException .class )
286275 public void testFailingStream () throws Exception {
287276 try (AsyncHttpClient client = asyncHttpClient (config ().setRequestTimeout (100 * 6000 ))) {
288- Observable <ByteBuf > failingObservable = Observable .error (new FailedStream ());
289- Publisher <ByteBuf > failingPublisher = RxReactiveStreams .toPublisher (failingObservable );
290-
277+ Publisher <ByteBuf > failingPublisher = Flowable .error (new FailedStream ());
291278 client .preparePut (getTargetUrl ()).setBody (failingPublisher ).execute ().get ();
292279 }
293280 }
@@ -520,7 +507,7 @@ public ByteBufIterable(byte[] payload, int chunkSize) {
520507 @ Override
521508 public Iterator <ByteBuf > iterator () {
522509 return new Iterator <ByteBuf >() {
523- private volatile int currentIndex = 0 ;
510+ private int currentIndex = 0 ;
524511
525512 @ Override
526513 public boolean hasNext () {
0 commit comments