DBZ-2862 Improved OpenTelemetry Handling in Debezium

- Implemented a dynamic solution for handling version changes in OpenTelemetry, using Java introspection to determine the correct interceptor class at runtime.
- Utilized a custom "ProducerInterceptor" to delegate logic based on the presence of specific OpenTelemetry classes.
- Removed direct usage of OpenTelemetry's internal API 'ConfigUtils'.
- Aligned solution with Debezium's aim of ensuring compatibility with Strimzi.

This update ensures a more robust handling of OpenTelemetry's version transitions.
This commit is contained in:
david remy 2023-07-24 14:08:29 +02:00 committed by Vojtech Juranek
parent 8a127e0173
commit 565075a320
6 changed files with 118 additions and 42 deletions

View File

@ -29,7 +29,7 @@
<version.grpc>1.56.1</version.grpc>
<!-- Tracing -->
<version.opentelemetry>1.23.0-alpha</version.opentelemetry>
<version.opentelemetry>1.23.0</version.opentelemetry>
<!-- HTTP client -->
<version.okhttp>4.8.1</version.okhttp>
@ -284,8 +284,8 @@
<!-- OpenTelemetry Integration -->
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${version.opentelemetry}</version>
</dependency>

View File

@ -50,8 +50,8 @@
<!-- OpenTelemetry Integration -->
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>

View File

@ -17,7 +17,6 @@
import io.debezium.data.Envelope;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.internal.ConfigUtil;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
@ -32,10 +31,6 @@ public class TracingSpanUtil {
private static final String DB_FIELDS_PREFIX = "db.";
private static final String TX_LOG_WRITE_OPERATION_NAME = "db-log-write";
private static final String ARG_OTEL_JAVAAGENT_ENABLED = "otel.javaagent.enabled";
private static final String ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED = "otel.instrumentation.opentelemetry-api.enabled";
private static final boolean OPEN_TELEMETRY_JAVAAGENT_ENABLE = isOpenTelemetryJavaagentEnable();
private static final boolean OPEN_TELEMETRY_API_ENABLE = isOpenTelemetryApiEnable();
private static final String TRACING_COMPONENT = TracingSpanUtil.class.getName();
private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
private static final Tracer tracer = openTelemetry.getTracer(TRACING_COMPONENT);
@ -45,14 +40,6 @@ private TracingSpanUtil() {
}
public static <R extends ConnectRecord<R>> R traceRecord(R connectRecord, Struct envelope, Struct source, String propagatedSpanContext, String operationName) {
if (!OPEN_TELEMETRY_JAVAAGENT_ENABLE && LOGGER.isDebugEnabled()) {
LOGGER.debug("OpenTelemetry javaagent is disabled. To enable, run your JVM with -D{}=true\"",
ARG_OTEL_JAVAAGENT_ENABLED);
}
if (!OPEN_TELEMETRY_API_ENABLE && LOGGER.isDebugEnabled()) {
LOGGER.debug("OpenTelemetry API is disabled. To enable, run your JVM with -D{}=true\"",
ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED);
}
if (propagatedSpanContext != null) {
@ -126,12 +113,4 @@ else if ("name".equals(field)) {
span.setAttribute(targetFieldName, fieldValue.toString());
}
}
private static boolean isOpenTelemetryJavaagentEnable() {
return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_JAVAAGENT_ENABLED, "true"));
}
private static boolean isOpenTelemetryApiEnable() {
return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED, "true"));
}
}

View File

@ -23,8 +23,8 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

View File

@ -5,13 +5,18 @@
*/
package io.debezium.tracing;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.internal.ConfigUtil;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
@ -19,25 +24,43 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
public class DebeziumTracingProducerInterceptor<K, V> extends TracingProducerInterceptor<K, V> {
public class DebeziumTracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumTracingProducerInterceptor.class);
public static final String ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED = "otel.instrumentation.kafka.enabled";
private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
private static final Tracer tracer = openTelemetry.getTracer(DebeziumTracingProducerInterceptor.class.getName());
private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator();
private static final TextMapGetter<ProducerRecord<?, ?>> GETTER = KafkaProducerRecordGetter.INSTANCE;
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
if (isInstrumentationKafkaEnabled()) {
LOGGER.warn(
"To enable end-to-end traceability with Debezium you need to disable automatic Kafka instrumentation. To disable, run your JVM with -D{}=false\"",
ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED);
return producerRecord;
private Object interceptorInstance;
private Method onSendMethod;
public DebeziumTracingProducerInterceptor() {
InterceptorVersion[] versions = {
new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor"),
new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor"),
};
for (InterceptorVersion version : versions) {
interceptorInstance = version.createInstance();
if (interceptorInstance != null) {
onSendMethod = version.getMethod("onSend", ProducerRecord.class);
if (onSendMethod != null) {
break;
}
}
}
if (interceptorInstance == null || onSendMethod == null) {
LOGGER.error("Unable to instantiate any known version of the interceptor");
throw new IllegalStateException("Unable to instantiate interceptor");
}
}
@Override
@SuppressWarnings("unchecked")
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
Context parentContext = TEXT_MAP_PROPAGATOR.extract(Context.current(), producerRecord, GETTER);
Span interceptorSpan = tracer.spanBuilder("onSend")
@ -46,14 +69,31 @@ public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
.startSpan();
try (Scope ignored = interceptorSpan.makeCurrent()) {
return super.onSend(producerRecord);
try {
return (ProducerRecord<K, V>) onSendMethod.invoke(interceptorInstance, producerRecord);
}
catch (IllegalAccessException | InvocationTargetException e) {
LOGGER.error("Error invoking onSend method", e);
throw new RuntimeException("Error invoking onSend method", e);
}
}
finally {
interceptorSpan.end();
}
}
private static boolean isInstrumentationKafkaEnabled() {
return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED, "true"));
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.tracing;
import java.lang.reflect.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InterceptorVersion {
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorVersion.class);
private final String className;
private Class<?> interceptorClass;
public InterceptorVersion(String className) {
this.className = className;
try {
this.interceptorClass = Class.forName(className);
LOGGER.debug("Class {} found", className);
}
catch (ClassNotFoundException e) {
LOGGER.debug("Class {} not found", className);
this.interceptorClass = null;
}
}
public Object createInstance() {
if (interceptorClass == null) {
return null;
}
try {
return interceptorClass.getDeclaredConstructor().newInstance();
}
catch (Exception e) {
LOGGER.error("Unable to instantiate {}", className, e);
return null;
}
}
public Method getMethod(String name, Class<?>... parameterTypes) {
if (interceptorClass == null) {
return null;
}
try {
return interceptorClass.getMethod(name, parameterTypes);
}
catch (Exception e) {
LOGGER.error("Unable to get method {} from {}", name, className, e);
return null;
}
}
}