Skip to content

Commit 1106f8a

Browse files
committed
wip: add OpenTelemetry metrics instrumentation
1 parent 100c039 commit 1106f8a

File tree

13 files changed

+1540
-4
lines changed

13 files changed

+1540
-4
lines changed

package-lock.json

Lines changed: 92 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/client/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,4 @@ export { GEO_REPLY_WITH, GeoReplyWith } from './lib/commands/GEOSEARCH_WITH';
3535
export { SetOptions, CLIENT_KILL_FILTERS, FAILOVER_MODES, CLUSTER_SLOT_STATES, COMMAND_LIST_FILTER_BY, REDIS_FLUSH_MODES } from './lib/commands'
3636

3737
export { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
38+
export { OpenTelemetry } from './lib/opentelemetry';

packages/client/lib/client/enterprise-maintenance-manager.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import assert from "node:assert";
77
import { setTimeout } from "node:timers/promises";
88
import RedisSocket, { RedisTcpSocketOptions } from "./socket";
99
import diagnostics_channel from "node:diagnostics_channel";
10+
import { METRIC_ERROR_TYPE, OTelClientAttributes, OTelMetrics } from "../opentelemetry";
1011

1112
export const MAINTENANCE_EVENTS = {
1213
PAUSE_WRITING: "pause-writing",
@@ -51,6 +52,7 @@ interface Client {
5152
_pause: () => void;
5253
_unpause: () => void;
5354
_maintenanceUpdate: (update: MaintenanceUpdate) => void;
55+
_getClientOTelAttributes: () => OTelClientAttributes;
5456
duplicate: () => Client;
5557
connect: () => Promise<Client>;
5658
destroy: () => void;
@@ -109,6 +111,12 @@ export default class EnterpriseMaintenanceManager {
109111
if (options.maintNotifications === "enabled") {
110112
throw error;
111113
}
114+
115+
OTelMetrics.instance.recordClientErrorsHandled(METRIC_ERROR_TYPE.HANDSHAKE_FAILED, {
116+
host,
117+
// TODO add port
118+
// port: options?.socket?.port,
119+
});
112120
},
113121
};
114122
}
@@ -134,6 +142,8 @@ export default class EnterpriseMaintenanceManager {
134142

135143
const type = String(push[0]);
136144

145+
OTelMetrics.instance.recordMaintenanceNotifications(this.#client._getClientOTelAttributes());
146+
137147
emitDiagnostics({
138148
type,
139149
timestamp: Date.now(),
@@ -267,6 +277,7 @@ export default class EnterpriseMaintenanceManager {
267277
dbgMaintenance("Resume writing");
268278
this.#client._unpause();
269279
this.#onMigrated();
280+
OTelMetrics.instance.recordConnectionHandoff(this.#client._getClientOTelAttributes());
270281
};
271282

272283
#onMigrating = () => {

packages/client/lib/client/index.ts

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser';
2121
import SingleEntryCache from '../single-entry-cache';
2222
import { version } from '../../package.json'
2323
import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager';
24+
import { OTelClientAttributes, OTelMetrics } from '../opentelemetry';
2425

2526
export interface RedisClientOptions<
2627
M extends RedisModules = RedisModules,
@@ -1060,25 +1061,55 @@ export default class RedisClient<
10601061
reply;
10611062
}
10621063

1064+
/**
1065+
* @internal
1066+
*/
1067+
_getClientOTelAttributes(): OTelClientAttributes { // TODO maybe rename this to something more generic
1068+
return {
1069+
host: this._self.#socket.host,
1070+
port: this._self.#socket.port,
1071+
db: this._self.#selectedDB,
1072+
};
1073+
}
1074+
10631075
sendCommand<T = ReplyUnion>(
10641076
args: ReadonlyArray<RedisArgument>,
10651077
options?: CommandOptions
10661078
): Promise<T> {
1079+
const recordOperation = OTelMetrics.instance.createRecordOperationDuration(args, this._self._getClientOTelAttributes());
1080+
10671081
if (!this._self.#socket.isOpen) {
1082+
recordOperation(new ClientClosedError());
10681083
return Promise.reject(new ClientClosedError());
1069-
} else if (!this._self.#socket.isReady && this._self.#options.disableOfflineQueue) {
1084+
} else if (
1085+
!this._self.#socket.isReady &&
1086+
this._self.#options.disableOfflineQueue
1087+
) {
1088+
recordOperation(new ClientOfflineError());
10701089
return Promise.reject(new ClientOfflineError());
10711090
}
10721091

10731092
// Merge global options with provided options
10741093
const opts = {
10751094
...this._self._commandOptions,
1076-
...options
1077-
}
1095+
...options,
1096+
};
10781097

10791098
const promise = this._self.#queue.addCommand<T>(args, opts);
1099+
OTelMetrics.instance.recordPendingRequests(1, this._self._getClientOTelAttributes());
1100+
1101+
const trackedPromise = promise.then((reply) => {
1102+
recordOperation();
1103+
return reply;
1104+
}).catch((err) => {
1105+
recordOperation(err);
1106+
throw err;
1107+
}).finally(() => {
1108+
OTelMetrics.instance.recordPendingRequests(-1, this._self._getClientOTelAttributes());
1109+
});
1110+
10801111
this._self.#scheduleWrite();
1081-
return promise;
1112+
return trackedPromise;
10821113
}
10831114

10841115
async SELECT(db: number): Promise<void> {

packages/client/lib/client/socket.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyErro
55
import { setTimeout } from 'node:timers/promises';
66
import { RedisArgument } from '../RESP/types';
77
import { dbgMaintenance } from './enterprise-maintenance-manager';
8+
import { OTelMetrics } from '../opentelemetry';
89

910
type NetOptions = {
1011
tls?: false;
@@ -85,6 +86,14 @@ export default class RedisSocket extends EventEmitter {
8586
return this.#socketEpoch;
8687
}
8788

89+
get host() {
90+
return this.#socket?.remoteAddress;
91+
}
92+
93+
get port() {
94+
return this.#socket?.remotePort;
95+
}
96+
8897
constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) {
8998
super();
9099

@@ -215,6 +224,7 @@ export default class RedisSocket extends EventEmitter {
215224
let retries = 0;
216225
do {
217226
try {
227+
const started = performance.now();
218228
this.#socket = await this.#createSocket();
219229
this.emit('connect');
220230

@@ -228,6 +238,14 @@ export default class RedisSocket extends EventEmitter {
228238
this.#isReady = true;
229239
this.#socketEpoch++;
230240
this.emit('ready');
241+
OTelMetrics.instance.recordConnectionCount(1, {
242+
host: this.host,
243+
port: this.port,
244+
});
245+
OTelMetrics.instance.recordConnectionCreateTime(performance.now() - started, {
246+
host: this.host,
247+
port: this.port,
248+
});
231249
} catch (err) {
232250
const retryIn = this.#shouldReconnect(retries++, err as Error);
233251
if (typeof retryIn !== 'number') {
@@ -252,8 +270,16 @@ export default class RedisSocket extends EventEmitter {
252270

253271
if(ms !== undefined) {
254272
this.#socket?.setTimeout(ms);
273+
OTelMetrics.instance.recordConnectionRelaxedTimeout(1, {
274+
host: this.host,
275+
port: this.port,
276+
});
255277
} else {
256278
this.#socket?.setTimeout(this.#socketTimeout ?? 0);
279+
OTelMetrics.instance.recordConnectionRelaxedTimeout(-1, {
280+
host: this.host,
281+
port: this.port,
282+
});
257283
}
258284
}
259285

@@ -304,6 +330,13 @@ export default class RedisSocket extends EventEmitter {
304330
this.#isReady = false;
305331
this.emit('error', err);
306332

333+
if (wasReady) {
334+
OTelMetrics.instance.recordConnectionCount(-1, {
335+
host: this.host,
336+
port: this.port,
337+
});
338+
}
339+
307340
if (!wasReady || !this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return;
308341

309342
this.emit('reconnecting');
@@ -362,6 +395,10 @@ export default class RedisSocket extends EventEmitter {
362395
this.#socket = undefined;
363396
}
364397

398+
OTelMetrics.instance.recordConnectionCount(-1, {
399+
host: this.host,
400+
port: this.port,
401+
});
365402
this.emit('end');
366403
}
367404

packages/client/lib/cluster/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { BasicCommandParser } from '../client/parser';
1414
import { ASKING_CMD } from '../commands/ASKING';
1515
import SingleEntryCache from '../single-entry-cache'
1616
import { WithCommands, WithFunctions, WithModules, WithScripts } from '../client';
17+
import { METRIC_ERROR_TYPE, OTelMetrics } from '../opentelemetry';
1718

1819
interface ClusterCommander<
1920
M extends RedisModules,
@@ -433,6 +434,7 @@ export default class RedisCluster<
433434
}
434435

435436
if (err.message.startsWith('ASK')) {
437+
OTelMetrics.instance.recordClientErrorsHandled(METRIC_ERROR_TYPE.ASK, client._getClientOTelAttributes());
436438
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
437439
let redirectTo = await this._slots.getMasterByAddress(address);
438440
if (!redirectTo) {
@@ -450,6 +452,7 @@ export default class RedisCluster<
450452
}
451453

452454
if (err.message.startsWith('MOVED')) {
455+
OTelMetrics.instance.recordClientErrorsHandled(METRIC_ERROR_TYPE.MOVED, client._getClientOTelAttributes());
453456
await this._slots.rediscover(client);
454457
client = await this._slots.getClient(firstKey, isReadonly);
455458
continue;
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import { OTelMetrics } from "./metrics";
2+
import { ObservabilityConfig } from "./types";
3+
4+
export class OpenTelemetry {
5+
private static _instance: OpenTelemetry | null = null;
6+
7+
// Make sure it's a singleton
8+
private constructor() {}
9+
10+
public static init(config?: ObservabilityConfig) {
11+
if (OpenTelemetry._instance) {
12+
throw new Error("OpenTelemetry already initialized");
13+
}
14+
15+
let api: typeof import("@opentelemetry/api") | undefined;
16+
17+
try {
18+
api = require("@opentelemetry/api");
19+
} catch (err: unknown) {
20+
// TODO add custom errors
21+
throw new Error("OpenTelemetry not found");
22+
}
23+
24+
OpenTelemetry._instance = new OpenTelemetry();
25+
OTelMetrics.init({ api, config });
26+
}
27+
}
28+
29+
export { METRIC_ERROR_TYPE, OTelClientAttributes } from "./types";
30+
export { OTelMetrics } from "./metrics";

0 commit comments

Comments
 (0)