Skip to content

Commit 0842114

Browse files
committed
fix(cloudflare): Keep root span alive until streaming responses are consumed
1 parent 2b3eaf1 commit 0842114

File tree

6 files changed

+201
-27
lines changed

6 files changed

+201
-27
lines changed

packages/cloudflare/src/request.ts

Lines changed: 73 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ import {
88
parseStringToURLObject,
99
SEMANTIC_ATTRIBUTE_SENTRY_OP,
1010
setHttpStatus,
11-
startSpan,
11+
startSpanManual,
1212
winterCGHeadersToDict,
1313
withIsolationScope,
1414
} from '@sentry/core';
1515
import type { CloudflareOptions } from './client';
1616
import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils';
1717
import { init } from './sdk';
18+
import { classifyResponseStreaming } from './utils/streaming';
1819

1920
interface RequestHandlerWrapperOptions {
2021
options: CloudflareOptions;
@@ -98,26 +99,79 @@ export function wrapRequestHandler(
9899
// Note: This span will not have a duration unless I/O happens in the handler. This is
99100
// because of how the cloudflare workers runtime works.
100101
// See: https://developers.cloudflare.com/workers/runtime-apis/performance/
101-
return startSpan(
102-
{
103-
name,
104-
attributes,
105-
},
106-
async span => {
107-
try {
108-
const res = await handler();
109-
setHttpStatus(span, res.status);
110-
return res;
111-
} catch (e) {
112-
if (captureErrors) {
113-
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
114-
}
115-
throw e;
116-
} finally {
102+
103+
// Use startSpanManual to control when span ends (needed for streaming responses)
104+
return startSpanManual({ name, attributes }, async span => {
105+
let res: Response;
106+
107+
try {
108+
res = await handler();
109+
setHttpStatus(span, res.status);
110+
} catch (e) {
111+
span.end(); // End span on error
112+
if (captureErrors) {
113+
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
114+
}
115+
waitUntil?.(flush(2000));
116+
throw e;
117+
}
118+
119+
// Classify response to detect actual streaming
120+
const classification = await classifyResponseStreaming(res);
121+
122+
if (classification.isStreaming) {
123+
// Streaming response detected - monitor consumption to keep span alive
124+
if (!classification.response.body) {
125+
// Shouldn't happen since isStreaming requires body, but handle gracefully
126+
span.end();
117127
waitUntil?.(flush(2000));
128+
return classification.response;
118129
}
119-
},
120-
);
130+
131+
const [clientStream, monitorStream] = classification.response.body.tee();
132+
133+
// Monitor stream consumption and end span when complete
134+
const streamMonitor = (async () => {
135+
const reader = monitorStream.getReader();
136+
137+
// Safety timeout to prevent infinite loops if stream hangs
138+
const timeout = setTimeout(() => {
139+
span.end();
140+
reader.cancel().catch(() => {});
141+
}, 30000); // 30 second max
142+
143+
try {
144+
let done = false;
145+
while (!done) {
146+
const result = await reader.read();
147+
done = result.done;
148+
}
149+
clearTimeout(timeout);
150+
span.end();
151+
} catch (err) {
152+
clearTimeout(timeout);
153+
span.end();
154+
} finally {
155+
reader.releaseLock();
156+
}
157+
})();
158+
159+
// Use waitUntil to keep context alive and flush after span ends
160+
waitUntil?.(streamMonitor.then(() => flush(2000)));
161+
162+
// Return response with client stream
163+
return new Response(clientStream, {
164+
status: classification.response.status,
165+
statusText: classification.response.statusText,
166+
headers: classification.response.headers,
167+
});
168+
}
169+
170+
// Non-streaming response - end span immediately and return original
171+
span.end();
172+
waitUntil?.(flush(2000));
173+
return classification.response;
174+
});
121175
},
122176
);
123177
});
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
export type StreamingGuess = {
2+
response: Response;
3+
isStreaming: boolean;
4+
};
5+
6+
/**
7+
*
8+
*/
9+
export async function classifyResponseStreaming(
10+
res: Response,
11+
opts: { timeoutMs?: number } = {},
12+
): Promise<StreamingGuess> {
13+
const timeoutMs = opts.timeoutMs ?? 25;
14+
15+
if (!res.body) {
16+
return { response: res, isStreaming: false };
17+
}
18+
19+
const ct = res.headers.get('content-type') ?? '';
20+
const cl = res.headers.get('content-length');
21+
22+
// Definitive streaming indicators
23+
if (/^text\/event-stream\b/i.test(ct)) {
24+
return { response: res, isStreaming: true };
25+
}
26+
27+
// Definitive non-streaming indicators
28+
if (cl && /^\d+$/.test(cl)) {
29+
return { response: res, isStreaming: false };
30+
}
31+
32+
// Probe the stream to detect streaming behavior
33+
// NOTE: This tees the stream and returns a new Response object
34+
const [probe, pass] = res.body.tee();
35+
const reader = probe.getReader();
36+
37+
const firstChunkPromise = (async () => {
38+
try {
39+
const { value, done } = await reader.read();
40+
reader.releaseLock();
41+
if (done) return { arrivedBytes: 0, done: true };
42+
const bytes =
43+
value && typeof value === 'object' && 'byteLength' in value ? (value as { byteLength: number }).byteLength : 0;
44+
return { arrivedBytes: bytes, done: false };
45+
} catch {
46+
return { arrivedBytes: 0, done: false };
47+
}
48+
})();
49+
50+
const timeout = new Promise<{ arrivedBytes: number; done: boolean }>(r =>
51+
setTimeout(() => r({ arrivedBytes: 0, done: false }), timeoutMs),
52+
);
53+
54+
const peek = await Promise.race([firstChunkPromise, timeout]);
55+
56+
// We must return the teed response since original is now locked
57+
const preserved = new Response(pass, res);
58+
59+
let isStreaming = false;
60+
if (peek.done) {
61+
// Stream completed immediately
62+
isStreaming = false;
63+
} else if (peek.arrivedBytes === 0) {
64+
// Timeout waiting for first chunk - definitely streaming
65+
isStreaming = true;
66+
} else {
67+
// Got first chunk - streaming if no Content-Length
68+
isStreaming = cl == null;
69+
}
70+
71+
return { response: preserved, isStreaming };
72+
}

packages/cloudflare/test/durableobject.test.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,11 +133,25 @@ describe('instrumentDurableObjectWithSentry', () => {
133133
waitUntil,
134134
} as unknown as ExecutionContext;
135135
const dObject: any = Reflect.construct(instrumented, [context, {} as any]);
136-
expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
137-
expect(flush).not.toBeCalled();
138-
expect(waitUntil).toHaveBeenCalledOnce();
136+
137+
// Call fetch (don't await yet)
138+
const responsePromise = dObject.fetch(new Request('https://example.com'));
139+
140+
// Advance past classification timeout and get response
141+
vi.advanceTimersByTime(30);
142+
const response = await responsePromise;
143+
144+
// Consume response (triggers span end for buffered responses)
145+
await response.text();
146+
147+
// The flush should now be queued in waitUntil
148+
expect(waitUntil).toHaveBeenCalled();
149+
150+
// Advance to trigger the setTimeout in the handler's waitUntil
139151
vi.advanceTimersToNextTimer();
140152
await Promise.all(waitUntil.mock.calls.map(([p]) => p));
153+
154+
// Now flush should have been called
141155
expect(flush).toBeCalled();
142156
});
143157

packages/cloudflare/test/handler.test.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ describe('withSentry', () => {
7272
createMockExecutionContext(),
7373
);
7474

75-
expect(result).toBe(response);
75+
// Response may be wrapped for streaming detection, verify content
76+
expect(result?.status).toBe(response.status);
77+
if (result) {
78+
expect(await result.text()).toBe('test');
79+
}
7680
});
7781

7882
test('merges options from env and callback', async () => {

packages/cloudflare/test/pages-plugin.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => {
5252
pluginArgs: MOCK_OPTIONS,
5353
});
5454

55-
expect(result).toBe(response);
55+
// Response may be wrapped for streaming detection, verify content
56+
expect(result.status).toBe(response.status);
57+
expect(await result.text()).toBe('test');
5658
});
5759
});

packages/cloudflare/test/request.test.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ describe('withSentry', () => {
3333
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
3434
() => response,
3535
);
36-
expect(result).toBe(response);
36+
// Response may be wrapped for streaming detection, verify content matches
37+
expect(result.status).toBe(response.status);
38+
expect(await result.text()).toBe('test');
3739
});
3840

3941
test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => {
@@ -48,6 +50,25 @@ describe('withSentry', () => {
4850
expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise));
4951
});
5052

53+
test('handles streaming responses correctly', async () => {
54+
const stream = new ReadableStream({
55+
start(controller) {
56+
controller.enqueue(new TextEncoder().encode('chunk1'));
57+
controller.enqueue(new TextEncoder().encode('chunk2'));
58+
controller.close();
59+
},
60+
});
61+
const streamingResponse = new Response(stream);
62+
63+
const result = await wrapRequestHandler(
64+
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
65+
() => streamingResponse,
66+
);
67+
68+
const text = await result.text();
69+
expect(text).toBe('chunk1chunk2');
70+
});
71+
5172
test("doesn't error if context is undefined", () => {
5273
expect(() =>
5374
wrapRequestHandler(
@@ -284,7 +305,7 @@ describe('withSentry', () => {
284305
mockRequest.headers.set('content-length', '10');
285306

286307
let sentryEvent: Event = {};
287-
await wrapRequestHandler(
308+
const result = await wrapRequestHandler(
288309
{
289310
options: {
290311
...MOCK_OPTIONS,
@@ -299,10 +320,17 @@ describe('withSentry', () => {
299320
},
300321
() => {
301322
SentryCore.captureMessage('sentry-trace');
302-
return new Response('test');
323+
const response = new Response('test');
324+
return response;
303325
},
304326
);
305327

328+
// Consume response to trigger span end for non-streaming responses
329+
await result.text();
330+
331+
// Wait for async span end and transaction capture
332+
await new Promise(resolve => setTimeout(resolve, 50));
333+
306334
expect(sentryEvent.transaction).toEqual('GET /');
307335
expect(sentryEvent.spans).toHaveLength(0);
308336
expect(sentryEvent.contexts?.trace).toEqual({

0 commit comments

Comments
 (0)