Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 59 additions & 20 deletions packages/cloudflare/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import {
parseStringToURLObject,
SEMANTIC_ATTRIBUTE_SENTRY_OP,
setHttpStatus,
startSpan,
startSpanManual,
winterCGHeadersToDict,
withIsolationScope,
} from '@sentry/core';
import type { CloudflareOptions } from './client';
import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils';
import { init } from './sdk';
import { classifyResponseStreaming } from './utils/streaming';

interface RequestHandlerWrapperOptions {
options: CloudflareOptions;
Expand Down Expand Up @@ -98,26 +99,64 @@ export function wrapRequestHandler(
// Note: This span will not have a duration unless I/O happens in the handler. This is
// because of how the cloudflare workers runtime works.
// See: https://developers.cloudflare.com/workers/runtime-apis/performance/
return startSpan(
{
name,
attributes,
},
async span => {
try {
const res = await handler();
setHttpStatus(span, res.status);
return res;
} catch (e) {
if (captureErrors) {
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
}
throw e;
} finally {
waitUntil?.(flush(2000));

// Use startSpanManual to control when span ends (needed for streaming responses)
return startSpanManual({ name, attributes }, async span => {
let res: Response;

try {
res = await handler();
setHttpStatus(span, res.status);
} catch (e) {
span.end();
if (captureErrors) {
captureException(e, { mechanism: { handled: false, type: 'auto.http.cloudflare' } });
}
},
);
waitUntil?.(flush(2000));
throw e;
}

// Classify response to detect actual streaming
const classification = await classifyResponseStreaming(res);

if (classification.isStreaming && classification.response.body) {
// Streaming response detected - monitor consumption to keep span alive
const [clientStream, monitorStream] = classification.response.body.tee();

// Monitor stream consumption and end span when complete
const streamMonitor = (async () => {
const reader = monitorStream.getReader();

try {
let done = false;
while (!done) {
const result = await reader.read();
done = result.done;
}
} catch {
// Stream error or cancellation - will end span in finally
} finally {
reader.releaseLock();
span.end();
waitUntil?.(flush(2000));
}
})();

waitUntil?.(streamMonitor);

// Return response with client stream
return new Response(clientStream, {
status: classification.response.status,
statusText: classification.response.statusText,
headers: classification.response.headers,
});
}

// Non-streaming response - end span immediately and return original
span.end();
waitUntil?.(flush(2000));
return classification.response;
});
},
);
});
Expand Down
64 changes: 64 additions & 0 deletions packages/cloudflare/src/utils/streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
export type StreamingGuess = {
response: Response;
isStreaming: boolean;
};

/**
* Classifies a Response as streaming or non-streaming.
*
* Uses multiple heuristics:
* - No body → not streaming
* - Content-Type: text/event-stream → streaming
* - Content-Length header present → not streaming
* - Otherwise: attempts immediate read to detect behavior
* - Stream empty (done) → not streaming
* - Got data without Content-Length → streaming
* - Got data with Content-Length → not streaming
*
* Note: Probing will tee() the stream and return a new Response object.
*
* @param res - The Response to classify
* @returns Classification result with safe-to-return Response
*/
export async function classifyResponseStreaming(res: Response): Promise<StreamingGuess> {
if (!res.body) {
return { response: res, isStreaming: false };
}

const contentType = res.headers.get('content-type') ?? '';
const contentLength = res.headers.get('content-length');

// Fast path: Server-Sent Events
if (/^text\/event-stream\b/i.test(contentType)) {
return { response: res, isStreaming: true };
}

// Fast path: Content-Length indicates buffered response
if (contentLength && /^\d+$/.test(contentLength)) {
return { response: res, isStreaming: false };
}

// Probe the stream by trying to read first chunk immediately
// After tee(), must use the teed stream (original is locked)
const [probeStream, passStream] = res.body.tee();
const reader = probeStream.getReader();

try {
const { done } = await reader.read();
reader.releaseLock();

const teededResponse = new Response(passStream, res);

if (done) {
// Stream completed immediately - buffered (empty body)
return { response: teededResponse, isStreaming: false };
}

// Got data - treat as streaming if no Content-Length header
return { response: teededResponse, isStreaming: contentLength == null };
} catch {
reader.releaseLock();
// Error reading - treat as non-streaming to be safe
return { response: new Response(passStream, res), isStreaming: false };
}
}
20 changes: 17 additions & 3 deletions packages/cloudflare/test/durableobject.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,25 @@ describe('instrumentDurableObjectWithSentry', () => {
waitUntil,
} as unknown as ExecutionContext;
const dObject: any = Reflect.construct(instrumented, [context, {} as any]);
expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
expect(flush).not.toBeCalled();
expect(waitUntil).toHaveBeenCalledOnce();

// Call fetch (don't await yet)
const responsePromise = dObject.fetch(new Request('https://example.com'));

// Advance past classification timeout and get response
vi.advanceTimersByTime(30);
const response = await responsePromise;

// Consume response (triggers span end for buffered responses)
await response.text();

// The flush should now be queued in waitUntil
expect(waitUntil).toHaveBeenCalled();

// Advance to trigger the setTimeout in the handler's waitUntil
vi.advanceTimersToNextTimer();
await Promise.all(waitUntil.mock.calls.map(([p]) => p));

// Now flush should have been called
expect(flush).toBeCalled();
});

Expand Down
6 changes: 5 additions & 1 deletion packages/cloudflare/test/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ describe('withSentry', () => {
createMockExecutionContext(),
);

expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content
expect(result?.status).toBe(response.status);
if (result) {
expect(await result.text()).toBe('test');
}
});

test('merges options from env and callback', async () => {
Expand Down
4 changes: 3 additions & 1 deletion packages/cloudflare/test/pages-plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ describe('sentryPagesPlugin', () => {
pluginArgs: MOCK_OPTIONS,
});

expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content
expect(result.status).toBe(response.status);
expect(await result.text()).toBe('test');
});
});
40 changes: 34 additions & 6 deletions packages/cloudflare/test/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ describe('withSentry', () => {
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
() => response,
);
expect(result).toBe(response);
// Response may be wrapped for streaming detection, verify content matches
expect(result.status).toBe(response.status);
expect(await result.text()).toBe('test');
});

test('flushes the event after the handler is done using the cloudflare context.waitUntil', async () => {
Expand All @@ -48,6 +50,25 @@ describe('withSentry', () => {
expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise));
});

test('handles streaming responses correctly', async () => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('chunk1'));
controller.enqueue(new TextEncoder().encode('chunk2'));
controller.close();
},
});
const streamingResponse = new Response(stream);

const result = await wrapRequestHandler(
{ options: MOCK_OPTIONS, request: new Request('https://example.com'), context: createMockExecutionContext() },
() => streamingResponse,
);

const text = await result.text();
expect(text).toBe('chunk1chunk2');
});

test("doesn't error if context is undefined", () => {
expect(() =>
wrapRequestHandler(
Expand All @@ -69,7 +90,7 @@ describe('withSentry', () => {
});

test('flush must be called when all waitUntil are done', async () => {
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush');
vi.useFakeTimers();
onTestFinished(() => {
vi.useRealTimers();
Expand All @@ -83,13 +104,17 @@ describe('withSentry', () => {

await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => {
addDelayedWaitUntil(context);
return new Response('test');
const response = new Response('test');
// Add Content-Length to skip probing
response.headers.set('content-length', '4');
return response;
});
expect(flush).not.toBeCalled();
expect(flushSpy).not.toBeCalled();
expect(waitUntil).toBeCalled();
vi.advanceTimersToNextTimerAsync().then(() => vi.runAllTimers());
await vi.advanceTimersToNextTimerAsync();
vi.runAllTimers();
await Promise.all(waits);
expect(flush).toHaveBeenCalledOnce();
expect(flushSpy).toHaveBeenCalledOnce();
});

describe('scope instrumentation', () => {
Expand Down Expand Up @@ -303,6 +328,9 @@ describe('withSentry', () => {
},
);

// Wait for async span end and transaction capture
await new Promise(resolve => setTimeout(resolve, 50));

expect(sentryEvent.transaction).toEqual('GET /');
expect(sentryEvent.spans).toHaveLength(0);
expect(sentryEvent.contexts?.trace).toEqual({
Expand Down
Loading