@@ -7,152 +7,114 @@ import { MACHINE_METADATA } from "./constants.js";
77import { EventCache } from "./eventCache.js" ;
88import nodeMachineId from "node-machine-id" ;
99import { getDeviceId } from "@mongodb-js/device-id" ;
10- import fs from "fs/promises" ;
11-
12- async function fileExists ( filePath : string ) : Promise < boolean > {
13- try {
14- await fs . access ( filePath , fs . constants . F_OK ) ;
15- return true ; // File exists
16- } catch ( e : unknown ) {
17- if (
18- e instanceof Error &&
19- (
20- e as Error & {
21- code : string ;
22- }
23- ) . code === "ENOENT"
24- ) {
25- return false ; // File does not exist
26- }
27- throw e ; // Re-throw unexpected errors
28- }
29- }
3010
31- async function isContainerized ( ) : Promise < boolean > {
32- if ( process . env . container ) {
33- return true ;
34- }
35-
36- const exists = await Promise . all ( [ "/.dockerenv" , "/run/.containerenv" , "/var/run/.containerenv" ] . map ( fileExists ) ) ;
11+ type EventResult = {
12+ success : boolean ;
13+ error ?: Error ;
14+ } ;
3715
38- return exists . includes ( true ) ;
39- }
16+ export const DEVICE_ID_TIMEOUT = 3000 ;
4017
4118export class Telemetry {
19+ private isBufferingEvents : boolean = true ;
20+ /** Resolves when the device ID is retrieved or timeout occurs */
21+ public deviceIdPromise : Promise < string > | undefined ;
4222 private deviceIdAbortController = new AbortController ( ) ;
4323 private eventCache : EventCache ;
4424 private getRawMachineId : ( ) => Promise < string > ;
45- private getContainerEnv : ( ) => Promise < boolean > ;
46- private cachedCommonProperties ?: CommonProperties ;
47- private flushing : boolean = false ;
4825
4926 private constructor (
5027 private readonly session : Session ,
5128 private readonly userConfig : UserConfig ,
52- {
53- eventCache,
54- getRawMachineId,
55- getContainerEnv,
56- } : {
57- eventCache : EventCache ;
58- getRawMachineId : ( ) => Promise < string > ;
59- getContainerEnv : ( ) => Promise < boolean > ;
60- }
29+ private readonly commonProperties : CommonProperties ,
30+ { eventCache, getRawMachineId } : { eventCache : EventCache ; getRawMachineId : ( ) => Promise < string > }
6131 ) {
6232 this . eventCache = eventCache ;
6333 this . getRawMachineId = getRawMachineId ;
64- this . getContainerEnv = getContainerEnv ;
6534 }
6635
6736 static create (
6837 session : Session ,
6938 userConfig : UserConfig ,
7039 {
40+ commonProperties = { ...MACHINE_METADATA } ,
7141 eventCache = EventCache . getInstance ( ) ,
7242 getRawMachineId = ( ) => nodeMachineId . machineId ( true ) ,
73- getContainerEnv = isContainerized ,
7443 } : {
7544 eventCache ?: EventCache ;
7645 getRawMachineId ?: ( ) => Promise < string > ;
77- getContainerEnv ?: ( ) => Promise < boolean > ;
46+ commonProperties ?: CommonProperties ;
7847 } = { }
7948 ) : Telemetry {
80- const instance = new Telemetry ( session , userConfig , {
81- eventCache,
82- getRawMachineId,
83- getContainerEnv,
84- } ) ;
49+ const instance = new Telemetry ( session , userConfig , commonProperties , { eventCache, getRawMachineId } ) ;
8550
51+ void instance . start ( ) ;
8652 return instance ;
8753 }
8854
55+ private async start ( ) : Promise < void > {
56+ if ( ! this . isTelemetryEnabled ( ) ) {
57+ return ;
58+ }
59+ this . deviceIdPromise = getDeviceId ( {
60+ getMachineId : ( ) => this . getRawMachineId ( ) ,
61+ onError : ( reason , error ) => {
62+ switch ( reason ) {
63+ case "resolutionError" :
64+ logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , String ( error ) ) ;
65+ break ;
66+ case "timeout" :
67+ logger . debug ( LogId . telemetryDeviceIdTimeout , "telemetry" , "Device ID retrieval timed out" ) ;
68+ break ;
69+ case "abort" :
70+ // No need to log in the case of aborts
71+ break ;
72+ }
73+ } ,
74+ abortSignal : this . deviceIdAbortController . signal ,
75+ } ) ;
76+
77+ this . commonProperties . device_id = await this . deviceIdPromise ;
78+
79+ this . isBufferingEvents = false ;
80+ }
81+
8982 public async close ( ) : Promise < void > {
9083 this . deviceIdAbortController . abort ( ) ;
91- await this . flush ( ) ;
84+ this . isBufferingEvents = false ;
85+ await this . emitEvents ( this . eventCache . getEvents ( ) ) ;
9286 }
9387
9488 /**
9589 * Emits events through the telemetry pipeline
9690 * @param events - The events to emit
9791 */
98- public emitEvents ( events : BaseEvent [ ] ) : void {
99- void this . flush ( events ) ;
92+ public async emitEvents ( events : BaseEvent [ ] ) : Promise < void > {
93+ try {
94+ if ( ! this . isTelemetryEnabled ( ) ) {
95+ logger . info ( LogId . telemetryEmitFailure , "telemetry" , `Telemetry is disabled.` ) ;
96+ return ;
97+ }
98+
99+ await this . emit ( events ) ;
100+ } catch {
101+ logger . debug ( LogId . telemetryEmitFailure , "telemetry" , `Error emitting telemetry events.` ) ;
102+ }
100103 }
101104
102105 /**
103106 * Gets the common properties for events
104107 * @returns Object containing common properties for all events
105108 */
106- private async getCommonProperties ( ) : Promise < CommonProperties > {
107- if ( ! this . cachedCommonProperties ) {
108- let deviceId : string | undefined ;
109- let containerEnv : boolean | undefined ;
110- try {
111- await Promise . all ( [
112- getDeviceId ( {
113- getMachineId : ( ) => this . getRawMachineId ( ) ,
114- onError : ( reason , error ) => {
115- switch ( reason ) {
116- case "resolutionError" :
117- logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , String ( error ) ) ;
118- break ;
119- case "timeout" :
120- logger . debug (
121- LogId . telemetryDeviceIdTimeout ,
122- "telemetry" ,
123- "Device ID retrieval timed out"
124- ) ;
125- break ;
126- case "abort" :
127- // No need to log in the case of aborts
128- break ;
129- }
130- } ,
131- abortSignal : this . deviceIdAbortController . signal ,
132- } ) . then ( ( id ) => {
133- deviceId = id ;
134- } ) ,
135- this . getContainerEnv ( ) . then ( ( env ) => {
136- containerEnv = env ;
137- } ) ,
138- ] ) ;
139- } catch ( error : unknown ) {
140- const err = error instanceof Error ? error : new Error ( String ( error ) ) ;
141- logger . debug ( LogId . telemetryDeviceIdFailure , "telemetry" , err . message ) ;
142- }
143- this . cachedCommonProperties = {
144- ...MACHINE_METADATA ,
145- mcp_client_version : this . session . agentRunner ?. version ,
146- mcp_client_name : this . session . agentRunner ?. name ,
147- session_id : this . session . sessionId ,
148- config_atlas_auth : this . session . apiClient . hasCredentials ( ) ? "true" : "false" ,
149- config_connection_string : this . userConfig . connectionString ? "true" : "false" ,
150- is_container_env : containerEnv ? "true" : "false" ,
151- device_id : deviceId ,
152- } ;
153- }
154-
155- return this . cachedCommonProperties ;
109+ public getCommonProperties ( ) : CommonProperties {
110+ return {
111+ ...this . commonProperties ,
112+ mcp_client_version : this . session . agentRunner ?. version ,
113+ mcp_client_name : this . session . agentRunner ?. name ,
114+ session_id : this . session . sessionId ,
115+ config_atlas_auth : this . session . apiClient . hasCredentials ( ) ? "true" : "false" ,
116+ config_connection_string : this . userConfig . connectionString ? "true" : "false" ,
117+ } ;
156118 }
157119
158120 /**
@@ -173,74 +135,60 @@ export class Telemetry {
173135 }
174136
175137 /**
176- * Attempts to flush events through authenticated and unauthenticated clients
138+ * Attempts to emit events through authenticated and unauthenticated clients
177139 * Falls back to caching if both attempts fail
178140 */
179- public async flush ( events ?: BaseEvent [ ] ) : Promise < void > {
180- if ( ! this . isTelemetryEnabled ( ) ) {
181- logger . info ( LogId . telemetryEmitFailure , "telemetry" , `Telemetry is disabled.` ) ;
182- return ;
183- }
184-
185- if ( this . flushing ) {
186- this . eventCache . appendEvents ( events ?? [ ] ) ;
187- process . nextTick ( async ( ) => {
188- // try again if in the middle of a flush
189- await this . flush ( ) ;
190- } ) ;
141+ private async emit ( events : BaseEvent [ ] ) : Promise < void > {
142+ if ( this . isBufferingEvents ) {
143+ this . eventCache . appendEvents ( events ) ;
191144 return ;
192145 }
193146
194- this . flushing = true ;
147+ const cachedEvents = this . eventCache . getEvents ( ) ;
148+ const allEvents = [ ...cachedEvents , ...events ] ;
195149
196- try {
197- const cachedEvents = this . eventCache . getEvents ( ) ;
198- const allEvents = [ ...cachedEvents , ...( events ?? [ ] ) ] ;
199- if ( allEvents . length <= 0 ) {
200- this . flushing = false ;
201- return ;
202- }
203-
204- logger . debug (
205- LogId . telemetryEmitStart ,
206- "telemetry" ,
207- `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)`
208- ) ;
150+ logger . debug (
151+ LogId . telemetryEmitStart ,
152+ "telemetry" ,
153+ `Attempting to send ${ allEvents . length } events (${ cachedEvents . length } cached)`
154+ ) ;
209155
210- await this . sendEvents ( this . session . apiClient , allEvents ) ;
156+ const result = await this . sendEvents ( this . session . apiClient , allEvents ) ;
157+ if ( result . success ) {
211158 this . eventCache . clearEvents ( ) ;
212159 logger . debug (
213160 LogId . telemetryEmitSuccess ,
214161 "telemetry" ,
215162 `Sent ${ allEvents . length } events successfully: ${ JSON . stringify ( allEvents , null , 2 ) } `
216163 ) ;
217- } catch ( error : unknown ) {
218- logger . debug (
219- LogId . telemetryEmitFailure ,
220- "telemetry" ,
221- `Error sending event to client: ${ error instanceof Error ? error . message : String ( error ) } `
222- ) ;
223- this . eventCache . appendEvents ( events ?? [ ] ) ;
224- process . nextTick ( async ( ) => {
225- // try again
226- await this . flush ( ) ;
227- } ) ;
164+ return ;
228165 }
229166
230- this . flushing = false ;
167+ logger . debug (
168+ LogId . telemetryEmitFailure ,
169+ "telemetry" ,
170+ `Error sending event to client: ${ result . error instanceof Error ? result . error . message : String ( result . error ) } `
171+ ) ;
172+ this . eventCache . appendEvents ( events ) ;
231173 }
232174
233175 /**
234176 * Attempts to send events through the provided API client
235177 */
236- private async sendEvents ( client : ApiClient , events : BaseEvent [ ] ) : Promise < void > {
237- const commonProperties = await this . getCommonProperties ( ) ;
238-
239- await client . sendEvents (
240- events . map ( ( event ) => ( {
241- ...event ,
242- properties : { ...commonProperties , ...event . properties } ,
243- } ) )
244- ) ;
178+ private async sendEvents ( client : ApiClient , events : BaseEvent [ ] ) : Promise < EventResult > {
179+ try {
180+ await client . sendEvents (
181+ events . map ( ( event ) => ( {
182+ ...event ,
183+ properties : { ...this . getCommonProperties ( ) , ...event . properties } ,
184+ } ) )
185+ ) ;
186+ return { success : true } ;
187+ } catch ( error ) {
188+ return {
189+ success : false ,
190+ error : error instanceof Error ? error : new Error ( String ( error ) ) ,
191+ } ;
192+ }
245193 }
246194}
0 commit comments