Skip to content

Commit 2f940d3

Browse files
committed
quick refactor
1 parent 7a570db commit 2f940d3

File tree

3 files changed

+70
-63
lines changed

3 files changed

+70
-63
lines changed

packages/cloudflare/src/request.ts

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -112,47 +112,43 @@ export function wrapRequestHandler(
112112
if (captureErrors) {
113113
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
114114
}
115-
waitUntil?.(flush(2000));
116115
throw e;
117116
}
118117

119118
// Classify response to detect actual streaming
120119
const classification = await classifyResponseStreaming(res);
121120

122-
if (classification.isStreaming) {
121+
if (classification.isStreaming && classification.response.body) {
123122
// Streaming response detected - monitor consumption to keep span alive
124-
if (!classification.response.body) {
125-
// Shouldn't happen since isStreaming requires body, but handle gracefully
126-
span.end();
127-
waitUntil?.(flush(2000));
128-
return classification.response;
129-
}
130-
131123
const [clientStream, monitorStream] = classification.response.body.tee();
132124

133125
// Monitor stream consumption and end span when complete
134126
const streamMonitor = (async () => {
135127
const reader = monitorStream.getReader();
128+
let spanEnded = false;
136129

137-
// Safety timeout to prevent infinite loops if stream hangs
130+
// Safety timeout - end span after 30s even if stream hasn't finished
138131
const timeout = setTimeout(() => {
139-
span.end();
140-
reader.cancel().catch(() => {});
141-
}, 30000); // 30 second max
132+
if (!spanEnded) {
133+
spanEnded = true;
134+
span.end();
135+
}
136+
}, 5000);
142137

143138
try {
144139
let done = false;
145140
while (!done) {
146141
const result = await reader.read();
147142
done = result.done;
148143
}
149-
clearTimeout(timeout);
150-
span.end();
151-
} catch (err) {
152-
clearTimeout(timeout);
153-
span.end();
144+
} catch {
145+
// Stream error or cancellation - will end span in finally
154146
} finally {
147+
clearTimeout(timeout);
155148
reader.releaseLock();
149+
if (!spanEnded) {
150+
span.end();
151+
}
156152
}
157153
})();
158154

packages/cloudflare/src/utils/streaming.ts

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,18 @@ export type StreamingGuess = {
44
};
55

66
/**
7+
* Classifies a Response as streaming or non-streaming.
78
*
9+
* Uses multiple heuristics:
10+
* - Content-Type: text/event-stream → streaming
11+
* - Content-Length header present → not streaming
12+
* - Otherwise: probes stream with timeout to detect behavior
13+
*
14+
* Note: Probing will tee() the stream and return a new Response object.
15+
*
16+
* @param res - The Response to classify
17+
* @param opts.timeoutMs - Probe timeout in ms (default: 25)
18+
* @returns Classification result with safe-to-return Response
819
*/
920
export async function classifyResponseStreaming(
1021
res: Response,
@@ -16,57 +27,61 @@ export async function classifyResponseStreaming(
1627
return { response: res, isStreaming: false };
1728
}
1829

19-
const ct = res.headers.get('content-type') ?? '';
20-
const cl = res.headers.get('content-length');
30+
const contentType = res.headers.get('content-type') ?? '';
31+
const contentLength = res.headers.get('content-length');
2132

22-
// Definitive streaming indicators
23-
if (/^text\/event-stream\b/i.test(ct)) {
33+
// Fast path: Server-Sent Events
34+
if (/^text\/event-stream\b/i.test(contentType)) {
2435
return { response: res, isStreaming: true };
2536
}
2637

27-
// Definitive non-streaming indicators
28-
if (cl && /^\d+$/.test(cl)) {
38+
// Fast path: Content-Length indicates buffered response
39+
if (contentLength && /^\d+$/.test(contentLength)) {
2940
return { response: res, isStreaming: false };
3041
}
3142

32-
// Probe the stream to detect streaming behavior
33-
// NOTE: This tees the stream and returns a new Response object
34-
const [probe, pass] = res.body.tee();
35-
const reader = probe.getReader();
43+
// Uncertain - probe the stream to determine behavior
44+
// After tee(), must use the teed stream (original is locked)
45+
const [probeStream, passStream] = res.body.tee();
46+
const reader = probeStream.getReader();
3647

37-
const firstChunkPromise = (async () => {
38-
try {
39-
const { value, done } = await reader.read();
40-
reader.releaseLock();
41-
if (done) return { arrivedBytes: 0, done: true };
42-
const bytes =
43-
value && typeof value === 'object' && 'byteLength' in value ? (value as { byteLength: number }).byteLength : 0;
44-
return { arrivedBytes: bytes, done: false };
45-
} catch {
46-
return { arrivedBytes: 0, done: false };
47-
}
48-
})();
48+
const probeResult = await Promise.race([
49+
// Try to read first chunk
50+
(async () => {
51+
try {
52+
const { value, done } = await reader.read();
53+
reader.releaseLock();
4954

50-
const timeout = new Promise<{ arrivedBytes: number; done: boolean }>(r =>
51-
setTimeout(() => r({ arrivedBytes: 0, done: false }), timeoutMs),
52-
);
55+
if (done) {
56+
return { arrivedBytes: 0, done: true };
57+
}
5358

54-
const peek = await Promise.race([firstChunkPromise, timeout]);
59+
const bytes =
60+
value && typeof value === 'object' && 'byteLength' in value
61+
? (value as { byteLength: number }).byteLength
62+
: 0;
63+
return { arrivedBytes: bytes, done: false };
64+
} catch {
65+
return { arrivedBytes: 0, done: false };
66+
}
67+
})(),
68+
// Timeout if first chunk takes too long
69+
new Promise<{ arrivedBytes: number; done: boolean }>(resolve =>
70+
setTimeout(() => resolve({ arrivedBytes: 0, done: false }), timeoutMs),
71+
),
72+
]);
5573

56-
// We must return the teed response since original is now locked
57-
const preserved = new Response(pass, res);
74+
const teededResponse = new Response(passStream, res);
5875

59-
let isStreaming = false;
60-
if (peek.done) {
61-
// Stream completed immediately
62-
isStreaming = false;
63-
} else if (peek.arrivedBytes === 0) {
64-
// Timeout waiting for first chunk - definitely streaming
65-
isStreaming = true;
76+
// Determine if streaming based on probe result
77+
if (probeResult.done) {
78+
// Stream completed immediately - buffered
79+
return { response: teededResponse, isStreaming: false };
80+
} else if (probeResult.arrivedBytes === 0) {
81+
// Timeout waiting - definitely streaming
82+
return { response: teededResponse, isStreaming: true };
6683
} else {
67-
// Got first chunk - streaming if no Content-Length
68-
isStreaming = cl == null;
84+
// Got chunk quickly - streaming if no Content-Length
85+
return { response: teededResponse, isStreaming: contentLength == null };
6986
}
70-
71-
return { response: preserved, isStreaming };
7287
}

packages/cloudflare/test/request.test.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ describe('withSentry', () => {
305305
mockRequest.headers.set('content-length', '10');
306306

307307
let sentryEvent: Event = {};
308-
const result = await wrapRequestHandler(
308+
await wrapRequestHandler(
309309
{
310310
options: {
311311
...MOCK_OPTIONS,
@@ -320,14 +320,10 @@ describe('withSentry', () => {
320320
},
321321
() => {
322322
SentryCore.captureMessage('sentry-trace');
323-
const response = new Response('test');
324-
return response;
323+
return new Response('test');
325324
},
326325
);
327326

328-
// Consume response to trigger span end for non-streaming responses
329-
await result.text();
330-
331327
// Wait for async span end and transaction capture
332328
await new Promise(resolve => setTimeout(resolve, 50));
333329

0 commit comments

Comments
 (0)