diff --git a/.fossa.yml b/.fossa.yml index 161ceb50792f..6f5dd0dfe0a6 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -154,6 +154,9 @@ targets: - type: gradle path: ./ target: ':instrumentation:hystrix-1.4:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:iceberg-1.8:library' - type: gradle path: ./ target: ':instrumentation:influxdb-2.4:javaagent' diff --git a/instrumentation/iceberg-1.8/README.md b/instrumentation/iceberg-1.8/README.md new file mode 100644 index 000000000000..e218632b3b83 --- /dev/null +++ b/instrumentation/iceberg-1.8/README.md @@ -0,0 +1,43 @@ +# Library Instrumentation for Apache Iceberg Version 1.8 and Higher + +Provides OpenTelemetry instrumentation for [Apache Iceberg](https://iceberg.apache.org/). + +## Quickstart + +### Add These Dependencies to Your Project + +Replace `OPENTELEMETRY_VERSION` with the [latest release](https://central.sonatype.com/artifact/io.opentelemetry.instrumentation/opentelemetry-iceberg-1.8). + +For Maven, add to your `pom.xml` dependencies: + +```xml + + + io.opentelemetry.instrumentation + opentelemetry-iceberg-1.8 + OPENTELEMETRY_VERSION + + +``` + +For Gradle, add to your dependencies: + +```groovy +implementation("io.opentelemetry.instrumentation:opentelemetry-iceberg-1.8:OPENTELEMETRY_VERSION") +``` + +### Usage + +The instrumentation library allows creating instrumented `Scan` (e.g., `TableScan`) instances for collecting and reporting OpenTelemetry-based scan metrics. For example: + +```java +OpenTelemetry openTelemetry = // ... +IcebergTelemetry icebergTelemetry = IcebergTelemetry.create(openTelemetry); +TableScan tableScan = icebergTelemetry.wrapScan(table.newScan()); + +try (CloseableIterable fileScanTasks = tableScan.planFiles()) { + // Process the scan tasks +} + +// The metrics will be reported after the scan tasks iterable is closed +``` diff --git a/instrumentation/iceberg-1.8/library/build.gradle.kts b/instrumentation/iceberg-1.8/library/build.gradle.kts new file mode 100644 index 000000000000..739bb00d9f1a --- /dev/null +++ b/instrumentation/iceberg-1.8/library/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("otel.library-instrumentation") + id("otel.nullaway-conventions") +} + +dependencies { + library("org.apache.iceberg:iceberg-core:1.8.1") + testImplementation(project(":instrumentation:iceberg-1.8:testing")) +} + +otelJava { + minJavaVersionSupported.set(JavaVersion.VERSION_11) +} diff --git a/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergMetricsReporter.java b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergMetricsReporter.java new file mode 100644 index 000000000000..5075c5a491eb --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergMetricsReporter.java @@ -0,0 +1,194 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongGauge; +import java.util.Locale; +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.TimerResult; + +public class IcebergMetricsReporter implements MetricsReporter { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.iceberg-1.8"; + private static final AttributeKey SCHEMA_ID = AttributeKey.longKey("iceberg.schema.id"); + private static final AttributeKey TABLE_NAME = + AttributeKey.stringKey("iceberg.table.name"); + private static final AttributeKey SNAPHSOT_ID = AttributeKey.longKey("iceberg.snapshot.id"); + + private final OpenTelemetry openTelemetry; + + IcebergMetricsReporter(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + @Override + public void report(MetricsReport report) { + if (report instanceof ScanReport) { + reportScanMetrics((ScanReport) report); + } + } + + void reportScanMetrics(ScanReport scanReport) { + Attributes scanAttributes = + Attributes.of( + SCHEMA_ID, + Long.valueOf(scanReport.schemaId()), + TABLE_NAME, + scanReport.tableName(), + SNAPHSOT_ID, + scanReport.snapshotId()); + ScanMetricsResult metrics = scanReport.scanMetrics(); + TimerResult duration = metrics.totalPlanningDuration(); + + if (duration != null) { + LongGauge metric = + ScanMetricsBuilder.totalPlanningDuration( + openTelemetry.getMeter(INSTRUMENTATION_NAME), + duration.timeUnit().name().toLowerCase(Locale.getDefault())); + metric.set(duration.totalDuration().toMillis(), scanAttributes); + } + + CounterResult current = metrics.resultDataFiles(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.scannedDataFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.resultDeleteFiles(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.scannedDeleteFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.scannedDataManifests(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.scannedDataManifestsCount( + openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.scannedDeleteManifests(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.scannedDeleteManifestsCount( + openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.totalDataManifests(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.totalDataManifestsCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.totalDeleteManifests(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.totalDeleteManifestsCount( + openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.totalFileSizeInBytes(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.scannedDataFilesSize(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.totalDeleteFileSizeInBytes(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.scannedDeleteFilesSize(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.skippedDataManifests(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.skippedDataManifestsCount( + openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.skippedDeleteManifests(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.skippedDeleteManifestsCount( + openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.skippedDataFiles(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.skippedDataFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.skippedDeleteFiles(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.skippedDeleteFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.indexedDeleteFiles(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.indexedDeleteFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.equalityDeleteFiles(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.equalityDeleteFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.positionalDeleteFiles(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.positionDeleteFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + + current = metrics.dvs(); + + if (current != null) { + LongCounter metric = + ScanMetricsBuilder.deletionVectorFilesCount(openTelemetry.getMeter(INSTRUMENTATION_NAME)); + metric.add(current.value(), scanAttributes); + } + } +} diff --git a/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTelemetry.java b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTelemetry.java new file mode 100644 index 000000000000..41f43c3b9a79 --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTelemetry.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.OpenTelemetry; +import org.apache.iceberg.Scan; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; + +public class IcebergTelemetry { + private final OpenTelemetry openTelemetry; + + public static IcebergTelemetry create(OpenTelemetry openTelemetry) { + return new IcebergTelemetry(openTelemetry); + } + + IcebergTelemetry(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + public > T1 wrapScan( + Scan scan) { + return scan.metricsReporter(new IcebergMetricsReporter(openTelemetry)); + } +} diff --git a/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/ScanMetricsBuilder.java b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/ScanMetricsBuilder.java new file mode 100644 index 000000000000..c6ffb9544fd0 --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/ScanMetricsBuilder.java @@ -0,0 +1,174 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongGauge; +import io.opentelemetry.api.metrics.Meter; + +final class ScanMetricsBuilder { + private static final String ROOT = "iceberg.scan"; + private static final String TOTAL_PLANNING_DURATION = ROOT + ".planning.duration"; + private static final String RESULT_DATA_FILES = ROOT + ".scanned.data_files.count"; + private static final String RESULT_DELETE_FILES = ROOT + ".scanned.delete_files.count"; + private static final String SCANNED_DATA_MANIFESTS = ROOT + ".scanned.data_manifests.count"; + private static final String SCANNED_DELETE_MANIFESTS = ROOT + ".scanned.delete_manifests.count"; + private static final String TOTAL_DATA_MANIFESTS = ROOT + ".total.data_manifests.count"; + private static final String TOTAL_DELETE_MANIFESTS = ROOT + ".total.delete_manifests.count"; + private static final String TOTAL_FILE_SIZE_IN_BYTES = ROOT + ".scanned.data_files.size"; + private static final String TOTAL_DELETE_FILE_SIZE_IN_BYTES = ROOT + ".scanned.delete_files.size"; + private static final String SKIPPED_DATA_MANIFESTS = ROOT + ".skipped.data_manifests.count"; + private static final String SKIPPED_DELETE_MANIFESTS = ROOT + ".skipped.delete_manifests.count"; + private static final String SKIPPED_DATA_FILES = ROOT + ".skipped.data_files.count"; + private static final String SKIPPED_DELETE_FILES = ROOT + ".skipped.delete_files.count"; + private static final String INDEXED_DELETE_FILES = ROOT + ".scanned.indexed_delete_files.count"; + private static final String EQUALITY_DELETE_FILES = ROOT + ".scanned.equality_delete_files.count"; + private static final String POSITIONAL_DELETE_FILES = + ROOT + ".scanned.positional_delete_files.count"; + private static final String DVS = ROOT + ".scanned.dvs.count"; + + private ScanMetricsBuilder() { + // prevents instantiation + } + + static LongGauge totalPlanningDuration(Meter meter, String unit) { + return meter + .gaugeBuilder(TOTAL_PLANNING_DURATION) + .setDescription("The total duration needed to plan the scan.") + .setUnit(unit) + .ofLongs() + .build(); + } + + static LongCounter scannedDataFilesCount(Meter meter) { + return meter + .counterBuilder(RESULT_DATA_FILES) + .setDescription("The number of scanned data files.") + .setUnit("{file}") + .build(); + } + + static LongCounter scannedDeleteFilesCount(Meter meter) { + return meter + .counterBuilder(RESULT_DELETE_FILES) + .setDescription("The number of scanned delete files.") + .setUnit("{file}") + .build(); + } + + static LongCounter scannedDataManifestsCount(Meter meter) { + return meter + .counterBuilder(SCANNED_DATA_MANIFESTS) + .setDescription("The number of scanned data manifests.") + .setUnit("{file}") + .build(); + } + + static LongCounter scannedDeleteManifestsCount(Meter meter) { + return meter + .counterBuilder(SCANNED_DELETE_MANIFESTS) + .setDescription("The number of scanned delete manifests.") + .setUnit("{file}") + .build(); + } + + static LongCounter totalDataManifestsCount(Meter meter) { + return meter + .counterBuilder(TOTAL_DATA_MANIFESTS) + .setDescription("The number of all data manifests.") + .setUnit("{file}") + .build(); + } + + static LongCounter totalDeleteManifestsCount(Meter meter) { + return meter + .counterBuilder(TOTAL_DELETE_MANIFESTS) + .setDescription("The number of all delete manifests.") + .setUnit("{file}") + .build(); + } + + static LongCounter scannedDataFilesSize(Meter meter) { + return meter + .counterBuilder(TOTAL_FILE_SIZE_IN_BYTES) + .setDescription("The total size of all scanned data files.") + .setUnit("By") + .build(); + } + + static LongCounter scannedDeleteFilesSize(Meter meter) { + return meter + .counterBuilder(TOTAL_DELETE_FILE_SIZE_IN_BYTES) + .setDescription("The total size of all scanned delete files.") + .setUnit("By") + .build(); + } + + static LongCounter skippedDataManifestsCount(Meter meter) { + return meter + .counterBuilder(SKIPPED_DATA_MANIFESTS) + .setDescription("The number of data manifests that were skipped during the scan.") + .setUnit("{file}") + .build(); + } + + static LongCounter skippedDeleteManifestsCount(Meter meter) { + return meter + .counterBuilder(SKIPPED_DELETE_MANIFESTS) + .setDescription("The number of delete manifests that were skipped during the scan.") + .setUnit("{file}") + .build(); + } + + static LongCounter skippedDataFilesCount(Meter meter) { + return meter + .counterBuilder(SKIPPED_DATA_FILES) + .setDescription("The number of data files that were skipped during the scan.") + .setUnit("{file}") + .build(); + } + + static LongCounter skippedDeleteFilesCount(Meter meter) { + return meter + .counterBuilder(SKIPPED_DELETE_FILES) + .setDescription("The number of delete files that were skipped during the scan.") + .setUnit("{file}") + .build(); + } + + static LongCounter indexedDeleteFilesCount(Meter meter) { + return meter + .counterBuilder(INDEXED_DELETE_FILES) + .setDescription( + "The number of delete files constituting the delete file index for this scan.") + .setUnit("{file}") + .build(); + } + + static LongCounter equalityDeleteFilesCount(Meter meter) { + return meter + .counterBuilder(EQUALITY_DELETE_FILES) + .setDescription("The number of equality delete files relevant for the current scan.") + .setUnit("{file}") + .build(); + } + + static LongCounter positionDeleteFilesCount(Meter meter) { + return meter + .counterBuilder(POSITIONAL_DELETE_FILES) + .setDescription("The number of position delete files relevant for the current scan.") + .setUnit("{file}") + .build(); + } + + static LongCounter deletionVectorFilesCount(Meter meter) { + return meter + .counterBuilder(DVS) + .setDescription("The number of deletion vector (DV) files relevant for the current scan.") + .setUnit("{file}") + .build(); + } +} diff --git a/instrumentation/iceberg-1.8/library/src/test/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTest.java b/instrumentation/iceberg-1.8/library/src/test/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTest.java new file mode 100644 index 000000000000..177cb7f8a8a0 --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/test/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTest.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.apache.iceberg.TableScan; +import org.junit.jupiter.api.extension.RegisterExtension; + +class IcebergTest extends AbstractIcebergTest { + @RegisterExtension + final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected TableScan configure(TableScan tableScan) { + OpenTelemetry openTelemetry = testing.getOpenTelemetry(); + IcebergTelemetry icebergTelemetry = IcebergTelemetry.create(openTelemetry); + return icebergTelemetry.wrapScan(tableScan); + } +} diff --git a/instrumentation/iceberg-1.8/metadata.yaml b/instrumentation/iceberg-1.8/metadata.yaml new file mode 100644 index 000000000000..3000e45310d8 --- /dev/null +++ b/instrumentation/iceberg-1.8/metadata.yaml @@ -0,0 +1,2 @@ +description: This standalone instrumentation enables metrics for Apache Iceberg scans. +library_link: https://iceberg.apache.org/ diff --git a/instrumentation/iceberg-1.8/testing/build.gradle.kts b/instrumentation/iceberg-1.8/testing/build.gradle.kts new file mode 100644 index 000000000000..88527621ebb4 --- /dev/null +++ b/instrumentation/iceberg-1.8/testing/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + implementation("org.apache.iceberg:iceberg-core:1.8.1") + implementation("org.apache.iceberg:iceberg-core:1.8.1") { + artifact { + classifier = "tests" + } + } + api(project(":testing-common")) +} diff --git a/instrumentation/iceberg-1.8/testing/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/AbstractIcebergTest.java b/instrumentation/iceberg-1.8/testing/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/AbstractIcebergTest.java new file mode 100644 index 000000000000..9b506bd3daf6 --- /dev/null +++ b/instrumentation/iceberg-1.8/testing/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/AbstractIcebergTest.java @@ -0,0 +1,378 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.TestTables.TestTable; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.TimerResult; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.assertj.core.api.Condition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public abstract class AbstractIcebergTest { + protected static final int FORMAT_VERSION = 2; + protected static final Schema SCHEMA = + new Schema( + NestedField.required(3, "id", IntegerType.get()), + NestedField.required(4, "data", StringType.get())); + protected static final int BUCKETS_NUMBER = 16; + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(); + protected static final DataFile FILE_1 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10L) + .withPartitionPath("data_bucket=0") + .withRecordCount(1L) + .build(); + protected static final DataFile FILE_2 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10L) + .withPartitionPath("data_bucket=1") + .withRecordCount(1L) + .withSplitOffsets(Arrays.asList(1L)) + .build(); + + @TempDir protected File tableDir = null; + protected TestTable table; + + protected abstract InstrumentationExtension testing(); + + protected abstract TableScan configure(TableScan tableScan); + + @BeforeEach + void init() { + this.table = TestTables.create(this.tableDir, "test", SCHEMA, SPEC, FORMAT_VERSION); + this.table.newFastAppend().appendFile(FILE_1).appendFile(FILE_2).commit(); + } + + @Test + void testCreateTelemetry() throws IOException { + + SimpleReporter reporter = new SimpleReporter(); + TableScan scan = + table + .newScan() + .filter(Expressions.lessThan("id", 5)) + .select("id", "data") + .metricsReporter(reporter); + scan = configure(scan); + + try (CloseableIterable tasks = scan.planFiles()) { + assertNotNull(tasks); + assertNotNull(tasks.iterator()); + } + + assertNotNull(reporter.report); + assertTrue(reporter.report instanceof ScanReport); + ScanReport expected = (ScanReport) reporter.report; + CounterResult currentExpectedMetric = expected.scanMetrics().resultDataFiles(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.data_files.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.data_files.count"); + } + + currentExpectedMetric = expected.scanMetrics().resultDeleteFiles(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.delete_files.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.delete_files.count"); + } + + currentExpectedMetric = expected.scanMetrics().scannedDataManifests(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.data_manifests.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.data_manifests.count"); + } + + currentExpectedMetric = expected.scanMetrics().scannedDeleteManifests(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.delete_manifests.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.delete_manifests.count"); + } + + currentExpectedMetric = expected.scanMetrics().totalDataManifests(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.total.data_manifests.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.total.data_manifests.count"); + } + + currentExpectedMetric = expected.scanMetrics().totalDeleteManifests(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.total.delete_manifests.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.total.delete_manifests.count"); + } + + currentExpectedMetric = expected.scanMetrics().totalFileSizeInBytes(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.data_files.size", "By", expected, currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.data_files.size"); + } + + currentExpectedMetric = expected.scanMetrics().totalDeleteFileSizeInBytes(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.delete_files.size", "By", expected, currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.delete_files.size"); + } + + currentExpectedMetric = expected.scanMetrics().skippedDataManifests(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.skipped.data_manifests.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.skipped.data_manifests.count"); + } + + currentExpectedMetric = expected.scanMetrics().skippedDeleteManifests(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.skipped.delete_manifests.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.skipped.delete_manifests.count"); + } + + currentExpectedMetric = expected.scanMetrics().skippedDataFiles(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.skipped.data_files.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.skipped.data_files.count"); + } + + currentExpectedMetric = expected.scanMetrics().skippedDeleteFiles(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.skipped.delete_files.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.skipped.delete_files.count"); + } + + currentExpectedMetric = expected.scanMetrics().indexedDeleteFiles(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.indexed_delete_files.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.indexed_delete_files.count"); + } + + currentExpectedMetric = expected.scanMetrics().equalityDeleteFiles(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.equality_delete_files.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.equality_delete_files.count"); + } + + currentExpectedMetric = expected.scanMetrics().positionalDeleteFiles(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.positional_delete_files.count", + "{file}", + expected, + currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.positional_delete_files.count"); + } + + currentExpectedMetric = expected.scanMetrics().dvs(); + + if (currentExpectedMetric != null) { + assertIcebergCounterMetric( + "iceberg.scan.scanned.dvs.count", "{file}", expected, currentExpectedMetric.value()); + } else { + assertIcebergMetricNotReported("iceberg.scan.scanned.dvs.count"); + } + + TimerResult timer = expected.scanMetrics().totalPlanningDuration(); + + if (timer != null) { + assertIcebergGaugeMetric( + "iceberg.scan.planning.duration", + timer.timeUnit().name().toLowerCase(Locale.getDefault()), + expected, + timer.totalDuration().toMillis()); + } else { + assertIcebergMetricNotReported("iceberg.scan.planning.duration"); + } + } + + private void assertIcebergMetricNotReported(String otelMetricName) { + testing() + .waitAndAssertMetrics( + otelMetricName, + metricAssert -> + metricAssert.doesNotHave( + new Condition<>( + spanData -> otelMetricName.equals(spanData.getName()), + "metric is not reported"))); + } + + private void assertIcebergGaugeMetric( + String otelMetricName, String expectedUnit, ScanReport expectedReport, long expectedValue) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName(otelMetricName) + .hasUnit(expectedUnit) + .hasLongGaugeSatisfying( + sum -> + sum.hasPointsSatisfying( + longAssert -> + longAssert + .hasValue(expectedValue) + .hasAttributesSatisfying( + attributes -> + assertEquals( + Attributes.builder() + .put( + "iceberg.schema.id", + expectedReport.schemaId()) + .put( + "iceberg.table.name", + expectedReport.tableName()) + .put( + "iceberg.snapshot.id", + expectedReport.snapshotId()) + .build(), + attributes))))); + } + + private void assertIcebergCounterMetric( + String otelMetricName, String expectedUnit, ScanReport expectedReport, long expectedValue) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName(otelMetricName) + .hasUnit(expectedUnit) + .hasLongSumSatisfying( + sum -> + sum.hasPointsSatisfying( + longSumAssert -> + longSumAssert + .hasValue(expectedValue) + .hasAttributesSatisfying( + attributes -> + assertEquals( + Attributes.builder() + .put( + "iceberg.schema.id", + expectedReport.schemaId()) + .put( + "iceberg.table.name", + expectedReport.tableName()) + .put( + "iceberg.snapshot.id", + expectedReport.snapshotId()) + .build(), + attributes))))); + } + + static final class SimpleReporter implements MetricsReporter { + MetricsReport report; + + @Override + public void report(MetricsReport report) { + this.report = report; + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index d97ed912157c..63d5ac2b2045 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -303,6 +303,8 @@ include(":instrumentation:hikaricp-3.0:library") include(":instrumentation:hikaricp-3.0:testing") include(":instrumentation:http-url-connection:javaagent") include(":instrumentation:hystrix-1.4:javaagent") +include(":instrumentation:iceberg-1.8:library") +include(":instrumentation:iceberg-1.8:testing") include(":instrumentation:influxdb-2.4:javaagent") include(":instrumentation:internal:internal-application-logger:bootstrap") include(":instrumentation:internal:internal-application-logger:javaagent")