From bfc3db3021460adfb7b01b310e3b67632d15df3c Mon Sep 17 00:00:00 2001 From: REMY David Date: Mon, 31 Jul 2023 16:11:32 +0200 Subject: [PATCH] DBZ-2862 Add comments, minor modifications --- .../DebeziumTracingProducerInterceptor.java | 25 +++++++++++++++---- ...a => OpenTelemetryInterceptorVersion.java} | 22 ++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) rename debezium-interceptor/src/main/java/io/debezium/tracing/{InterceptorVersion.java => OpenTelemetryInterceptorVersion.java} (53%) diff --git a/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java b/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java index 446d348f1..be611d0b8 100644 --- a/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java +++ b/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java @@ -36,13 +36,28 @@ public class DebeziumTracingProducerInterceptor implements ProducerInterce private Object interceptorInstance; private Method onSendMethod; + /** + * The constructor for the DebeziumTracingProducerInterceptor. + *

+ * In this interceptor, we use a dynamic approach to handle the OpenTelemetry tracing interceptor due to versioning issues. + * The problem arises because different versions of OpenTelemetry have their tracing interceptor in different packages. + * For example, in versions before 1.23.0, the tracing interceptor is in the "io.opentelemetry.instrumentation.kafkaclients" package, + * but from version 1.23.0 onwards, it is in the "io.opentelemetry.instrumentation.kafkaclients.v2_6" package. + *

+ * The OpenTelemetry interceptor is also part of an alpha package, which means it's subject to change, + * and there is no guarantee of backward compatibility. That's why a dynamic approach is used here. + * We maintain an array of possible class names (InterceptorVersion) for the OpenTelemetry tracing interceptor, + * and we attempt to instantiate one of them at runtime. We also use reflection to access the 'onSend' method from the interceptor. + *

+ * This allows the Debezium Kafka Connector to work with different versions of OpenTelemetry. + */ public DebeziumTracingProducerInterceptor() { - InterceptorVersion[] versions = { - new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor"), - new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor"), + OpenTelemetryInterceptorVersion[] versions = { + new OpenTelemetryInterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor"), + new OpenTelemetryInterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor"), }; - for (InterceptorVersion version : versions) { + for (OpenTelemetryInterceptorVersion version : versions) { interceptorInstance = version.createInstance(); if (interceptorInstance != null) { onSendMethod = version.getMethod("onSend", ProducerRecord.class); @@ -52,7 +67,7 @@ public DebeziumTracingProducerInterceptor() { } } - if (interceptorInstance == null || onSendMethod == null) { + if (onSendMethod == null) { LOGGER.error("Unable to instantiate any known version of the interceptor"); throw new IllegalStateException("Unable to instantiate interceptor"); } diff --git a/debezium-interceptor/src/main/java/io/debezium/tracing/InterceptorVersion.java b/debezium-interceptor/src/main/java/io/debezium/tracing/OpenTelemetryInterceptorVersion.java similarity index 53% rename from debezium-interceptor/src/main/java/io/debezium/tracing/InterceptorVersion.java rename to debezium-interceptor/src/main/java/io/debezium/tracing/OpenTelemetryInterceptorVersion.java index 677d45eeb..36c0679d1 100644 --- a/debezium-interceptor/src/main/java/io/debezium/tracing/InterceptorVersion.java +++ b/debezium-interceptor/src/main/java/io/debezium/tracing/OpenTelemetryInterceptorVersion.java @@ -10,13 +10,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class InterceptorVersion { - private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorVersion.class); +/** + * This class represents a specific version of the OpenTelemetry interceptor by its class name. + *

+ * An instance of this class attempts to dynamically load the OpenTelemetry interceptor class from the provided class name. + * If the class is present in the classpath (meaning that the relevant version of OpenTelemetry is being used), + * it gets loaded and methods can be retrieved from it for further invocation. + * If the class is not present, all operations gracefully degrade to return null, indicating that the version is not available. + *

+ * This dynamic approach allows the Debezium Kafka Connector to interact with different versions of the OpenTelemetry interceptor, + * without having a direct compile-time dependency. It provides a level of abstraction over the changes in the OpenTelemetry interceptor's + * class name and package between different versions of OpenTelemetry. + */ +public class OpenTelemetryInterceptorVersion { + private static final Logger LOGGER = LoggerFactory.getLogger(OpenTelemetryInterceptorVersion.class); private final String className; private Class interceptorClass; - public InterceptorVersion(String className) { + public OpenTelemetryInterceptorVersion(String className) { this.className = className; try { this.interceptorClass = Class.forName(className); @@ -37,7 +49,7 @@ public Object createInstance() { return interceptorClass.getDeclaredConstructor().newInstance(); } catch (Exception e) { - LOGGER.error("Unable to instantiate {}", className, e); + LOGGER.debug("Unable to instantiate {}", className, e); return null; } } @@ -50,7 +62,7 @@ public Method getMethod(String name, Class... parameterTypes) { return interceptorClass.getMethod(name, parameterTypes); } catch (Exception e) { - LOGGER.error("Unable to get method {} from {}", name, className, e); + LOGGER.debug("Unable to get method {} from {}", name, className, e); return null; } }