diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml
index 5c8bd8978..fd1821a59 100644
--- a/debezium-bom/pom.xml
+++ b/debezium-bom/pom.xml
@@ -29,8 +29,7 @@
1.56.1
- 1.21.0
- 1.22.1-alpha
+ 1.23.0-alpha
4.8.1
diff --git a/debezium-core/pom.xml b/debezium-core/pom.xml
index 1e1f9efc7..620975347 100644
--- a/debezium-core/pom.xml
+++ b/debezium-core/pom.xml
@@ -52,6 +52,7 @@
io.opentelemetry.instrumentation
opentelemetry-kafka-clients-2.6
+ provided
diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java
index 60fa2c690..b03b13fec 100644
--- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java
+++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java
@@ -112,7 +112,7 @@ public R apply(R r, RecordConverter recordConverter) {
r = recordConverter.convert(r);
- if (ActivateTracingSpan.isOpenTracingAvailable()) {
+ if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
tracingSmt.apply(r);
}
@@ -312,13 +312,13 @@ public ConfigDef config() {
}
public void close() {
- if (ActivateTracingSpan.isOpenTracingAvailable()) {
+ if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
tracingSmt.close();
}
}
public void configure(Map configMap) {
- if (ActivateTracingSpan.isOpenTracingAvailable()) {
+ if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
tracingSmt.configure(configMap);
if (!configMap.containsKey(ActivateTracingSpan.TRACING_CONTEXT_FIELD_REQUIRED.name())) {
tracingSmt.setRequireContextField(true);
diff --git a/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java b/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java
index b983b0768..354a1c4bf 100644
--- a/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java
+++ b/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java
@@ -6,15 +6,11 @@
package io.debezium.transforms.tracing;
import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,21 +18,10 @@
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
-import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.transforms.SmtManager;
import io.opentelemetry.api.GlobalOpenTelemetry;
-import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanBuilder;
-import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.api.trace.Tracer;
-import io.opentelemetry.context.Context;
-import io.opentelemetry.context.Scope;
-import io.opentelemetry.context.propagation.TextMapGetter;
-import io.opentelemetry.context.propagation.TextMapPropagator;
-import io.opentelemetry.context.propagation.TextMapSetter;
/**
* This SMT enables integration with a tracing system.
@@ -53,23 +38,12 @@
*/
public class ActivateTracingSpan> implements Transformation {
- private static final String DB_FIELDS_PREFIX = "db.";
-
private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpan.class);
private static final String DEFAULT_TRACING_SPAN_CONTEXT_FIELD = "tracingspancontext";
private static final String DEFAULT_TRACING_OPERATION_NAME = "debezium-read";
- private static final String TRACING_COMPONENT = ActivateTracingSpan.class.getName();
- private static final String TX_LOG_WRITE_OPERATION_NAME = "db-log-write";
-
- private static final boolean OPEN_TRACING_AVAILABLE = resolveOpenTracingApiAvailable();
-
- private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
- private static final Tracer tracer = openTelemetry.getTracer(TRACING_COMPONENT);
- private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator();
- private static final TextMapSetter SETTER = KafkaConnectHeadersSetter.INSTANCE;
- private static final TextMapGetter GETTER = PropertiesGetter.INSTANCE;
+ private static final boolean OPEN_TELEMETRY_AVAILABLE = resolveOpenTelemetryApiAvailable();
public static final Field TRACING_SPAN_CONTEXT_FIELD = Field.create("tracing.span.context.field")
.withDisplayName("Serialized tracing span context field")
@@ -145,67 +119,13 @@ public R apply(R connectRecord) {
}
try {
- return traceRecord(connectRecord, envelope, source, propagatedSpanContext);
+ return ActivateTracingSpanDelegate.Builder.build().traceRecord(connectRecord, envelope, source, propagatedSpanContext, operationName);
}
catch (NoClassDefFoundError e) {
throw new DebeziumException("Failed to record tracing information, tracing libraries not available", e);
}
}
- private R traceRecord(R connectRecord, Struct envelope, Struct source, String propagatedSpanContext) {
- if (propagatedSpanContext != null) {
-
- Properties props = PropertiesGetter.extract(propagatedSpanContext);
-
- Context parentSpanContext = openTelemetry.getPropagators().getTextMapPropagator()
- .extract(Context.current(), props, GETTER);
-
- SpanBuilder txLogSpanBuilder = tracer.spanBuilder(TX_LOG_WRITE_OPERATION_NAME)
- .setSpanKind(SpanKind.INTERNAL)
- .setParent(parentSpanContext);
-
- if (source != null) {
- Long eventTimestamp = source.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
- if (Objects.nonNull(eventTimestamp)) {
- txLogSpanBuilder.setStartTimestamp(eventTimestamp, TimeUnit.MILLISECONDS);
- }
- }
-
- Span txLogSpan = txLogSpanBuilder.startSpan();
-
- try (Scope ignored = txLogSpan.makeCurrent()) {
- if (source != null) {
- for (org.apache.kafka.connect.data.Field field : source.schema().fields()) {
- addFieldToSpan(txLogSpan, source, field.name(), DB_FIELDS_PREFIX);
- }
- }
- debeziumSpan(envelope);
- TEXT_MAP_PROPAGATOR.inject(Context.current(), connectRecord.headers(), SETTER);
-
- }
- finally {
- txLogSpan.end();
- }
- }
-
- return connectRecord;
- }
-
- private void debeziumSpan(Struct envelope) {
- final Long processingTimestamp = envelope.getInt64(Envelope.FieldName.TIMESTAMP);
- Span debeziumSpan = tracer.spanBuilder(operationName)
- .setStartTimestamp(processingTimestamp, TimeUnit.MILLISECONDS)
- .startSpan();
-
- try (Scope ignored = debeziumSpan.makeCurrent()) {
- addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.OPERATION, "");
- addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.TIMESTAMP, "");
- }
- finally {
- debeziumSpan.end();
- }
- }
-
@Override
public void close() {
}
@@ -222,30 +142,11 @@ public ConfigDef config() {
return config;
}
- private void addFieldToSpan(Span span, Struct struct, String field, String prefix) {
- final Object fieldValue = struct.get(field);
- if (fieldValue != null) {
- String targetFieldName = prefix + field;
- if (DB_FIELDS_PREFIX.equals(prefix)) {
- if ("db".equals(field)) {
- targetFieldName = prefix + "instance";
- }
- else if ("connector".equals(field)) {
- targetFieldName = prefix + "type";
- }
- else if ("name".equals(field)) {
- targetFieldName = prefix + "cdc-name";
- }
- }
- span.setAttribute(targetFieldName, fieldValue.toString());
- }
+ public static boolean isOpenTelemetryAvailable() {
+ return OPEN_TELEMETRY_AVAILABLE;
}
- public static boolean isOpenTracingAvailable() {
- return OPEN_TRACING_AVAILABLE;
- }
-
- private static boolean resolveOpenTracingApiAvailable() {
+ private static boolean resolveOpenTelemetryApiAvailable() {
try {
GlobalOpenTelemetry.get();
return true;
diff --git a/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpanDelegate.java b/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpanDelegate.java
new file mode 100644
index 000000000..09ea33e7f
--- /dev/null
+++ b/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpanDelegate.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.transforms.tracing;
+
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Headers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.internal.ConfigUtil;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.context.propagation.TextMapSetter;
+
+public class ActivateTracingSpanDelegate {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpanDelegate.class);
+
+ private static final String DB_FIELDS_PREFIX = "db.";
+ private static final String TX_LOG_WRITE_OPERATION_NAME = "db-log-write";
+ public static final String ARG_OTEL_JAVAAGENT_ENABLED = "otel.javaagent.enabled";
+ public static final String ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED = "otel.instrumentation.opentelemetry-api.enabled";
+
+ private static final String TRACING_COMPONENT = ActivateTracingSpanDelegate.class.getName();
+ private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
+ private static final Tracer tracer = openTelemetry.getTracer(TRACING_COMPONENT);
+ private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator();
+ private static final TextMapSetter SETTER = KafkaConnectHeadersSetter.INSTANCE;
+ private static final TextMapGetter GETTER = PropertiesGetter.INSTANCE;
+
+ private ActivateTracingSpanDelegate() {
+
+ }
+
+ public > R traceRecord(R connectRecord, Struct envelope, Struct source, String propagatedSpanContext, String operationName) {
+ if (!is0penTelemetryJavaagentEnable()) {
+ LOGGER.debug(
+ "OpenTelemetry javaagent is disabled. To enable, run your JVM with -D{}=true\"",
+ ARG_OTEL_JAVAAGENT_ENABLED);
+ }
+ if (!isOpenTelemetryApiEnable()) {
+ LOGGER.debug(
+ "OpenTelemetry API is disabled. To enable, run your JVM with -D{}=true\"",
+ ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED);
+ }
+ if (propagatedSpanContext != null) {
+
+ Properties props = PropertiesGetter.extract(propagatedSpanContext);
+
+ Context parentSpanContext = openTelemetry.getPropagators().getTextMapPropagator()
+ .extract(Context.current(), props, GETTER);
+
+ SpanBuilder txLogSpanBuilder = tracer.spanBuilder(TX_LOG_WRITE_OPERATION_NAME)
+ .setSpanKind(SpanKind.INTERNAL)
+ .setParent(parentSpanContext);
+
+ if (source != null) {
+ Long eventTimestamp = source.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
+ if (Objects.nonNull(eventTimestamp)) {
+ txLogSpanBuilder.setStartTimestamp(eventTimestamp, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ Span txLogSpan = txLogSpanBuilder.startSpan();
+
+ try (Scope ignored = txLogSpan.makeCurrent()) {
+ if (source != null) {
+ for (org.apache.kafka.connect.data.Field field : source.schema().fields()) {
+ addFieldToSpan(txLogSpan, source, field.name(), DB_FIELDS_PREFIX);
+ }
+ }
+ debeziumSpan(envelope, operationName);
+
+ TEXT_MAP_PROPAGATOR.inject(Context.current(), connectRecord.headers(), SETTER);
+ }
+ finally {
+ txLogSpan.end();
+ }
+ }
+
+ return connectRecord;
+ }
+
+ private void debeziumSpan(Struct envelope, String operationName) {
+ final Long processingTimestamp = envelope.getInt64(Envelope.FieldName.TIMESTAMP);
+ Span debeziumSpan = ActivateTracingSpanDelegate.tracer.spanBuilder(operationName)
+ .setStartTimestamp(processingTimestamp, TimeUnit.MILLISECONDS)
+ .startSpan();
+
+ try (Scope ignored = debeziumSpan.makeCurrent()) {
+ addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.OPERATION, "");
+ addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.TIMESTAMP, "");
+ }
+ finally {
+ debeziumSpan.end();
+ }
+ }
+
+ private void addFieldToSpan(Span span, Struct struct, String field, String prefix) {
+ final Object fieldValue = struct.get(field);
+ if (fieldValue != null) {
+ String targetFieldName = prefix + field;
+ if (DB_FIELDS_PREFIX.equals(prefix)) {
+ if ("db".equals(field)) {
+ targetFieldName = prefix + "instance";
+ }
+ else if ("connector".equals(field)) {
+ targetFieldName = prefix + "type";
+ }
+ else if ("name".equals(field)) {
+ targetFieldName = prefix + "cdc-name";
+ }
+ }
+ span.setAttribute(targetFieldName, fieldValue.toString());
+ }
+ }
+
+ public static class Builder {
+
+ private Builder() {
+
+ }
+
+ public static ActivateTracingSpanDelegate build() {
+ return new ActivateTracingSpanDelegate();
+ }
+ }
+
+ private static boolean is0penTelemetryJavaagentEnable() {
+ return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_JAVAAGENT_ENABLED, "true"));
+ }
+
+ private static boolean isOpenTelemetryApiEnable() {
+ return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED, "true"));
+ }
+}
diff --git a/debezium-interceptor/pom.xml b/debezium-interceptor/pom.xml
new file mode 100644
index 000000000..ce4ad7ab3
--- /dev/null
+++ b/debezium-interceptor/pom.xml
@@ -0,0 +1,60 @@
+
+
+
+ io.debezium
+ debezium-parent
+ 2.2.0-SNAPSHOT
+ ../debezium-parent/pom.xml
+
+ 4.0.0
+ debezium-interceptor
+ Debezium Interceptor
+ jar
+
+
+
+ org.slf4j
+ slf4j-api
+ provided
+
+
+ org.apache.kafka
+ kafka-clients
+ provided
+
+
+ io.opentelemetry.instrumentation
+ opentelemetry-kafka-clients-2.6
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ 15
+
+
+
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+ false
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/debezium-core/src/main/java/io/debezium/transforms/tracing/DebeziumTracingProducerInterceptor.java b/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java
similarity index 64%
rename from debezium-core/src/main/java/io/debezium/transforms/tracing/DebeziumTracingProducerInterceptor.java
rename to debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java
index 113eed03f..039f1e660 100644
--- a/debezium-core/src/main/java/io/debezium/transforms/tracing/DebeziumTracingProducerInterceptor.java
+++ b/debezium-interceptor/src/main/java/io/debezium/tracing/DebeziumTracingProducerInterceptor.java
@@ -3,12 +3,15 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.transforms.tracing;
+package io.debezium.tracing;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.internal.ConfigUtil;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
@@ -16,9 +19,12 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
-import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor;
+import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
public class DebeziumTracingProducerInterceptor extends TracingProducerInterceptor {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumTracingProducerInterceptor.class);
+ public static final String ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED = "otel.instrumentation.kafka.enabled";
private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
private static final Tracer tracer = openTelemetry.getTracer(DebeziumTracingProducerInterceptor.class.getName());
private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator();
@@ -26,8 +32,14 @@ public class DebeziumTracingProducerInterceptor extends TracingProducerInt
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
- Context parentContext = TEXT_MAP_PROPAGATOR.extract(Context.current(), producerRecord, GETTER);
+ if (isInstrumentationKafkaEnabled()) {
+ LOGGER.warn(
+ "To enable end-to-end traceability with Debezium you need to disable automatic Kafka instrumentation. To disable, run your JVM with -D{}=false\"",
+ ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED);
+ return producerRecord;
+ }
+ Context parentContext = TEXT_MAP_PROPAGATOR.extract(Context.current(), producerRecord, GETTER);
Span interceptorSpan = tracer.spanBuilder("onSend")
.setSpanKind(SpanKind.INTERNAL)
.setParent(parentContext)
@@ -39,6 +51,9 @@ public ProducerRecord onSend(ProducerRecord producerRecord) {
finally {
interceptorSpan.end();
}
+ }
+ private static boolean isInstrumentationKafkaEnabled() {
+ return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED, "true"));
}
}
diff --git a/debezium-core/src/main/java/io/debezium/transforms/tracing/KafkaProducerRecordGetter.java b/debezium-interceptor/src/main/java/io/debezium/tracing/KafkaProducerRecordGetter.java
similarity index 96%
rename from debezium-core/src/main/java/io/debezium/transforms/tracing/KafkaProducerRecordGetter.java
rename to debezium-interceptor/src/main/java/io/debezium/tracing/KafkaProducerRecordGetter.java
index c2454814f..310d54731 100644
--- a/debezium-core/src/main/java/io/debezium/transforms/tracing/KafkaProducerRecordGetter.java
+++ b/debezium-interceptor/src/main/java/io/debezium/tracing/KafkaProducerRecordGetter.java
@@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
-package io.debezium.transforms.tracing;
+package io.debezium.tracing;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
diff --git a/pom.xml b/pom.xml
index fae23b4c9..48d1bd96f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,6 +176,8 @@
debezium-connect-rest-extension
debezium-schema-generator
debezium-storage
+ debezium-interceptor
+