From 565075a320ef1da061a874f1ef632c0977bfdd6d Mon Sep 17 00:00:00 2001 From: david remy Date: Mon, 24 Jul 2023 14:08:29 +0200 Subject: [PATCH] DBZ-2862 Improved OpenTelemetry Handling in Debezium - Implemented a dynamic solution for handling version changes in OpenTelemetry, using Java introspection to determine the correct interceptor class at runtime. - Utilized a custom "ProducerInterceptor" to delegate logic based on the presence of specific OpenTelemetry classes. - Removed direct usage of OpenTelemetry's internal API 'ConfigUtils'. - Aligned solution with Debezium's aim of ensuring compatibility with Strimzi. This update ensures a more robust handling of OpenTelemetry's version transitions. --- debezium-bom/pom.xml | 6 +- debezium-core/pom.xml | 4 +- .../transforms/tracing/TracingSpanUtil.java | 21 ------ debezium-interceptor/pom.xml | 4 +- .../DebeziumTracingProducerInterceptor.java | 68 +++++++++++++++---- .../debezium/tracing/InterceptorVersion.java | 57 ++++++++++++++++ 6 files changed, 118 insertions(+), 42 deletions(-) create mode 100644 debezium-interceptor/src/main/java/io/debezium/tracing/InterceptorVersion.java diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml index 859fa2657..a9680cea6 100644 --- a/debezium-bom/pom.xml +++ b/debezium-bom/pom.xml @@ -29,7 +29,7 @@ 1.56.1 - 1.23.0-alpha + 1.23.0 4.8.1 @@ -284,8 +284,8 @@ - io.opentelemetry.instrumentation - opentelemetry-kafka-clients-2.6 + io.opentelemetry + opentelemetry-api ${version.opentelemetry} diff --git a/debezium-core/pom.xml b/debezium-core/pom.xml index 5ea650398..7549633b2 100644 --- a/debezium-core/pom.xml +++ b/debezium-core/pom.xml @@ -50,8 +50,8 @@ - io.opentelemetry.instrumentation - opentelemetry-kafka-clients-2.6 + io.opentelemetry + opentelemetry-api provided diff --git a/debezium-core/src/main/java/io/debezium/transforms/tracing/TracingSpanUtil.java b/debezium-core/src/main/java/io/debezium/transforms/tracing/TracingSpanUtil.java index ab4036ab9..97db77bc4 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/tracing/TracingSpanUtil.java +++ b/debezium-core/src/main/java/io/debezium/transforms/tracing/TracingSpanUtil.java @@ -17,7 +17,6 @@ import io.debezium.data.Envelope; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.internal.ConfigUtil; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanKind; @@ -32,10 +31,6 @@ public class TracingSpanUtil { private static final String DB_FIELDS_PREFIX = "db."; private static final String TX_LOG_WRITE_OPERATION_NAME = "db-log-write"; - private static final String ARG_OTEL_JAVAAGENT_ENABLED = "otel.javaagent.enabled"; - private static final String ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED = "otel.instrumentation.opentelemetry-api.enabled"; - private static final boolean OPEN_TELEMETRY_JAVAAGENT_ENABLE = isOpenTelemetryJavaagentEnable(); - private static final boolean OPEN_TELEMETRY_API_ENABLE = isOpenTelemetryApiEnable(); private static final String TRACING_COMPONENT = TracingSpanUtil.class.getName(); private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); private static final Tracer tracer = openTelemetry.getTracer(TRACING_COMPONENT); @@ -45,14 +40,6 @@ private TracingSpanUtil() { } public static > R traceRecord(R connectRecord, Struct envelope, Struct source, String propagatedSpanContext, String operationName) { - if (!OPEN_TELEMETRY_JAVAAGENT_ENABLE && LOGGER.isDebugEnabled()) { - LOGGER.debug("OpenTelemetry javaagent is disabled. To enable, run your JVM with -D{}=true\"", - ARG_OTEL_JAVAAGENT_ENABLED); - } - if (!OPEN_TELEMETRY_API_ENABLE && LOGGER.isDebugEnabled()) { - LOGGER.debug("OpenTelemetry API is disabled. To enable, run your JVM with -D{}=true\"", - ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED); - } if (propagatedSpanContext != null) { @@ -126,12 +113,4 @@ else if ("name".equals(field)) { span.setAttribute(targetFieldName, fieldValue.toString()); } } - - private static boolean isOpenTelemetryJavaagentEnable() { - return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_JAVAAGENT_ENABLED, "true")); - } - - private static boolean isOpenTelemetryApiEnable() { - return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED, "true")); - } } diff --git a/debezium-interceptor/pom.xml b/debezium-interceptor/pom.xml index a11cda8a8..226bf9d2a 100644 --- a/debezium-interceptor/pom.xml +++ b/debezium-interceptor/pom.xml @@ -23,8 +23,8 @@ provided - io.opentelemetry.instrumentation - opentelemetry-kafka-clients-2.6 + io.opentelemetry + opentelemetry-api provided diff --git a/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java b/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java index 039f1e660..446d348f1 100644 --- a/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java +++ b/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java @@ -5,13 +5,18 @@ */ package io.debezium.tracing; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.internal.ConfigUtil; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; @@ -19,26 +24,44 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor; -public class DebeziumTracingProducerInterceptor extends TracingProducerInterceptor { +public class DebeziumTracingProducerInterceptor implements ProducerInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumTracingProducerInterceptor.class); - public static final String ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED = "otel.instrumentation.kafka.enabled"; private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); private static final Tracer tracer = openTelemetry.getTracer(DebeziumTracingProducerInterceptor.class.getName()); private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator(); private static final TextMapGetter> GETTER = KafkaProducerRecordGetter.INSTANCE; - @Override - public ProducerRecord onSend(ProducerRecord producerRecord) { - if (isInstrumentationKafkaEnabled()) { - LOGGER.warn( - "To enable end-to-end traceability with Debezium you need to disable automatic Kafka instrumentation. To disable, run your JVM with -D{}=false\"", - ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED); - return producerRecord; + private Object interceptorInstance; + private Method onSendMethod; + + public DebeziumTracingProducerInterceptor() { + InterceptorVersion[] versions = { + new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor"), + new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor"), + }; + + for (InterceptorVersion version : versions) { + interceptorInstance = version.createInstance(); + if (interceptorInstance != null) { + onSendMethod = version.getMethod("onSend", ProducerRecord.class); + if (onSendMethod != null) { + break; + } + } } + if (interceptorInstance == null || onSendMethod == null) { + LOGGER.error("Unable to instantiate any known version of the interceptor"); + throw new IllegalStateException("Unable to instantiate interceptor"); + } + } + + @Override + @SuppressWarnings("unchecked") + public ProducerRecord onSend(ProducerRecord producerRecord) { + Context parentContext = TEXT_MAP_PROPAGATOR.extract(Context.current(), producerRecord, GETTER); Span interceptorSpan = tracer.spanBuilder("onSend") .setSpanKind(SpanKind.INTERNAL) @@ -46,14 +69,31 @@ public ProducerRecord onSend(ProducerRecord producerRecord) { .startSpan(); try (Scope ignored = interceptorSpan.makeCurrent()) { - return super.onSend(producerRecord); + try { + return (ProducerRecord) onSendMethod.invoke(interceptorInstance, producerRecord); + } + catch (IllegalAccessException | InvocationTargetException e) { + LOGGER.error("Error invoking onSend method", e); + throw new RuntimeException("Error invoking onSend method", e); + } } finally { interceptorSpan.end(); } } - private static boolean isInstrumentationKafkaEnabled() { - return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED, "true")); + @Override + public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { + + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + } } diff --git a/debezium-interceptor/src/main/java/io/debezium/tracing/InterceptorVersion.java b/debezium-interceptor/src/main/java/io/debezium/tracing/InterceptorVersion.java new file mode 100644 index 000000000..677d45eeb --- /dev/null +++ b/debezium-interceptor/src/main/java/io/debezium/tracing/InterceptorVersion.java @@ -0,0 +1,57 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.tracing; + +import java.lang.reflect.Method; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InterceptorVersion { + private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorVersion.class); + + private final String className; + private Class interceptorClass; + + public InterceptorVersion(String className) { + this.className = className; + try { + this.interceptorClass = Class.forName(className); + LOGGER.debug("Class {} found", className); + + } + catch (ClassNotFoundException e) { + LOGGER.debug("Class {} not found", className); + this.interceptorClass = null; + } + } + + public Object createInstance() { + if (interceptorClass == null) { + return null; + } + try { + return interceptorClass.getDeclaredConstructor().newInstance(); + } + catch (Exception e) { + LOGGER.error("Unable to instantiate {}", className, e); + return null; + } + } + + public Method getMethod(String name, Class... parameterTypes) { + if (interceptorClass == null) { + return null; + } + try { + return interceptorClass.getMethod(name, parameterTypes); + } + catch (Exception e) { + LOGGER.error("Unable to get method {} from {}", name, className, e); + return null; + } + } +}