From 2b3eaf12647593730c10cb646fd5ca7714f81c25 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Fri, 31 Oct 2025 13:59:25 +0100 Subject: [PATCH 1/5] try to override p-map till react router esolves this issue --- .../react-router-7-framework-node-20-18/package.json | 5 +++++ .../react-router-7-framework-spa-node-20-18/package.json | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json b/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json index e9f1f0d51504..ceea35b5f885 100644 --- a/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json +++ b/dev-packages/e2e-tests/test-applications/react-router-7-framework-node-20-18/package.json @@ -55,5 +55,10 @@ "volta": { "extends": "../../package.json", "node": "20.18.2" + }, + "pnpm": { + "overrides": { + "p-map": "^6.0.0" + } } } diff --git a/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json b/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json index 4ad6fae68416..cd3b3bfcb332 100644 --- a/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json +++ b/dev-packages/e2e-tests/test-applications/react-router-7-framework-spa-node-20-18/package.json @@ -53,5 +53,10 @@ "volta": { "extends": "../../package.json", "node": "20.18.2" + }, + "pnpm": { + "overrides": { + "p-map": "^6.0.0" + } } } From 0842114f499296f206aa43d853ff0021c7c63209 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 10:39:54 +0100 Subject: [PATCH 2/5] fix(cloudflare): Keep root span alive until streaming responses are consumed --- packages/cloudflare/src/request.ts | 92 +++++++++++++++---- packages/cloudflare/src/utils/streaming.ts | 72 +++++++++++++++ .../cloudflare/test/durableobject.test.ts | 20 +++- packages/cloudflare/test/handler.test.ts | 6 +- packages/cloudflare/test/pages-plugin.test.ts | 4 +- packages/cloudflare/test/request.test.ts | 34 ++++++- 6 files changed, 201 insertions(+), 27 deletions(-) create mode 100644 packages/cloudflare/src/utils/streaming.ts diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index 5c97562d9fde..4b3943f0f14d 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -8,13 +8,14 @@ import { parseStringToURLObject, SEMANTIC_ATTRIBUTE_SENTRY_OP, setHttpStatus, - startSpan, + startSpanManual, winterCGHeadersToDict, withIsolationScope, } from '@sentry/core'; import type { CloudflareOptions } from './client'; import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils'; import { init } from './sdk'; +import { classifyResponseStreaming } from './utils/streaming'; interface RequestHandlerWrapperOptions { options: CloudflareOptions; @@ -98,26 +99,79 @@ export function wrapRequestHandler( // Note: This span will not have a duration unless I/O happens in the handler. This is // because of how the cloudflare workers runtime works. // See: https://developers.cloudflare.com/workers/runtime-apis/performance/ - return startSpan( - { - name, - attributes, - }, - async span => { - try { - const res = await handler(); - setHttpStatus(span, res.status); - return res; - } catch (e) { - if (captureErrors) { - captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); - } - throw e; - } finally { + + // Use startSpanManual to control when span ends (needed for streaming responses) + return startSpanManual({ name, attributes }, async span => { + let res: Response; + + try { + res = await handler(); + setHttpStatus(span, res.status); + } catch (e) { + span.end(); // End span on error + if (captureErrors) { + captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); + } + waitUntil?.(flush(2000)); + throw e; + } + + // Classify response to detect actual streaming + const classification = await classifyResponseStreaming(res); + + if (classification.isStreaming) { + // Streaming response detected - monitor consumption to keep span alive + if (!classification.response.body) { + // Shouldn't happen since isStreaming requires body, but handle gracefully + span.end(); waitUntil?.(flush(2000)); + return classification.response; } - }, - ); + + const [clientStream, monitorStream] = classification.response.body.tee(); + + // Monitor stream consumption and end span when complete + const streamMonitor = (async () => { + const reader = monitorStream.getReader(); + + // Safety timeout to prevent infinite loops if stream hangs + const timeout = setTimeout(() => { + span.end(); + reader.cancel().catch(() => {}); + }, 30000); // 30 second max + + try { + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + } + clearTimeout(timeout); + span.end(); + } catch (err) { + clearTimeout(timeout); + span.end(); + } finally { + reader.releaseLock(); + } + })(); + + // Use waitUntil to keep context alive and flush after span ends + waitUntil?.(streamMonitor.then(() => flush(2000))); + + // Return response with client stream + return new Response(clientStream, { + status: classification.response.status, + statusText: classification.response.statusText, + headers: classification.response.headers, + }); + } + + // Non-streaming response - end span immediately and return original + span.end(); + waitUntil?.(flush(2000)); + return classification.response; + }); }, ); }); diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts new file mode 100644 index 000000000000..d4753a5430c8 --- /dev/null +++ b/packages/cloudflare/src/utils/streaming.ts @@ -0,0 +1,72 @@ +export type StreamingGuess = { + response: Response; + isStreaming: boolean; +}; + +/** + * + */ +export async function classifyResponseStreaming( + res: Response, + opts: { timeoutMs?: number } = {}, +): Promise { + const timeoutMs = opts.timeoutMs ?? 25; + + if (!res.body) { + return { response: res, isStreaming: false }; + } + + const ct = res.headers.get('content-type') ?? ''; + const cl = res.headers.get('content-length'); + + // Definitive streaming indicators + if (/^text\/event-stream\b/i.test(ct)) { + return { response: res, isStreaming: true }; + } + + // Definitive non-streaming indicators + if (cl && /^\d+$/.test(cl)) { + return { response: res, isStreaming: false }; + } + + // Probe the stream to detect streaming behavior + // NOTE: This tees the stream and returns a new Response object + const [probe, pass] = res.body.tee(); + const reader = probe.getReader(); + + const firstChunkPromise = (async () => { + try { + const { value, done } = await reader.read(); + reader.releaseLock(); + if (done) return { arrivedBytes: 0, done: true }; + const bytes = + value && typeof value === 'object' && 'byteLength' in value ? (value as { byteLength: number }).byteLength : 0; + return { arrivedBytes: bytes, done: false }; + } catch { + return { arrivedBytes: 0, done: false }; + } + })(); + + const timeout = new Promise<{ arrivedBytes: number; done: boolean }>(r => + setTimeout(() => r({ arrivedBytes: 0, done: false }), timeoutMs), + ); + + const peek = await Promise.race([firstChunkPromise, timeout]); + + // We must return the teed response since original is now locked + const preserved = new Response(pass, res); + + let isStreaming = false; + if (peek.done) { + // Stream completed immediately + isStreaming = false; + } else if (peek.arrivedBytes === 0) { + // Timeout waiting for first chunk - definitely streaming + isStreaming = true; + } else { + // Got first chunk - streaming if no Content-Length + isStreaming = cl == null; + } + + return { response: preserved, isStreaming }; +} diff --git a/packages/cloudflare/test/durableobject.test.ts b/packages/cloudflare/test/durableobject.test.ts index 4d9e2a20fe97..42a9ef45b735 100644 --- a/packages/cloudflare/test/durableobject.test.ts +++ b/packages/cloudflare/test/durableobject.test.ts @@ -133,11 +133,25 @@ describe('instrumentDurableObjectWithSentry', () => { waitUntil, } as unknown as ExecutionContext; const dObject: any = Reflect.construct(instrumented, [context, {} as any]); - expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow(); - expect(flush).not.toBeCalled(); - expect(waitUntil).toHaveBeenCalledOnce(); + + // Call fetch (don't await yet) + const responsePromise = dObject.fetch(new Request('https://example.com')); + + // Advance past classification timeout and get response + vi.advanceTimersByTime(30); + const response = await responsePromise; + + // Consume response (triggers span end for buffered responses) + await response.text(); + + // The flush should now be queued in waitUntil + expect(waitUntil).toHaveBeenCalled(); + + // Advance to trigger the setTimeout in the handler's waitUntil vi.advanceTimersToNextTimer(); await Promise.all(waitUntil.mock.calls.map(([p]) => p)); + + // Now flush should have been called expect(flush).toBeCalled(); }); diff --git a/packages/cloudflare/test/handler.test.ts b/packages/cloudflare/test/handler.test.ts index 7768689ffc48..15fa3effcd7f 100644 --- a/packages/cloudflare/test/handler.test.ts +++ b/packages/cloudflare/test/handler.test.ts @@ -72,7 +72,11 @@ describe('withSentry', () => { createMockExecutionContext(), ); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content + expect(result?.status).toBe(response.status); + if (result) { + expect(await result.text()).toBe('test'); + } }); test('merges options from env and callback', async () => { diff --git a/packages/cloudflare/test/pages-plugin.test.ts b/packages/cloudflare/test/pages-plugin.test.ts index 5cfbd1f4bb5e..7f70ac7de098 100644 --- a/packages/cloudflare/test/pages-plugin.test.ts +++ b/packages/cloudflare/test/pages-plugin.test.ts @@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => { pluginArgs: MOCK_OPTIONS, }); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content + expect(result.status).toBe(response.status); + expect(await result.text()).toBe('test'); }); }); diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index d6d0de5824a1..68f7dcdea7f2 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -33,7 +33,9 @@ describe('withSentry', () => { { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() }, () => response, ); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content matches + expect(result.status).toBe(response.status); + expect(await result.text()).toBe('test'); }); test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => { @@ -48,6 +50,25 @@ describe('withSentry', () => { expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise)); }); + test('handles streaming responses correctly', async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + controller.enqueue(new TextEncoder().encode('chunk2')); + controller.close(); + }, + }); + const streamingResponse = new Response(stream); + + const result = await wrapRequestHandler( + { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() }, + () => streamingResponse, + ); + + const text = await result.text(); + expect(text).toBe('chunk1chunk2'); + }); + test("doesn't error if context is undefined", () => { expect(() => wrapRequestHandler( @@ -284,7 +305,7 @@ describe('withSentry', () => { mockRequest.headers.set('content-length', '10'); let sentryEvent: Event = {}; - await wrapRequestHandler( + const result = await wrapRequestHandler( { options: { ...MOCK_OPTIONS, @@ -299,10 +320,17 @@ describe('withSentry', () => { }, () => { SentryCore.captureMessage('sentry-trace'); - return new Response('test'); + const response = new Response('test'); + return response; }, ); + // Consume response to trigger span end for non-streaming responses + await result.text(); + + // Wait for async span end and transaction capture + await new Promise(resolve => setTimeout(resolve, 50)); + expect(sentryEvent.transaction).toEqual('GET /'); expect(sentryEvent.spans).toHaveLength(0); expect(sentryEvent.contexts?.trace).toEqual({ From eb7a5c130210d81150c9411140c1082733ff160d Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 13:23:07 +0100 Subject: [PATCH 3/5] quick refactor --- packages/cloudflare/src/request.ts | 33 +++----- packages/cloudflare/src/utils/streaming.ts | 93 +++++++++++++--------- packages/cloudflare/test/request.test.ts | 22 ++--- 3 files changed, 77 insertions(+), 71 deletions(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index 4b3943f0f14d..c9fc15831ed8 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -108,7 +108,7 @@ export function wrapRequestHandler( res = await handler(); setHttpStatus(span, res.status); } catch (e) { - span.end(); // End span on error + span.end(); if (captureErrors) { captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); } @@ -119,26 +119,16 @@ export function wrapRequestHandler( // Classify response to detect actual streaming const classification = await classifyResponseStreaming(res); - if (classification.isStreaming) { + if (classification.isStreaming && classification.response.body) { // Streaming response detected - monitor consumption to keep span alive - if (!classification.response.body) { - // Shouldn't happen since isStreaming requires body, but handle gracefully - span.end(); - waitUntil?.(flush(2000)); - return classification.response; - } - const [clientStream, monitorStream] = classification.response.body.tee(); // Monitor stream consumption and end span when complete const streamMonitor = (async () => { const reader = monitorStream.getReader(); - // Safety timeout to prevent infinite loops if stream hangs - const timeout = setTimeout(() => { - span.end(); - reader.cancel().catch(() => {}); - }, 30000); // 30 second max + // Safety timeout - abort reading and end span after 5s even if stream hasn't finished + const timeout = setTimeout(() => reader.cancel(), 5000); try { let done = false; @@ -146,18 +136,19 @@ export function wrapRequestHandler( const result = await reader.read(); done = result.done; } - clearTimeout(timeout); - span.end(); - } catch (err) { - clearTimeout(timeout); - span.end(); + } catch { + // Stream error or cancellation - will end span in finally } finally { + clearTimeout(timeout); reader.releaseLock(); + span.end(); + waitUntil?.(flush(2000)); } })(); - // Use waitUntil to keep context alive and flush after span ends - waitUntil?.(streamMonitor.then(() => flush(2000))); + if (waitUntil) { + waitUntil(streamMonitor); + } // Return response with client stream return new Response(clientStream, { diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index d4753a5430c8..6225b1112e01 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -4,7 +4,18 @@ export type StreamingGuess = { }; /** + * Classifies a Response as streaming or non-streaming. * + * Uses multiple heuristics: + * - Content-Type: text/event-stream → streaming + * - Content-Length header present → not streaming + * - Otherwise: probes stream with timeout to detect behavior + * + * Note: Probing will tee() the stream and return a new Response object. + * + * @param res - The Response to classify + * @param opts.timeoutMs - Probe timeout in ms (default: 25) + * @returns Classification result with safe-to-return Response */ export async function classifyResponseStreaming( res: Response, @@ -16,57 +27,61 @@ export async function classifyResponseStreaming( return { response: res, isStreaming: false }; } - const ct = res.headers.get('content-type') ?? ''; - const cl = res.headers.get('content-length'); + const contentType = res.headers.get('content-type') ?? ''; + const contentLength = res.headers.get('content-length'); - // Definitive streaming indicators - if (/^text\/event-stream\b/i.test(ct)) { + // Fast path: Server-Sent Events + if (/^text\/event-stream\b/i.test(contentType)) { return { response: res, isStreaming: true }; } - // Definitive non-streaming indicators - if (cl && /^\d+$/.test(cl)) { + // Fast path: Content-Length indicates buffered response + if (contentLength && /^\d+$/.test(contentLength)) { return { response: res, isStreaming: false }; } - // Probe the stream to detect streaming behavior - // NOTE: This tees the stream and returns a new Response object - const [probe, pass] = res.body.tee(); - const reader = probe.getReader(); + // Uncertain - probe the stream to determine behavior + // After tee(), must use the teed stream (original is locked) + const [probeStream, passStream] = res.body.tee(); + const reader = probeStream.getReader(); - const firstChunkPromise = (async () => { - try { - const { value, done } = await reader.read(); - reader.releaseLock(); - if (done) return { arrivedBytes: 0, done: true }; - const bytes = - value && typeof value === 'object' && 'byteLength' in value ? (value as { byteLength: number }).byteLength : 0; - return { arrivedBytes: bytes, done: false }; - } catch { - return { arrivedBytes: 0, done: false }; - } - })(); + const probeResult = await Promise.race([ + // Try to read first chunk + (async () => { + try { + const { value, done } = await reader.read(); + reader.releaseLock(); - const timeout = new Promise<{ arrivedBytes: number; done: boolean }>(r => - setTimeout(() => r({ arrivedBytes: 0, done: false }), timeoutMs), - ); + if (done) { + return { arrivedBytes: 0, done: true }; + } - const peek = await Promise.race([firstChunkPromise, timeout]); + const bytes = + value && typeof value === 'object' && 'byteLength' in value + ? (value as { byteLength: number }).byteLength + : 0; + return { arrivedBytes: bytes, done: false }; + } catch { + return { arrivedBytes: 0, done: false }; + } + })(), + // Timeout if first chunk takes too long + new Promise<{ arrivedBytes: number; done: boolean }>(resolve => + setTimeout(() => resolve({ arrivedBytes: 0, done: false }), timeoutMs), + ), + ]); - // We must return the teed response since original is now locked - const preserved = new Response(pass, res); + const teededResponse = new Response(passStream, res); - let isStreaming = false; - if (peek.done) { - // Stream completed immediately - isStreaming = false; - } else if (peek.arrivedBytes === 0) { - // Timeout waiting for first chunk - definitely streaming - isStreaming = true; + // Determine if streaming based on probe result + if (probeResult.done) { + // Stream completed immediately - buffered + return { response: teededResponse, isStreaming: false }; + } else if (probeResult.arrivedBytes === 0) { + // Timeout waiting - definitely streaming + return { response: teededResponse, isStreaming: true }; } else { - // Got first chunk - streaming if no Content-Length - isStreaming = cl == null; + // Got chunk quickly - streaming if no Content-Length + return { response: teededResponse, isStreaming: contentLength == null }; } - - return { response: preserved, isStreaming }; } diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index 68f7dcdea7f2..0ef32618062f 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -90,7 +90,7 @@ describe('withSentry', () => { }); test('flush must be called when all waitUntil are done', async () => { - const flush = vi.spyOn(SentryCore.Client.prototype, 'flush'); + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush'); vi.useFakeTimers(); onTestFinished(() => { vi.useRealTimers(); @@ -104,13 +104,17 @@ describe('withSentry', () => { await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => { addDelayedWaitUntil(context); - return new Response('test'); + const response = new Response('test'); + // Add Content-Length to skip probing + response.headers.set('content-length', '4'); + return response; }); - expect(flush).not.toBeCalled(); + expect(flushSpy).not.toBeCalled(); expect(waitUntil).toBeCalled(); - vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers()); + await vi.advanceTimersToNextTimerAsync(); + vi.runAllTimers(); await Promise.all(waits); - expect(flush).toHaveBeenCalledOnce(); + expect(flushSpy).toHaveBeenCalledOnce(); }); describe('scope instrumentation', () => { @@ -305,7 +309,7 @@ describe('withSentry', () => { mockRequest.headers.set('content-length', '10'); let sentryEvent: Event = {}; - const result = await wrapRequestHandler( + await wrapRequestHandler( { options: { ...MOCK_OPTIONS, @@ -320,14 +324,10 @@ describe('withSentry', () => { }, () => { SentryCore.captureMessage('sentry-trace'); - const response = new Response('test'); - return response; + return new Response('test'); }, ); - // Consume response to trigger span end for non-streaming responses - await result.text(); - // Wait for async span end and transaction capture await new Promise(resolve => setTimeout(resolve, 50)); From ae8e345a162e031d8a3af26a7537e449450ba57e Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 15:53:26 +0100 Subject: [PATCH 4/5] test buffered without timeout --- packages/cloudflare/src/utils/streaming.ts | 57 ++++++---------------- 1 file changed, 15 insertions(+), 42 deletions(-) diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index 6225b1112e01..76593a22d169 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -14,15 +14,9 @@ export type StreamingGuess = { * Note: Probing will tee() the stream and return a new Response object. * * @param res - The Response to classify - * @param opts.timeoutMs - Probe timeout in ms (default: 25) * @returns Classification result with safe-to-return Response */ -export async function classifyResponseStreaming( - res: Response, - opts: { timeoutMs?: number } = {}, -): Promise { - const timeoutMs = opts.timeoutMs ?? 25; - +export async function classifyResponseStreaming(res: Response): Promise { if (!res.body) { return { response: res, isStreaming: false }; } @@ -40,48 +34,27 @@ export async function classifyResponseStreaming( return { response: res, isStreaming: false }; } - // Uncertain - probe the stream to determine behavior + // Probe the stream by trying to read first chunk immediately // After tee(), must use the teed stream (original is locked) const [probeStream, passStream] = res.body.tee(); const reader = probeStream.getReader(); - const probeResult = await Promise.race([ - // Try to read first chunk - (async () => { - try { - const { value, done } = await reader.read(); - reader.releaseLock(); - - if (done) { - return { arrivedBytes: 0, done: true }; - } + try { + const { done } = await reader.read(); + reader.releaseLock(); - const bytes = - value && typeof value === 'object' && 'byteLength' in value - ? (value as { byteLength: number }).byteLength - : 0; - return { arrivedBytes: bytes, done: false }; - } catch { - return { arrivedBytes: 0, done: false }; - } - })(), - // Timeout if first chunk takes too long - new Promise<{ arrivedBytes: number; done: boolean }>(resolve => - setTimeout(() => resolve({ arrivedBytes: 0, done: false }), timeoutMs), - ), - ]); + const teededResponse = new Response(passStream, res); - const teededResponse = new Response(passStream, res); + if (done) { + // Stream completed immediately - buffered (empty body) + return { response: teededResponse, isStreaming: false }; + } - // Determine if streaming based on probe result - if (probeResult.done) { - // Stream completed immediately - buffered - return { response: teededResponse, isStreaming: false }; - } else if (probeResult.arrivedBytes === 0) { - // Timeout waiting - definitely streaming - return { response: teededResponse, isStreaming: true }; - } else { - // Got chunk quickly - streaming if no Content-Length + // Got data - treat as streaming if no Content-Length header return { response: teededResponse, isStreaming: contentLength == null }; + } catch { + reader.releaseLock(); + // Error reading - treat as non-streaming to be safe + return { response: new Response(passStream, res), isStreaming: false }; } } From 62a1b2c2ddb2ad91125d1938b92694f5de63783e Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Tue, 4 Nov 2025 16:03:51 +0100 Subject: [PATCH 5/5] remove timeout --- packages/cloudflare/src/request.ts | 8 +------- packages/cloudflare/src/utils/streaming.ts | 6 +++++- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index c9fc15831ed8..7ac6b924c876 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -127,9 +127,6 @@ export function wrapRequestHandler( const streamMonitor = (async () => { const reader = monitorStream.getReader(); - // Safety timeout - abort reading and end span after 5s even if stream hasn't finished - const timeout = setTimeout(() => reader.cancel(), 5000); - try { let done = false; while (!done) { @@ -139,16 +136,13 @@ export function wrapRequestHandler( } catch { // Stream error or cancellation - will end span in finally } finally { - clearTimeout(timeout); reader.releaseLock(); span.end(); waitUntil?.(flush(2000)); } })(); - if (waitUntil) { - waitUntil(streamMonitor); - } + waitUntil?.(streamMonitor); // Return response with client stream return new Response(clientStream, { diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts index 76593a22d169..2184cac48a3d 100644 --- a/packages/cloudflare/src/utils/streaming.ts +++ b/packages/cloudflare/src/utils/streaming.ts @@ -7,9 +7,13 @@ export type StreamingGuess = { * Classifies a Response as streaming or non-streaming. * * Uses multiple heuristics: + * - No body → not streaming * - Content-Type: text/event-stream → streaming * - Content-Length header present → not streaming - * - Otherwise: probes stream with timeout to detect behavior + * - Otherwise: attempts immediate read to detect behavior + * - Stream empty (done) → not streaming + * - Got data without Content-Length → streaming + * - Got data with Content-Length → not streaming * * Note: Probing will tee() the stream and return a new Response object. *