Skip to content

Commit b9d02cf

Browse files
committed
quick refactor
1 parent 7a570db commit b9d02cf

File tree

3 files changed

+73
-69
lines changed

3 files changed

+73
-69
lines changed

packages/cloudflare/src/request.ts

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ export function wrapRequestHandler(
108108
res = await handler();
109109
setHttpStatus(span, res.status);
110110
} catch (e) {
111-
span.end(); // End span on error
111+
span.end();
112112
if (captureErrors) {
113113
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
114114
}
@@ -119,40 +119,29 @@ export function wrapRequestHandler(
119119
// Classify response to detect actual streaming
120120
const classification = await classifyResponseStreaming(res);
121121

122-
if (classification.isStreaming) {
122+
if (classification.isStreaming && classification.response.body) {
123123
// Streaming response detected - monitor consumption to keep span alive
124-
if (!classification.response.body) {
125-
// Shouldn't happen since isStreaming requires body, but handle gracefully
126-
span.end();
127-
waitUntil?.(flush(2000));
128-
return classification.response;
129-
}
130-
131124
const [clientStream, monitorStream] = classification.response.body.tee();
132125

133126
// Monitor stream consumption and end span when complete
134127
const streamMonitor = (async () => {
135128
const reader = monitorStream.getReader();
136129

137-
// Safety timeout to prevent infinite loops if stream hangs
138-
const timeout = setTimeout(() => {
139-
span.end();
140-
reader.cancel().catch(() => {});
141-
}, 30000); // 30 second max
130+
// Safety timeout - abort reading and end span after 5s even if stream hasn't finished
131+
const timeout = setTimeout(() => reader.cancel(), 5000);
142132

143133
try {
144134
let done = false;
145135
while (!done) {
146136
const result = await reader.read();
147137
done = result.done;
148138
}
149-
clearTimeout(timeout);
150-
span.end();
151-
} catch (err) {
152-
clearTimeout(timeout);
153-
span.end();
139+
} catch {
140+
// Stream error or cancellation - will end span in finally
154141
} finally {
142+
clearTimeout(timeout);
155143
reader.releaseLock();
144+
span.end();
156145
}
157146
})();
158147

packages/cloudflare/src/utils/streaming.ts

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,18 @@ export type StreamingGuess = {
44
};
55

66
/**
7+
* Classifies a Response as streaming or non-streaming.
78
*
9+
* Uses multiple heuristics:
10+
* - Content-Type: text/event-stream → streaming
11+
* - Content-Length header present → not streaming
12+
* - Otherwise: probes stream with timeout to detect behavior
13+
*
14+
* Note: Probing will tee() the stream and return a new Response object.
15+
*
16+
* @param res - The Response to classify
17+
* @param opts.timeoutMs - Probe timeout in ms (default: 25)
18+
* @returns Classification result with safe-to-return Response
819
*/
920
export async function classifyResponseStreaming(
1021
res: Response,
@@ -16,57 +27,61 @@ export async function classifyResponseStreaming(
1627
return { response: res, isStreaming: false };
1728
}
1829

19-
const ct = res.headers.get('content-type') ?? '';
20-
const cl = res.headers.get('content-length');
30+
const contentType = res.headers.get('content-type') ?? '';
31+
const contentLength = res.headers.get('content-length');
2132

22-
// Definitive streaming indicators
23-
if (/^text\/event-stream\b/i.test(ct)) {
33+
// Fast path: Server-Sent Events
34+
if (/^text\/event-stream\b/i.test(contentType)) {
2435
return { response: res, isStreaming: true };
2536
}
2637

27-
// Definitive non-streaming indicators
28-
if (cl && /^\d+$/.test(cl)) {
38+
// Fast path: Content-Length indicates buffered response
39+
if (contentLength && /^\d+$/.test(contentLength)) {
2940
return { response: res, isStreaming: false };
3041
}
3142

32-
// Probe the stream to detect streaming behavior
33-
// NOTE: This tees the stream and returns a new Response object
34-
const [probe, pass] = res.body.tee();
35-
const reader = probe.getReader();
43+
// Uncertain - probe the stream to determine behavior
44+
// After tee(), must use the teed stream (original is locked)
45+
const [probeStream, passStream] = res.body.tee();
46+
const reader = probeStream.getReader();
3647

37-
const firstChunkPromise = (async () => {
38-
try {
39-
const { value, done } = await reader.read();
40-
reader.releaseLock();
41-
if (done) return { arrivedBytes: 0, done: true };
42-
const bytes =
43-
value && typeof value === 'object' && 'byteLength' in value ? (value as { byteLength: number }).byteLength : 0;
44-
return { arrivedBytes: bytes, done: false };
45-
} catch {
46-
return { arrivedBytes: 0, done: false };
47-
}
48-
})();
48+
const probeResult = await Promise.race([
49+
// Try to read first chunk
50+
(async () => {
51+
try {
52+
const { value, done } = await reader.read();
53+
reader.releaseLock();
4954

50-
const timeout = new Promise<{ arrivedBytes: number; done: boolean }>(r =>
51-
setTimeout(() => r({ arrivedBytes: 0, done: false }), timeoutMs),
52-
);
55+
if (done) {
56+
return { arrivedBytes: 0, done: true };
57+
}
5358

54-
const peek = await Promise.race([firstChunkPromise, timeout]);
59+
const bytes =
60+
value && typeof value === 'object' && 'byteLength' in value
61+
? (value as { byteLength: number }).byteLength
62+
: 0;
63+
return { arrivedBytes: bytes, done: false };
64+
} catch {
65+
return { arrivedBytes: 0, done: false };
66+
}
67+
})(),
68+
// Timeout if first chunk takes too long
69+
new Promise<{ arrivedBytes: number; done: boolean }>(resolve =>
70+
setTimeout(() => resolve({ arrivedBytes: 0, done: false }), timeoutMs),
71+
),
72+
]);
5573

56-
// We must return the teed response since original is now locked
57-
const preserved = new Response(pass, res);
74+
const teededResponse = new Response(passStream, res);
5875

59-
let isStreaming = false;
60-
if (peek.done) {
61-
// Stream completed immediately
62-
isStreaming = false;
63-
} else if (peek.arrivedBytes === 0) {
64-
// Timeout waiting for first chunk - definitely streaming
65-
isStreaming = true;
76+
// Determine if streaming based on probe result
77+
if (probeResult.done) {
78+
// Stream completed immediately - buffered
79+
return { response: teededResponse, isStreaming: false };
80+
} else if (probeResult.arrivedBytes === 0) {
81+
// Timeout waiting - definitely streaming
82+
return { response: teededResponse, isStreaming: true };
6683
} else {
67-
// Got first chunk - streaming if no Content-Length
68-
isStreaming = cl == null;
84+
// Got chunk quickly - streaming if no Content-Length
85+
return { response: teededResponse, isStreaming: contentLength == null };
6986
}
70-
71-
return { response: preserved, isStreaming };
7287
}

packages/cloudflare/test/request.test.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ describe('withSentry', () => {
9090
});
9191

9292
test('flush must be called when all waitUntil are done', async () => {
93-
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
93+
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush');
9494
vi.useFakeTimers();
9595
onTestFinished(() => {
9696
vi.useRealTimers();
@@ -104,13 +104,17 @@ describe('withSentry', () => {
104104

105105
await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
106106
addDelayedWaitUntil(context);
107-
return new Response('test');
107+
const response = new Response('test');
108+
// Add Content-Length to skip probing
109+
response.headers.set('content-length', '4');
110+
return response;
108111
});
109-
expect(flush).not.toBeCalled();
112+
expect(flushSpy).not.toBeCalled();
110113
expect(waitUntil).toBeCalled();
111-
vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers());
114+
await vi.advanceTimersToNextTimerAsync();
115+
vi.runAllTimers();
112116
await Promise.all(waits);
113-
expect(flush).toHaveBeenCalledOnce();
117+
expect(flushSpy).toHaveBeenCalledOnce();
114118
});
115119

116120
describe('scope instrumentation', () => {
@@ -305,7 +309,7 @@ describe('withSentry', () => {
305309
mockRequest.headers.set('content-length', '10');
306310

307311
let sentryEvent: Event = {};
308-
const result = await wrapRequestHandler(
312+
await wrapRequestHandler(
309313
{
310314
options: {
311315
...MOCK_OPTIONS,
@@ -320,14 +324,10 @@ describe('withSentry', () => {
320324
},
321325
() => {
322326
SentryCore.captureMessage('sentry-trace');
323-
const response = new Response('test');
324-
return response;
327+
return new Response('test');
325328
},
326329
);
327330

328-
// Consume response to trigger span end for non-streaming responses
329-
await result.text();
330-
331331
// Wait for async span end and transaction capture
332332
await new Promise(resolve => setTimeout(resolve, 50));
333333

0 commit comments

Comments
 (0)