diff --git a/packages/cloudflare/src/request.ts b/packages/cloudflare/src/request.ts index 5c97562d9fde..7ac6b924c876 100644 --- a/packages/cloudflare/src/request.ts +++ b/packages/cloudflare/src/request.ts @@ -8,13 +8,14 @@ import { parseStringToURLObject, SEMANTIC_ATTRIBUTE_SENTRY_OP, setHttpStatus, - startSpan, + startSpanManual, winterCGHeadersToDict, withIsolationScope, } from '@sentry/core'; import type { CloudflareOptions } from './client'; import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils'; import { init } from './sdk'; +import { classifyResponseStreaming } from './utils/streaming'; interface RequestHandlerWrapperOptions { options: CloudflareOptions; @@ -98,26 +99,64 @@ export function wrapRequestHandler( // Note: This span will not have a duration unless I/O happens in the handler. This is // because of how the cloudflare workers runtime works. // See: https://developers.cloudflare.com/workers/runtime-apis/performance/ - return startSpan( - { - name, - attributes, - }, - async span => { - try { - const res = await handler(); - setHttpStatus(span, res.status); - return res; - } catch (e) { - if (captureErrors) { - captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); - } - throw e; - } finally { - waitUntil?.(flush(2000)); + + // Use startSpanManual to control when span ends (needed for streaming responses) + return startSpanManual({ name, attributes }, async span => { + let res: Response; + + try { + res = await handler(); + setHttpStatus(span, res.status); + } catch (e) { + span.end(); + if (captureErrors) { + captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } }); } - }, - ); + waitUntil?.(flush(2000)); + throw e; + } + + // Classify response to detect actual streaming + const classification = await classifyResponseStreaming(res); + + if (classification.isStreaming && classification.response.body) { + // Streaming response detected - monitor consumption to keep span alive + const [clientStream, monitorStream] = classification.response.body.tee(); + + // Monitor stream consumption and end span when complete + const streamMonitor = (async () => { + const reader = monitorStream.getReader(); + + try { + let done = false; + while (!done) { + const result = await reader.read(); + done = result.done; + } + } catch { + // Stream error or cancellation - will end span in finally + } finally { + reader.releaseLock(); + span.end(); + waitUntil?.(flush(2000)); + } + })(); + + waitUntil?.(streamMonitor); + + // Return response with client stream + return new Response(clientStream, { + status: classification.response.status, + statusText: classification.response.statusText, + headers: classification.response.headers, + }); + } + + // Non-streaming response - end span immediately and return original + span.end(); + waitUntil?.(flush(2000)); + return classification.response; + }); }, ); }); diff --git a/packages/cloudflare/src/utils/streaming.ts b/packages/cloudflare/src/utils/streaming.ts new file mode 100644 index 000000000000..2184cac48a3d --- /dev/null +++ b/packages/cloudflare/src/utils/streaming.ts @@ -0,0 +1,64 @@ +export type StreamingGuess = { + response: Response; + isStreaming: boolean; +}; + +/** + * Classifies a Response as streaming or non-streaming. + * + * Uses multiple heuristics: + * - No body → not streaming + * - Content-Type: text/event-stream → streaming + * - Content-Length header present → not streaming + * - Otherwise: attempts immediate read to detect behavior + * - Stream empty (done) → not streaming + * - Got data without Content-Length → streaming + * - Got data with Content-Length → not streaming + * + * Note: Probing will tee() the stream and return a new Response object. + * + * @param res - The Response to classify + * @returns Classification result with safe-to-return Response + */ +export async function classifyResponseStreaming(res: Response): Promise { + if (!res.body) { + return { response: res, isStreaming: false }; + } + + const contentType = res.headers.get('content-type') ?? ''; + const contentLength = res.headers.get('content-length'); + + // Fast path: Server-Sent Events + if (/^text\/event-stream\b/i.test(contentType)) { + return { response: res, isStreaming: true }; + } + + // Fast path: Content-Length indicates buffered response + if (contentLength && /^\d+$/.test(contentLength)) { + return { response: res, isStreaming: false }; + } + + // Probe the stream by trying to read first chunk immediately + // After tee(), must use the teed stream (original is locked) + const [probeStream, passStream] = res.body.tee(); + const reader = probeStream.getReader(); + + try { + const { done } = await reader.read(); + reader.releaseLock(); + + const teededResponse = new Response(passStream, res); + + if (done) { + // Stream completed immediately - buffered (empty body) + return { response: teededResponse, isStreaming: false }; + } + + // Got data - treat as streaming if no Content-Length header + return { response: teededResponse, isStreaming: contentLength == null }; + } catch { + reader.releaseLock(); + // Error reading - treat as non-streaming to be safe + return { response: new Response(passStream, res), isStreaming: false }; + } +} diff --git a/packages/cloudflare/test/durableobject.test.ts b/packages/cloudflare/test/durableobject.test.ts index 4d9e2a20fe97..42a9ef45b735 100644 --- a/packages/cloudflare/test/durableobject.test.ts +++ b/packages/cloudflare/test/durableobject.test.ts @@ -133,11 +133,25 @@ describe('instrumentDurableObjectWithSentry', () => { waitUntil, } as unknown as ExecutionContext; const dObject: any = Reflect.construct(instrumented, [context, {} as any]); - expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow(); - expect(flush).not.toBeCalled(); - expect(waitUntil).toHaveBeenCalledOnce(); + + // Call fetch (don't await yet) + const responsePromise = dObject.fetch(new Request('https://example.com')); + + // Advance past classification timeout and get response + vi.advanceTimersByTime(30); + const response = await responsePromise; + + // Consume response (triggers span end for buffered responses) + await response.text(); + + // The flush should now be queued in waitUntil + expect(waitUntil).toHaveBeenCalled(); + + // Advance to trigger the setTimeout in the handler's waitUntil vi.advanceTimersToNextTimer(); await Promise.all(waitUntil.mock.calls.map(([p]) => p)); + + // Now flush should have been called expect(flush).toBeCalled(); }); diff --git a/packages/cloudflare/test/handler.test.ts b/packages/cloudflare/test/handler.test.ts index 7768689ffc48..15fa3effcd7f 100644 --- a/packages/cloudflare/test/handler.test.ts +++ b/packages/cloudflare/test/handler.test.ts @@ -72,7 +72,11 @@ describe('withSentry', () => { createMockExecutionContext(), ); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content + expect(result?.status).toBe(response.status); + if (result) { + expect(await result.text()).toBe('test'); + } }); test('merges options from env and callback', async () => { diff --git a/packages/cloudflare/test/pages-plugin.test.ts b/packages/cloudflare/test/pages-plugin.test.ts index 5cfbd1f4bb5e..7f70ac7de098 100644 --- a/packages/cloudflare/test/pages-plugin.test.ts +++ b/packages/cloudflare/test/pages-plugin.test.ts @@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => { pluginArgs: MOCK_OPTIONS, }); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content + expect(result.status).toBe(response.status); + expect(await result.text()).toBe('test'); }); }); diff --git a/packages/cloudflare/test/request.test.ts b/packages/cloudflare/test/request.test.ts index d6d0de5824a1..0ef32618062f 100644 --- a/packages/cloudflare/test/request.test.ts +++ b/packages/cloudflare/test/request.test.ts @@ -33,7 +33,9 @@ describe('withSentry', () => { { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() }, () => response, ); - expect(result).toBe(response); + // Response may be wrapped for streaming detection, verify content matches + expect(result.status).toBe(response.status); + expect(await result.text()).toBe('test'); }); test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => { @@ -48,6 +50,25 @@ describe('withSentry', () => { expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise)); }); + test('handles streaming responses correctly', async () => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('chunk1')); + controller.enqueue(new TextEncoder().encode('chunk2')); + controller.close(); + }, + }); + const streamingResponse = new Response(stream); + + const result = await wrapRequestHandler( + { options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() }, + () => streamingResponse, + ); + + const text = await result.text(); + expect(text).toBe('chunk1chunk2'); + }); + test("doesn't error if context is undefined", () => { expect(() => wrapRequestHandler( @@ -69,7 +90,7 @@ describe('withSentry', () => { }); test('flush must be called when all waitUntil are done', async () => { - const flush = vi.spyOn(SentryCore.Client.prototype, 'flush'); + const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush'); vi.useFakeTimers(); onTestFinished(() => { vi.useRealTimers(); @@ -83,13 +104,17 @@ describe('withSentry', () => { await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => { addDelayedWaitUntil(context); - return new Response('test'); + const response = new Response('test'); + // Add Content-Length to skip probing + response.headers.set('content-length', '4'); + return response; }); - expect(flush).not.toBeCalled(); + expect(flushSpy).not.toBeCalled(); expect(waitUntil).toBeCalled(); - vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers()); + await vi.advanceTimersToNextTimerAsync(); + vi.runAllTimers(); await Promise.all(waits); - expect(flush).toHaveBeenCalledOnce(); + expect(flushSpy).toHaveBeenCalledOnce(); }); describe('scope instrumentation', () => { @@ -303,6 +328,9 @@ describe('withSentry', () => { }, ); + // Wait for async span end and transaction capture + await new Promise(resolve => setTimeout(resolve, 50)); + expect(sentryEvent.transaction).toEqual('GET /'); expect(sentryEvent.spans).toHaveLength(0); expect(sentryEvent.contexts?.trace).toEqual({