@@ -2533,4 +2533,59 @@ class HTTPClientTests: XCTestCase {
25332533 XCTAssertEqual ( info. connectionNumber, 1 )
25342534 XCTAssertEqual ( info. requestNumber, 1 )
25352535 }
2536+
2537+ func testBackpressue( ) {
2538+ class BackpressureResponseDelegate : HTTPClientResponseDelegate {
2539+ typealias Response = Void
2540+ var count = 0
2541+ var processingBodyPart = false
2542+ var didntWait = false
2543+ var lock = Lock ( )
2544+
2545+ init ( ) { }
2546+
2547+ func didReceiveHead( task: HTTPClient . Task < Response > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
2548+ return task. eventLoop. makeSucceededFuture ( ( ) )
2549+ }
2550+
2551+ func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ part: ByteBuffer ) -> EventLoopFuture < Void > {
2552+ lock. withLock {
2553+ // if processingBodyPart is true then previous body part is still being processed
2554+ // XCTAssertEqual doesn't work here so store result to test later
2555+ if processingBodyPart == true {
2556+ didntWait = true
2557+ }
2558+ processingBodyPart = true
2559+ count += 1
2560+ }
2561+ // wait one second before returning a successful future
2562+ return task. eventLoop. scheduleTask ( in: . milliseconds( 1000 ) ) {
2563+ self . lock. withLock {
2564+ self . processingBodyPart = false
2565+ self . count -= 1
2566+ }
2567+ } . futureResult
2568+ }
2569+
2570+ func didReceiveError( task: HTTPClient . Task < Response > , _ error: Error ) { }
2571+ func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
2572+ }
2573+
2574+ let elg = MultiThreadedEventLoopGroup ( numberOfThreads: 5 )
2575+ let client = HTTPClient ( eventLoopGroupProvider: . shared( elg) )
2576+ defer {
2577+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
2578+ XCTAssertNoThrow ( try elg. syncShutdownGracefully ( ) )
2579+ }
2580+
2581+ let data = Data ( count: 65273 )
2582+ let backpressureResponseDelegate = BackpressureResponseDelegate ( )
2583+ guard let request = try ? HTTPClient . Request ( url: self . defaultHTTPBinURLPrefix + " get " , body: . data( data) ) else {
2584+ XCTFail ( " Failed to init Request " )
2585+ return
2586+ }
2587+ XCTAssertNoThrow ( try client. execute ( request: request, delegate: backpressureResponseDelegate) . wait ( ) )
2588+ XCTAssertEqual ( backpressureResponseDelegate. didntWait, false )
2589+ XCTAssertEqual ( backpressureResponseDelegate. count, 0 )
2590+ }
25362591}
0 commit comments