diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md
index 873df73a1d..7a7fb7d9b3 100644
--- a/util/opentelemetry-util-genai/CHANGELOG.md
+++ b/util/opentelemetry-util-genai/CHANGELOG.md
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+- Add metrics to LLMInvocation traces
+  ([#3891](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3891))
+
 ## Version 0.2b0 (2025-10-14)
 
 - Add jsonlines support to fsspec uploader
diff --git a/util/opentelemetry-util-genai/pyproject.toml b/util/opentelemetry-util-genai/pyproject.toml
index cba9252f65..4078203960 100644
--- a/util/opentelemetry-util-genai/pyproject.toml
+++ b/util/opentelemetry-util-genai/pyproject.toml
@@ -25,8 +25,8 @@ classifiers = [
   "Programming Language :: Python :: 3.13",
 ]
 dependencies = [
-  "opentelemetry-instrumentation ~= 0.57b0",
-  "opentelemetry-semantic-conventions ~= 0.57b0",
+  "opentelemetry-instrumentation ~= 0.58b0",
+  "opentelemetry-semantic-conventions ~= 0.58b0",
   "opentelemetry-api>=1.31.0",
 ]
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
index 0fb0771bc5..524e5e645f 100644
--- a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/handler.py
@@ -64,16 +64,19 @@ from typing import Iterator, Optional
 from opentelemetry import context as otel_context
+from opentelemetry.metrics import MeterProvider, get_meter
 from opentelemetry.semconv._incubating.attributes import (
     gen_ai_attributes as GenAI,
 )
 from opentelemetry.semconv.schemas import Schemas
 from opentelemetry.trace import (
+    Span,
     SpanKind,
     TracerProvider,
     get_tracer,
     set_span_in_context,
 )
+from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
 from opentelemetry.util.genai.span_utils import (
     _apply_error_attributes,
     _apply_finish_attributes,
@@ -88,13 +91,41 @@ class TelemetryHandler:
     them as spans, metrics, and events.
""" - def __init__(self, tracer_provider: TracerProvider | None = None): + def __init__( + self, + tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, + ): self._tracer = get_tracer( __name__, __version__, tracer_provider, - schema_url=Schemas.V1_36_0.value, + schema_url=Schemas.V1_37_0.value, ) + self._metrics_recorder: Optional[InvocationMetricsRecorder] = None + try: + meter = get_meter(__name__, meter_provider=meter_provider) + self._metrics_recorder = InvocationMetricsRecorder(meter) + except Exception: # pragma: no cover - defensive fallback # pylint: disable=broad-exception-caught + self._metrics_recorder = None + + def _record_llm_metrics( + self, + invocation: LLMInvocation, + span: Optional[Span], + *, + error_type: Optional[str] = None, + ) -> None: + if self._metrics_recorder is None or span is None: + return + try: + self._metrics_recorder.record( + span, + invocation, + error_type=error_type, + ) + except Exception: # pragma: no cover - defensive fallback # pylint: disable=broad-exception-caught + pass def start_llm( self, @@ -118,10 +149,12 @@ def stop_llm(self, invocation: LLMInvocation) -> LLMInvocation: # pylint: disab # TODO: Provide feedback that this invocation was not started return invocation - _apply_finish_attributes(invocation.span, invocation) + span = invocation.span + _apply_finish_attributes(span, invocation) + self._record_llm_metrics(invocation, span) # Detach context and end span otel_context.detach(invocation.context_token) - invocation.span.end() + span.end() return invocation def fail_llm( # pylint: disable=no-self-use @@ -132,10 +165,13 @@ def fail_llm( # pylint: disable=no-self-use # TODO: Provide feedback that this invocation was not started return invocation - _apply_error_attributes(invocation.span, error) + span = invocation.span + _apply_error_attributes(span, error) + error_type = getattr(error.type, "__qualname__", None) + self._record_llm_metrics(invocation, span, error_type=error_type) # Detach context and end span otel_context.detach(invocation.context_token) - invocation.span.end() + span.end() return invocation @contextmanager @@ -165,6 +201,7 @@ def llm( def get_telemetry_handler( tracer_provider: TracerProvider | None = None, + meter_provider: MeterProvider | None = None, ) -> TelemetryHandler: """ Returns a singleton TelemetryHandler instance. 
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py
new file mode 100644
index 0000000000..efc7acf345
--- /dev/null
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/instruments.py
@@ -0,0 +1,52 @@
+from opentelemetry.metrics import Histogram, Meter
+from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
+
+_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
+    0.01,
+    0.02,
+    0.04,
+    0.08,
+    0.16,
+    0.32,
+    0.64,
+    1.28,
+    2.56,
+    5.12,
+    10.24,
+    20.48,
+    40.96,
+    81.92,
+]
+
+_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
+    1,
+    4,
+    16,
+    64,
+    256,
+    1024,
+    4096,
+    16384,
+    65536,
+    262144,
+    1048576,
+    4194304,
+    16777216,
+    67108864,
+]
+
+
+class Instruments:
+    def __init__(self, meter: Meter):
+        self.operation_duration_histogram: Histogram = meter.create_histogram(
+            name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION,
+            description="Duration of GenAI client operation",
+            unit="s",
+            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
+        )
+        self.token_usage_histogram: Histogram = meter.create_histogram(
+            name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE,
+            description="Number of input and output tokens used by GenAI clients",
+            unit="{token}",
+            explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
+        )
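
The bucket lists above are passed as advisory boundaries (`explicit_bucket_boundaries_advisory`), so an SDK that does not honor the advisory keeps its default histogram buckets. As a sketch, assuming the OpenTelemetry Python SDK, a consumer could pin the duration boundaries unconditionally with a view (the list is copied from `instruments.py` above):

```python
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.view import (
    ExplicitBucketHistogramAggregation,
    View,
)

# Force the semconv-recommended boundaries regardless of advisory support.
duration_view = View(
    instrument_name="gen_ai.client.operation.duration",
    aggregation=ExplicitBucketHistogramAggregation(
        boundaries=[
            0.01, 0.02, 0.04, 0.08, 0.16, 0.32, 0.64,
            1.28, 2.56, 5.12, 10.24, 20.48, 40.96, 81.92,
        ]
    ),
)
meter_provider = MeterProvider(views=[duration_view])
```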
diff --git a/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py
new file mode 100644
index 0000000000..c69ae69a35
--- /dev/null
+++ b/util/opentelemetry-util-genai/src/opentelemetry/util/genai/metrics.py
@@ -0,0 +1,125 @@
+"""Helpers for emitting GenAI metrics from LLM invocations."""
+
+from __future__ import annotations
+
+import time
+from numbers import Number
+from typing import Dict, Optional
+
+from opentelemetry.metrics import Histogram, Meter
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAI,
+)
+from opentelemetry.trace import Span, set_span_in_context
+from opentelemetry.util.genai.instruments import Instruments
+from opentelemetry.util.genai.types import LLMInvocation
+from opentelemetry.util.types import AttributeValue
+
+_NS_PER_SECOND = 1_000_000_000
+
+
+def _now_ns() -> int:
+    return time.time_ns()
+
+
+def _get_span_start_time_ns(span: Optional[Span]) -> Optional[int]:
+    if span is None:
+        return None
+    for attr in ("start_time", "_start_time"):
+        value = getattr(span, attr, None)
+        if isinstance(value, int):
+            return value
+    return None
+
+
+def _calculate_duration_seconds(span: Optional[Span]) -> Optional[float]:
+    """Calculate duration in seconds from span start time to now."""
+    start_time_ns = _get_span_start_time_ns(span)
+    if start_time_ns is None:
+        return None
+    elapsed_ns = max(_now_ns() - start_time_ns, 0)
+    return elapsed_ns / _NS_PER_SECOND
+
+
+class InvocationMetricsRecorder:
+    """Records duration and token usage histograms for GenAI invocations."""
+
+    def __init__(self, meter: Meter):
+        instruments = Instruments(meter)
+        self._duration_histogram: Histogram = (
+            instruments.operation_duration_histogram
+        )
+        self._token_histogram: Histogram = instruments.token_usage_histogram
+
+    def record(
+        self,
+        span: Optional[Span],
+        invocation: LLMInvocation,
+        *,
+        error_type: Optional[str] = None,
+    ) -> None:
+        """Record duration and token metrics for an invocation if possible."""
+        if span is None:
+            return
+
+        tokens: list[tuple[int, str]] = []
+        if isinstance(invocation.input_tokens, int):
+            tokens.append(
+                (
+                    invocation.input_tokens,
+                    GenAI.GenAiTokenTypeValues.INPUT.value,
+                )
+            )
+        if isinstance(invocation.output_tokens, int):
+            tokens.append(
+                (
+                    invocation.output_tokens,
+                    GenAI.GenAiTokenTypeValues.COMPLETION.value,
+                )
+            )
+
+        if not tokens:
+            return
+
+        attributes: Dict[str, AttributeValue] = {
+            GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value
+        }
+        if invocation.request_model:
+            attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model
+        if invocation.provider:
+            attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider
+        if invocation.response_model_name:
+            attributes[GenAI.GEN_AI_RESPONSE_MODEL] = (
+                invocation.response_model_name
+            )
+
+        # Calculate duration from span timing
+        duration_seconds = _calculate_duration_seconds(span)
+
+        span_context = set_span_in_context(span)
+        if error_type:
+            attributes["error.type"] = error_type
+
+        if (
+            duration_seconds is not None
+            and isinstance(duration_seconds, Number)
+            and duration_seconds >= 0
+        ):
+            duration_attributes: Dict[str, AttributeValue] = dict(attributes)
+            self._duration_histogram.record(
+                float(duration_seconds),
+                attributes=duration_attributes,
+                context=span_context,
+            )
+
+        for token in tokens:
+            token_attributes: Dict[str, AttributeValue] = dict(attributes)
+            token_attributes[GenAI.GEN_AI_TOKEN_TYPE] = token[1]
+            self._token_histogram.record(
+                token[0],
+                attributes=token_attributes,
+                context=span_context,
+            )
+
+
+__all__ = ["InvocationMetricsRecorder"]
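
Duration is derived from the span itself: the recorder reads the span's start timestamp (`start_time` on SDK spans, `_start_time` as a fallback) and subtracts it from `time.time_ns()`, so the histogram value tracks the span's lifetime without a separately maintained clock. Note also that `record` returns early when neither token count is an `int`, so no duration point is emitted for token-less invocations. A standalone sketch, assuming an SDK tracer so the span exposes `start_time`:

```python
from opentelemetry.metrics import get_meter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.types import LLMInvocation

recorder = InvocationMetricsRecorder(get_meter(__name__))
tracer = TracerProvider().get_tracer(__name__)

with tracer.start_as_current_span("chat some-model") as span:
    invocation = LLMInvocation(request_model="some-model", provider="prov")
    invocation.input_tokens = 3  # illustrative token counts
    invocation.output_tokens = 9
    recorder.record(span, invocation)  # one duration point, two token points
```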
diff --git a/util/opentelemetry-util-genai/tests/test_handler_metrics.py b/util/opentelemetry-util-genai/tests/test_handler_metrics.py
new file mode 100644
index 0000000000..8a17e8e35e
--- /dev/null
+++ b/util/opentelemetry-util-genai/tests/test_handler_metrics.py
@@ -0,0 +1,160 @@
+from __future__ import annotations
+
+from typing import Any, Dict, List
+from unittest import TestCase
+from unittest.mock import patch
+
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import InMemoryMetricReader
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
+    InMemorySpanExporter,
+)
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAI,
+)
+from opentelemetry.util.genai.handler import TelemetryHandler
+from opentelemetry.util.genai.types import Error, LLMInvocation
+
+
+class TelemetryHandlerMetricsTest(TestCase):
+    def setUp(self) -> None:
+        self.metric_reader = InMemoryMetricReader()
+        self.meter_provider = MeterProvider(
+            metric_readers=[self.metric_reader]
+        )
+        self.span_exporter = InMemorySpanExporter()
+        self.tracer_provider = TracerProvider()
+        self.tracer_provider.add_span_processor(
+            SimpleSpanProcessor(self.span_exporter)
+        )
+
+    def test_stop_llm_records_duration_and_tokens(self) -> None:
+        handler = TelemetryHandler(
+            tracer_provider=self.tracer_provider,
+            meter_provider=self.meter_provider,
+        )
+        invocation = LLMInvocation(request_model="model", provider="prov")
+        invocation.input_tokens = 5
+        invocation.output_tokens = 7
+        handler.start_llm(invocation)
+        span = invocation.span
+        self.assertIsNotNone(span)
+        start_ns = self._get_span_start_time(span)
+        self.assertIsNotNone(start_ns)
+
+        with patch(
+            "time.time_ns",
+            return_value=start_ns + 2_000_000_000,
+        ):
+            handler.stop_llm(invocation)
+
+        metrics = self._harvest_metrics()
+        self.assertIn("gen_ai.client.operation.duration", metrics)
+        duration_points = metrics["gen_ai.client.operation.duration"]
+        self.assertEqual(len(duration_points), 1)
+        duration_point = duration_points[0]
+        self.assertEqual(
+            duration_point.attributes[GenAI.GEN_AI_OPERATION_NAME],
+            GenAI.GenAiOperationNameValues.CHAT.value,
+        )
+        self.assertEqual(
+            duration_point.attributes[GenAI.GEN_AI_REQUEST_MODEL], "model"
+        )
+        self.assertEqual(
+            duration_point.attributes[GenAI.GEN_AI_PROVIDER_NAME], "prov"
+        )
+        self.assertAlmostEqual(duration_point.sum, 2.0, places=3)
+
+        self.assertIn("gen_ai.client.token.usage", metrics)
+        token_points = metrics["gen_ai.client.token.usage"]
+        token_by_type = {
+            point.attributes[GenAI.GEN_AI_TOKEN_TYPE]: point
+            for point in token_points
+        }
+        self.assertEqual(len(token_by_type), 2)
+        self.assertAlmostEqual(
+            token_by_type[GenAI.GenAiTokenTypeValues.INPUT.value].sum,
+            5.0,
+            places=3,
+        )
+        self.assertAlmostEqual(
+            token_by_type[GenAI.GenAiTokenTypeValues.COMPLETION.value].sum,
+            7.0,
+            places=3,
+        )
+
+    def test_fail_llm_records_error_and_available_tokens(self) -> None:
+        handler = TelemetryHandler(
+            tracer_provider=self.tracer_provider,
+            meter_provider=self.meter_provider,
+        )
+        invocation = LLMInvocation(request_model="err-model", provider=None)
+        invocation.input_tokens = 11
+        handler.start_llm(invocation)
+        span = invocation.span
+        self.assertIsNotNone(span)
+        start_ns = self._get_span_start_time(span)
+        self.assertIsNotNone(start_ns)
+
+        error = Error(message="boom", type=ValueError)
+        with patch(
+            "time.time_ns",
+            return_value=start_ns + 1_000_000_000,
+        ):
+            handler.fail_llm(invocation, error)
+
+        metrics = self._harvest_metrics()
+        self.assertIn("gen_ai.client.operation.duration", metrics)
+        duration_points = metrics["gen_ai.client.operation.duration"]
+        self.assertEqual(len(duration_points), 1)
+        duration_point = duration_points[0]
+        self.assertEqual(
+            duration_point.attributes.get("error.type"), "ValueError"
+        )
+        self.assertEqual(
+            duration_point.attributes.get(GenAI.GEN_AI_REQUEST_MODEL),
+            "err-model",
+        )
+        self.assertAlmostEqual(duration_point.sum, 1.0, places=3)
+
+        self.assertIn("gen_ai.client.token.usage", metrics)
+        token_points = metrics["gen_ai.client.token.usage"]
+        self.assertEqual(len(token_points), 1)
+        token_point = token_points[0]
+        self.assertEqual(
+            token_point.attributes[GenAI.GEN_AI_TOKEN_TYPE],
+            GenAI.GenAiTokenTypeValues.INPUT.value,
+        )
+        self.assertAlmostEqual(token_point.sum, 11.0, places=3)
+
+    @staticmethod
+    def _get_span_start_time(span) -> int:
+        for attr in ("start_time", "_start_time"):
+            value = getattr(span, attr, None)
+            if isinstance(value, int):
+                return value
+        raise AssertionError("Span start time not available")
+
+    def _harvest_metrics(self) -> Dict[str, List[Any]]:
+        try:
+            self.meter_provider.force_flush()
+        except Exception:  # pylint: disable=broad-except
+            pass
+        self.metric_reader.collect()
+        metrics_by_name: Dict[str, List[Any]] = {}
+        data = self.metric_reader.get_metrics_data()
+        for resource_metric in getattr(data, "resource_metrics", []) or []:
+            for scope_metric in (
+                getattr(resource_metric, "scope_metrics", []) or []
+            ):
getattr(scope_metric, "metrics", []) or []: + points = list( + getattr(metric.data, "data_points", []) or [] + ) + if points: + metrics_by_name.setdefault(metric.name, []).extend( + points + ) + return metrics_by_name
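
End to end, the tests reduce to the following flow; a sketch using the same in-memory SDK components as the test fixture above:

```python
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import LLMInvocation

reader = InMemoryMetricReader()
handler = TelemetryHandler(
    tracer_provider=TracerProvider(),
    meter_provider=MeterProvider(metric_readers=[reader]),
)

invocation = LLMInvocation(request_model="model", provider="prov")
invocation.input_tokens = 5
invocation.output_tokens = 7
handler.start_llm(invocation)
handler.stop_llm(invocation)

reader.collect()
for resource_metrics in reader.get_metrics_data().resource_metrics:
    for scope_metrics in resource_metrics.scope_metrics:
        for metric in scope_metrics.metrics:
            print(metric.name)
# Expect gen_ai.client.operation.duration and gen_ai.client.token.usage
```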