2121import io .netty .handler .codec .http .HttpHeaderValues ;
2222import io .netty .handler .codec .http .HttpRequest ;
2323import io .netty .handler .codec .http .HttpResponse ;
24+ import io .netty .handler .codec .http .LastHttpContent ;
2425import io .netty .handler .codec .http .websocketx .BinaryWebSocketFrame ;
2526import io .netty .handler .codec .http .websocketx .CloseWebSocketFrame ;
2627import io .netty .handler .codec .http .websocketx .PingWebSocketFrame ;
3637import org .asynchttpclient .HttpResponseStatus ;
3738import org .asynchttpclient .netty .NettyResponseFuture ;
3839import org .asynchttpclient .netty .NettyResponseStatus ;
39- import org .asynchttpclient .netty .OnLastHttpContentCallback ;
4040import org .asynchttpclient .netty .channel .ChannelManager ;
4141import org .asynchttpclient .netty .channel .Channels ;
4242import org .asynchttpclient .netty .request .NettyRequestSender ;
@@ -52,71 +52,45 @@ public WebSocketHandler(AsyncHttpClientConfig config,//
5252 super (config , channelManager , requestSender );
5353 }
5454
55- private class UpgradeCallback extends OnLastHttpContentCallback {
56-
57- private final Channel channel ;
58- private final HttpResponse response ;
59- private final WebSocketUpgradeHandler handler ;
60- private final HttpResponseStatus status ;
61- private final HttpResponseHeaders responseHeaders ;
62-
63- public UpgradeCallback (NettyResponseFuture <?> future , Channel channel , HttpResponse response , WebSocketUpgradeHandler handler , HttpResponseStatus status ,
64- HttpResponseHeaders responseHeaders ) {
65- super (future );
66- this .channel = channel ;
67- this .response = response ;
68- this .handler = handler ;
69- this .status = status ;
70- this .responseHeaders = responseHeaders ;
55+ private void upgrade (Channel channel , NettyResponseFuture <?> future , WebSocketUpgradeHandler handler , HttpResponse response , HttpResponseHeaders responseHeaders )
56+ throws Exception {
57+ boolean validStatus = response .status ().equals (SWITCHING_PROTOCOLS );
58+ boolean validUpgrade = response .headers ().get (UPGRADE ) != null ;
59+ String connection = response .headers ().get (CONNECTION );
60+ boolean validConnection = HttpHeaderValues .UPGRADE .contentEqualsIgnoreCase (connection );
61+ final boolean headerOK = handler .onHeadersReceived (responseHeaders ) == State .CONTINUE ;
62+ if (!headerOK || !validStatus || !validUpgrade || !validConnection ) {
63+ requestSender .abort (channel , future , new IOException ("Invalid handshake response" ));
64+ return ;
7165 }
7266
73- // We don't need to synchronize as replacing the "ws-decoder" will
74- // process using the same thread.
75- private void invokeOnSucces (Channel channel , WebSocketUpgradeHandler h ) {
76- try {
77- h .onSuccess (new NettyWebSocket (channel , responseHeaders .getHeaders ()));
78- } catch (Exception ex ) {
79- logger .warn ("onSuccess unexpected exception" , ex );
80- }
67+ String accept = response .headers ().get (SEC_WEBSOCKET_ACCEPT );
68+ String key = getAcceptKey (future .getNettyRequest ().getHttpRequest ().headers ().get (SEC_WEBSOCKET_KEY ));
69+ if (accept == null || !accept .equals (key )) {
70+ requestSender .abort (channel , future , new IOException ("Invalid challenge. Actual: " + accept + ". Expected: " + key ));
8171 }
8272
83- @ Override
84- public void call () throws Exception {
85- boolean validStatus = response .status ().equals (SWITCHING_PROTOCOLS );
86- boolean validUpgrade = response .headers ().get (UPGRADE ) != null ;
87- String connection = response .headers ().get (CONNECTION );
88- boolean validConnection = HttpHeaderValues .UPGRADE .contentEqualsIgnoreCase (connection );
89- boolean statusReceived = handler .onStatusReceived (status ) == State .CONTINUE ;
90-
91- if (!statusReceived ) {
92- try {
93- handler .onCompleted ();
94- } finally {
95- future .done ();
96- }
97- return ;
98- }
99-
100- final boolean headerOK = handler .onHeadersReceived (responseHeaders ) == State .CONTINUE ;
101- if (!headerOK || !validStatus || !validUpgrade || !validConnection ) {
102- requestSender .abort (channel , future , new IOException ("Invalid handshake response" ));
103- return ;
104- }
73+ // set back the future so the protocol gets notified of frames
74+ // removing the HttpClientCodec from the pipeline might trigger a read with a WebSocket message
75+ // if it comes in the same frame as the HTTP Upgrade response
76+ Channels .setAttribute (channel , future );
10577
106- String accept = response .headers ().get (SEC_WEBSOCKET_ACCEPT );
107- String key = getAcceptKey (future .getNettyRequest ().getHttpRequest ().headers ().get (SEC_WEBSOCKET_KEY ));
108- if (accept == null || !accept .equals (key )) {
109- requestSender .abort (channel , future , new IOException (String .format ("Invalid challenge. Actual: %s. Expected: %s" , accept , key )));
110- }
78+ channelManager .upgradePipelineForWebSockets (channel .pipeline ());
11179
112- // set back the future so the protocol gets notified of frames
113- // removing the HttpClientCodec from the pipeline might trigger a read with a WebSocket message
114- // if it comes in the same frame as the HTTP Upgrade response
115- Channels .setAttribute (channel , future );
116-
117- channelManager .upgradePipelineForWebSockets (channel .pipeline ());
80+ // We don't need to synchronize as replacing the "ws-decoder" will
81+ // process using the same thread.
82+ try {
83+ handler .openWebSocket (new NettyWebSocket (channel , responseHeaders .getHeaders ()));
84+ } catch (Exception ex ) {
85+ logger .warn ("onSuccess unexpected exception" , ex );
86+ }
87+ future .done ();
88+ }
11889
119- invokeOnSucces (channel , handler );
90+ private void abort (NettyResponseFuture <?> future , WebSocketUpgradeHandler handler , HttpResponseStatus status ) throws Exception {
91+ try {
92+ handler .onThrowable (new IOException ("Invalid Status code=" + status .getStatusCode () + " text=" + status .getStatusText ()));
93+ } finally {
12094 future .done ();
12195 }
12296 }
@@ -136,36 +110,23 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
136110 HttpResponseHeaders responseHeaders = new HttpResponseHeaders (response .headers ());
137111
138112 if (!interceptors .exitAfterIntercept (channel , future , handler , response , status , responseHeaders )) {
139- Channels .setAttribute (channel , new UpgradeCallback (future , channel , response , handler , status , responseHeaders ));
113+ switch (handler .onStatusReceived (status )) {
114+ case CONTINUE :
115+ upgrade (channel , future , handler , response , responseHeaders );
116+ break ;
117+ default :
118+ abort (future , handler , status );
119+ }
140120 }
141121
142122 } else if (e instanceof WebSocketFrame ) {
143123 final WebSocketFrame frame = (WebSocketFrame ) e ;
144124 WebSocketUpgradeHandler handler = (WebSocketUpgradeHandler ) future .getAsyncHandler ();
145125 NettyWebSocket webSocket = (NettyWebSocket ) handler .onCompleted ();
126+ handleFrame (channel , frame , handler , webSocket );
146127
147- if (webSocket != null ) {
148- handleFrame (channel , frame , handler , webSocket );
149- } else {
150- logger .debug ("Frame received but WebSocket is not available yet, buffering frame" );
151- frame .retain ();
152- Runnable bufferedFrame = new Runnable () {
153- public void run () {
154- try {
155- // WebSocket is now not null
156- NettyWebSocket webSocket = (NettyWebSocket ) handler .onCompleted ();
157- handleFrame (channel , frame , handler , webSocket );
158- } catch (Exception e ) {
159- logger .debug ("Failure while handling buffered frame" , e );
160- handler .onFailure (e );
161- } finally {
162- frame .release ();
163- }
164- }
165- };
166- handler .bufferFrame (bufferedFrame );
167- }
168- } else {
128+ } else if (!(e instanceof LastHttpContent )) {
129+ // ignore, end of handshake response
169130 logger .error ("Invalid message {}" , e );
170131 }
171132 }
@@ -197,7 +158,6 @@ public void handleException(NettyResponseFuture<?> future, Throwable e) {
197158
198159 try {
199160 WebSocketUpgradeHandler h = (WebSocketUpgradeHandler ) future .getAsyncHandler ();
200-
201161 NettyWebSocket webSocket = NettyWebSocket .class .cast (h .onCompleted ());
202162 if (webSocket != null ) {
203163 webSocket .onError (e .getCause ());
0 commit comments