DBZ-2862 Add comments, minor modifications
This commit is contained in:
parent
565075a320
commit
bfc3db3021
@ -36,13 +36,28 @@ public class DebeziumTracingProducerInterceptor<K, V> implements ProducerInterce
|
||||
private Object interceptorInstance;
|
||||
private Method onSendMethod;
|
||||
|
||||
/**
|
||||
* The constructor for the DebeziumTracingProducerInterceptor.
|
||||
* <p>
|
||||
* In this interceptor, we use a dynamic approach to handle the OpenTelemetry tracing interceptor due to versioning issues.
|
||||
* The problem arises because different versions of OpenTelemetry have their tracing interceptor in different packages.
|
||||
* For example, in versions before 1.23.0, the tracing interceptor is in the "io.opentelemetry.instrumentation.kafkaclients" package,
|
||||
* but from version 1.23.0 onwards, it is in the "io.opentelemetry.instrumentation.kafkaclients.v2_6" package.
|
||||
* <p>
|
||||
* The OpenTelemetry interceptor is also part of an alpha package, which means it's subject to change,
|
||||
* and there is no guarantee of backward compatibility. That's why a dynamic approach is used here.
|
||||
* We maintain an array of possible class names (InterceptorVersion) for the OpenTelemetry tracing interceptor,
|
||||
* and we attempt to instantiate one of them at runtime. We also use reflection to access the 'onSend' method from the interceptor.
|
||||
* <p>
|
||||
* This allows the Debezium Kafka Connector to work with different versions of OpenTelemetry.
|
||||
*/
|
||||
public DebeziumTracingProducerInterceptor() {
|
||||
InterceptorVersion[] versions = {
|
||||
new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor"),
|
||||
new InterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor"),
|
||||
OpenTelemetryInterceptorVersion[] versions = {
|
||||
new OpenTelemetryInterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor"),
|
||||
new OpenTelemetryInterceptorVersion("io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor"),
|
||||
};
|
||||
|
||||
for (InterceptorVersion version : versions) {
|
||||
for (OpenTelemetryInterceptorVersion version : versions) {
|
||||
interceptorInstance = version.createInstance();
|
||||
if (interceptorInstance != null) {
|
||||
onSendMethod = version.getMethod("onSend", ProducerRecord.class);
|
||||
@ -52,7 +67,7 @@ public DebeziumTracingProducerInterceptor() {
|
||||
}
|
||||
}
|
||||
|
||||
if (interceptorInstance == null || onSendMethod == null) {
|
||||
if (onSendMethod == null) {
|
||||
LOGGER.error("Unable to instantiate any known version of the interceptor");
|
||||
throw new IllegalStateException("Unable to instantiate interceptor");
|
||||
}
|
||||
|
@ -10,13 +10,25 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class InterceptorVersion {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorVersion.class);
|
||||
/**
|
||||
* This class represents a specific version of the OpenTelemetry interceptor by its class name.
|
||||
* <p>
|
||||
* An instance of this class attempts to dynamically load the OpenTelemetry interceptor class from the provided class name.
|
||||
* If the class is present in the classpath (meaning that the relevant version of OpenTelemetry is being used),
|
||||
* it gets loaded and methods can be retrieved from it for further invocation.
|
||||
* If the class is not present, all operations gracefully degrade to return null, indicating that the version is not available.
|
||||
* <p>
|
||||
* This dynamic approach allows the Debezium Kafka Connector to interact with different versions of the OpenTelemetry interceptor,
|
||||
* without having a direct compile-time dependency. It provides a level of abstraction over the changes in the OpenTelemetry interceptor's
|
||||
* class name and package between different versions of OpenTelemetry.
|
||||
*/
|
||||
public class OpenTelemetryInterceptorVersion {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(OpenTelemetryInterceptorVersion.class);
|
||||
|
||||
private final String className;
|
||||
private Class<?> interceptorClass;
|
||||
|
||||
public InterceptorVersion(String className) {
|
||||
public OpenTelemetryInterceptorVersion(String className) {
|
||||
this.className = className;
|
||||
try {
|
||||
this.interceptorClass = Class.forName(className);
|
||||
@ -37,7 +49,7 @@ public Object createInstance() {
|
||||
return interceptorClass.getDeclaredConstructor().newInstance();
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.error("Unable to instantiate {}", className, e);
|
||||
LOGGER.debug("Unable to instantiate {}", className, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -50,7 +62,7 @@ public Method getMethod(String name, Class<?>... parameterTypes) {
|
||||
return interceptorClass.getMethod(name, parameterTypes);
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOGGER.error("Unable to get method {} from {}", name, className, e);
|
||||
LOGGER.debug("Unable to get method {} from {}", name, className, e);
|
||||
return null;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user