@@ -277,6 +277,16 @@ function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?:
277277 return { } ;
278278}
279279
280+ /**
281+ * Instruments the RPC consumer methods of a Supabase client.
282+ *
283+ * A span is only created when we can match the consumer operation to its corresponding producer span.
284+ *
285+ * @param target - The original function to instrument.
286+ * @param thisArg - The context to bind the function to.
287+ * @param argumentsList - The arguments to pass to the function.
288+ * @returns A promise that resolves with the result of the original function.
289+ */
280290const instrumentRpcConsumer = ( target : any , thisArg : any , argumentsList : any [ ] ) : Promise < unknown > => {
281291 const [ operationName , queueParams ] = argumentsList as [
282292 'pop' ,
@@ -292,102 +302,136 @@ const instrumentRpcConsumer = (target: any, thisArg: any, argumentsList: any[]):
292302 return Reflect . apply ( target , thisArg , argumentsList ) ; // Not a consumer operation
293303 }
294304
295- return ( Reflect . apply ( target , thisArg , argumentsList ) as Promise < SupabaseResponse > ) . then ( ( res : SupabaseResponse ) => {
296- const latency = res . data ?. [ 0 ] ?. enqueued_at ? Date . now ( ) - Date . parse ( res . data ?. [ 0 ] ?. enqueued_at ) : undefined ;
297-
298- const { sentryTrace, baggage } = extractTraceAndBaggageFromMessage ( res . data ?. [ 0 ] ?. message || { } ) ;
299-
300- // Remove Sentry metadata from the returned message
301- delete res . data ?. [ 0 ] ?. message ?. _sentry ;
302-
303- return continueTrace (
304- {
305- sentryTrace,
306- baggage,
305+ return startSpan (
306+ {
307+ name : 'supabase.queue.receive' ,
308+ op : 'queue.process' ,
309+ attributes : {
310+ [ SEMANTIC_ATTRIBUTE_SENTRY_OP ] : 'queue.process' ,
311+ [ SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN ] : 'auto.db.supabase' ,
312+ 'messaging.system' : 'supabase' ,
307313 } ,
308- ( ) => {
309- return startSpan (
310- {
311- name : 'supabase.db.rpc' ,
312- attributes : {
313- [ SEMANTIC_ATTRIBUTE_SENTRY_OP ] : 'queue.process' ,
314- [ SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN ] : 'auto.db.supabase' ,
315- 'messaging.system' : 'supabase' ,
316- } ,
317- } ,
318- span => {
319- const messageId =
320- res ?. data ?. map ( item => ( typeof item === 'number' ? item : item . msg_id ) ) . join ( ',' ) || undefined ;
314+ } ,
315+ async ( span ) => {
316+ try {
317+ // Call the original function
318+ const res = await Reflect . apply ( target , thisArg , argumentsList ) as SupabaseResponse ;
321319
322- if ( messageId ) {
323- span . setAttribute ( 'messaging.message.id' , messageId ) ;
324- }
320+ // Calculate latency if possible
321+ const latency = res . data ?. [ 0 ] ?. enqueued_at ? Date . now ( ) - Date . parse ( res . data ?. [ 0 ] ?. enqueued_at ) : undefined ;
325322
326- if ( queueName ) {
327- span . setAttribute ( 'messaging.destination.name' , queueName ) ;
328- }
323+ // Extract trace context
324+ const { sentryTrace, baggage } = extractTraceAndBaggageFromMessage ( res . data ?. [ 0 ] ?. message || { } ) ;
329325
330- if ( latency ) {
331- span . setAttribute ( 'messaging.message.receive.latency' , latency ) ;
332- }
333-
334- const breadcrumb : SupabaseBreadcrumb = {
335- type : 'supabase' ,
336- category : `db.rpc.${ argumentsList [ 0 ] } ` ,
337- message : `rpc(${ argumentsList [ 0 ] } )` ,
338- } ;
326+ // Remove Sentry metadata from the returned message
327+ delete res . data ?. [ 0 ] ?. message ?. _sentry ;
339328
340- const data : Record < string , unknown > = { } ;
329+ // Get message ID if available
330+ const messageId = res ?. data ?. map ( item => ( typeof item === 'number' ? item : item . msg_id ) ) . join ( ',' ) || undefined ;
341331
342- if ( messageId ) {
343- data [ 'messaging.message.id' ] = messageId ;
344- }
332+ // Set span attributes
333+ if ( messageId ) {
334+ span . setAttribute ( 'messaging.message.id' , messageId ) ;
335+ }
345336
346- if ( queueName ) {
347- data [ 'messaging.destination.name' ] = queueName ;
348- }
337+ if ( queueName ) {
338+ span . setAttribute ( 'messaging.destination.name' , queueName ) ;
339+ }
349340
350- if ( Object . keys ( data ) . length ) {
351- breadcrumb . data = data ;
352- }
341+ if ( latency ) {
342+ span . setAttribute ( 'messaging.message.receive.latency' , latency ) ;
343+ }
353344
354- addBreadcrumb ( breadcrumb ) ;
345+ // Add breadcrumb for monitoring
346+ const breadcrumb : SupabaseBreadcrumb = {
347+ type : 'supabase' ,
348+ category : `db.rpc.${ argumentsList [ 0 ] } ` ,
349+ message : `rpc(${ argumentsList [ 0 ] } )` ,
350+ } ;
355351
356- if ( res . error ) {
357- const err = new Error ( res . error . message ) as SupabaseError ;
352+ const data : Record < string , unknown > = { } ;
353+ if ( messageId ) data [ 'messaging.message.id' ] = messageId ;
354+ if ( queueName ) data [ 'messaging.destination.name' ] = queueName ;
355+ if ( Object . keys ( data ) . length ) breadcrumb . data = data ;
358356
359- if ( res . error . code ) {
360- err . code = res . error . code ;
361- }
357+ addBreadcrumb ( breadcrumb ) ;
362358
363- if ( res . error . details ) {
364- err . details = res . error . details ;
365- }
359+ // Handle errors in the response
360+ if ( res . error ) {
361+ const err = new Error ( res . error . message ) as SupabaseError ;
362+ if ( res . error . code ) err . code = res . error . code ;
363+ if ( res . error . details ) err . details = res . error . details ;
366364
367- captureException ( err , {
368- contexts : {
369- supabase : {
370- queueName,
371- messageId,
372- } ,
373- } ,
374- } ) ;
365+ captureException ( err , {
366+ contexts : {
367+ supabase : { queueName, messageId } ,
368+ } ,
369+ } ) ;
375370
376- span . setStatus ( { code : SPAN_STATUS_ERROR } ) ;
377- } else {
378- span . setStatus ( { code : SPAN_STATUS_OK } ) ;
379- }
371+ span . setStatus ( { code : SPAN_STATUS_ERROR } ) ;
372+ } else {
373+ span . setStatus ( { code : SPAN_STATUS_OK } ) ;
374+ }
380375
381- span . end ( ) ;
376+ // Continue trace if we have the trace context
377+ if ( sentryTrace || baggage ) {
378+ return continueTrace (
379+ { sentryTrace, baggage } ,
380+ ( ) => startSpan (
381+ {
382+ name : 'supabase.db.rpc' ,
383+ op : 'queue.process' ,
384+ attributes : {
385+ [ SEMANTIC_ATTRIBUTE_SENTRY_OP ] : 'queue.process' ,
386+ [ SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN ] : 'auto.db.supabase' ,
387+ 'messaging.system' : 'supabase' ,
388+ } ,
389+ } ,
390+ processSpan => {
391+ if ( messageId ) {
392+ processSpan . setAttribute ( 'messaging.message.id' , messageId ) ;
393+ }
394+
395+ if ( queueName ) {
396+ processSpan . setAttribute ( 'messaging.destination.name' , queueName ) ;
397+ }
398+
399+ if ( latency ) {
400+ processSpan . setAttribute ( 'messaging.message.receive.latency' , latency ) ;
401+ }
402+
403+ if ( res . error ) {
404+ processSpan . setStatus ( { code : SPAN_STATUS_ERROR } ) ;
405+ } else {
406+ processSpan . setStatus ( { code : SPAN_STATUS_OK } ) ;
407+ }
408+
409+ processSpan . end ( ) ;
410+ return res ;
411+ }
412+ )
413+ ) ;
414+ }
382415
383- return res ;
384- } ,
385- ) ;
386- } ,
387- ) ;
388- } ) ;
416+ return res ;
417+ } catch ( error ) {
418+ span . setStatus ( { code : SPAN_STATUS_ERROR } ) ;
419+ throw error ;
420+ } finally {
421+ span . end ( ) ;
422+ }
423+ }
424+ ) ;
389425} ;
390426
427+ /**
428+ * Instruments the RPC producer methods of a Supabase client.
429+ *
430+ * @param target - The original function to instrument.
431+ * @param thisArg - The context to bind the function to.
432+ * @param argumentsList - The arguments to pass to the function.
433+ * @returns A promise that resolves with the result of the original function.
434+ */
391435function instrumentRpcProducer ( target : any , thisArg : any , argumentsList : any [ ] ) : Promise < unknown > {
392436 const maybeQueueParams = argumentsList [ 1 ] ;
393437
@@ -507,13 +551,19 @@ function instrumentRpcProducer(target: any, thisArg: any, argumentsList: any[]):
507551 ) ;
508552}
509553
554+ /**
555+ * Instruments the RPC methods of a Supabase client.
556+ *
557+ * @param SupabaseClient - The Supabase client instance to instrument.
558+ */
510559function instrumentRpc ( SupabaseClient : unknown ) : void {
511560 ( SupabaseClient as unknown as SupabaseClientInstance ) . rpc = new Proxy (
512561 ( SupabaseClient as unknown as SupabaseClientInstance ) . rpc ,
513562 {
514563 apply ( target , thisArg , argumentsList ) {
515564 let result : Promise < unknown > ;
516565
566+ // Check if the first argument is 'send', 'send_batch', or 'pop' to determine if it's a producer or consumer operation
517567 if ( argumentsList [ 0 ] === 'send' || argumentsList [ 0 ] === 'send_batch' ) {
518568 result = instrumentRpcProducer ( target , thisArg , argumentsList ) ;
519569 } else if ( argumentsList [ 0 ] === 'pop' ) {
0 commit comments