DBZ-2862 Adaptation of traceability with OpenTelemetry dependencies in the scope provided. Moved the interceptor to a new module.

This commit is contained in:
REMY David 2023-02-25 11:22:20 +01:00 committed by Vojtech Juranek
parent 5846446672
commit 76c536ec23
9 changed files with 245 additions and 113 deletions

View File

@ -29,8 +29,7 @@
<version.grpc>1.56.1</version.grpc>
<!-- Tracing -->
<version.opentelemetry>1.21.0</version.opentelemetry>
<version.opentelemetry.alpha>1.22.1-alpha</version.opentelemetry.alpha>
<version.opentelemetry.alpha>1.23.0-alpha</version.opentelemetry.alpha>
<!-- HTTP client -->
<version.okhttp>4.8.1</version.okhttp>

View File

@ -52,6 +52,7 @@
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
<scope>provided</scope>
</dependency>
<!-- Testing -->

View File

@ -112,7 +112,7 @@ public R apply(R r, RecordConverter<R> recordConverter) {
r = recordConverter.convert(r);
if (ActivateTracingSpan.isOpenTracingAvailable()) {
if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
tracingSmt.apply(r);
}
@ -312,13 +312,13 @@ public ConfigDef config() {
}
public void close() {
if (ActivateTracingSpan.isOpenTracingAvailable()) {
if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
tracingSmt.close();
}
}
public void configure(Map<String, ?> configMap) {
if (ActivateTracingSpan.isOpenTracingAvailable()) {
if (ActivateTracingSpan.isOpenTelemetryAvailable()) {
tracingSmt.configure(configMap);
if (!configMap.containsKey(ActivateTracingSpan.TRACING_CONTEXT_FIELD_REQUIRED.name())) {
tracingSmt.setRequireContextField(true);

View File

@ -6,15 +6,11 @@
package io.debezium.transforms.tracing;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,21 +18,10 @@
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.transforms.SmtManager;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
/**
* This SMT enables integration with a tracing system.
@ -53,23 +38,12 @@
*/
public class ActivateTracingSpan<R extends ConnectRecord<R>> implements Transformation<R> {
private static final String DB_FIELDS_PREFIX = "db.";
private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpan.class);
private static final String DEFAULT_TRACING_SPAN_CONTEXT_FIELD = "tracingspancontext";
private static final String DEFAULT_TRACING_OPERATION_NAME = "debezium-read";
private static final String TRACING_COMPONENT = ActivateTracingSpan.class.getName();
private static final String TX_LOG_WRITE_OPERATION_NAME = "db-log-write";
private static final boolean OPEN_TRACING_AVAILABLE = resolveOpenTracingApiAvailable();
private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
private static final Tracer tracer = openTelemetry.getTracer(TRACING_COMPONENT);
private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator();
private static final TextMapSetter<Headers> SETTER = KafkaConnectHeadersSetter.INSTANCE;
private static final TextMapGetter<Properties> GETTER = PropertiesGetter.INSTANCE;
private static final boolean OPEN_TELEMETRY_AVAILABLE = resolveOpenTelemetryApiAvailable();
public static final Field TRACING_SPAN_CONTEXT_FIELD = Field.create("tracing.span.context.field")
.withDisplayName("Serialized tracing span context field")
@ -145,67 +119,13 @@ public R apply(R connectRecord) {
}
try {
return traceRecord(connectRecord, envelope, source, propagatedSpanContext);
return ActivateTracingSpanDelegate.Builder.build().traceRecord(connectRecord, envelope, source, propagatedSpanContext, operationName);
}
catch (NoClassDefFoundError e) {
throw new DebeziumException("Failed to record tracing information, tracing libraries not available", e);
}
}
private R traceRecord(R connectRecord, Struct envelope, Struct source, String propagatedSpanContext) {
if (propagatedSpanContext != null) {
Properties props = PropertiesGetter.extract(propagatedSpanContext);
Context parentSpanContext = openTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.current(), props, GETTER);
SpanBuilder txLogSpanBuilder = tracer.spanBuilder(TX_LOG_WRITE_OPERATION_NAME)
.setSpanKind(SpanKind.INTERNAL)
.setParent(parentSpanContext);
if (source != null) {
Long eventTimestamp = source.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
if (Objects.nonNull(eventTimestamp)) {
txLogSpanBuilder.setStartTimestamp(eventTimestamp, TimeUnit.MILLISECONDS);
}
}
Span txLogSpan = txLogSpanBuilder.startSpan();
try (Scope ignored = txLogSpan.makeCurrent()) {
if (source != null) {
for (org.apache.kafka.connect.data.Field field : source.schema().fields()) {
addFieldToSpan(txLogSpan, source, field.name(), DB_FIELDS_PREFIX);
}
}
debeziumSpan(envelope);
TEXT_MAP_PROPAGATOR.inject(Context.current(), connectRecord.headers(), SETTER);
}
finally {
txLogSpan.end();
}
}
return connectRecord;
}
private void debeziumSpan(Struct envelope) {
final Long processingTimestamp = envelope.getInt64(Envelope.FieldName.TIMESTAMP);
Span debeziumSpan = tracer.spanBuilder(operationName)
.setStartTimestamp(processingTimestamp, TimeUnit.MILLISECONDS)
.startSpan();
try (Scope ignored = debeziumSpan.makeCurrent()) {
addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.OPERATION, "");
addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.TIMESTAMP, "");
}
finally {
debeziumSpan.end();
}
}
@Override
public void close() {
}
@ -222,30 +142,11 @@ public ConfigDef config() {
return config;
}
private void addFieldToSpan(Span span, Struct struct, String field, String prefix) {
final Object fieldValue = struct.get(field);
if (fieldValue != null) {
String targetFieldName = prefix + field;
if (DB_FIELDS_PREFIX.equals(prefix)) {
if ("db".equals(field)) {
targetFieldName = prefix + "instance";
}
else if ("connector".equals(field)) {
targetFieldName = prefix + "type";
}
else if ("name".equals(field)) {
targetFieldName = prefix + "cdc-name";
}
}
span.setAttribute(targetFieldName, fieldValue.toString());
}
public static boolean isOpenTelemetryAvailable() {
return OPEN_TELEMETRY_AVAILABLE;
}
public static boolean isOpenTracingAvailable() {
return OPEN_TRACING_AVAILABLE;
}
private static boolean resolveOpenTracingApiAvailable() {
private static boolean resolveOpenTelemetryApiAvailable() {
try {
GlobalOpenTelemetry.get();
return true;

View File

@ -0,0 +1,154 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.tracing;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.internal.ConfigUtil;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
public class ActivateTracingSpanDelegate {
private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpanDelegate.class);
private static final String DB_FIELDS_PREFIX = "db.";
private static final String TX_LOG_WRITE_OPERATION_NAME = "db-log-write";
public static final String ARG_OTEL_JAVAAGENT_ENABLED = "otel.javaagent.enabled";
public static final String ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED = "otel.instrumentation.opentelemetry-api.enabled";
private static final String TRACING_COMPONENT = ActivateTracingSpanDelegate.class.getName();
private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
private static final Tracer tracer = openTelemetry.getTracer(TRACING_COMPONENT);
private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator();
private static final TextMapSetter<Headers> SETTER = KafkaConnectHeadersSetter.INSTANCE;
private static final TextMapGetter<Properties> GETTER = PropertiesGetter.INSTANCE;
private ActivateTracingSpanDelegate() {
}
public <R extends ConnectRecord<R>> R traceRecord(R connectRecord, Struct envelope, Struct source, String propagatedSpanContext, String operationName) {
if (!is0penTelemetryJavaagentEnable()) {
LOGGER.debug(
"OpenTelemetry javaagent is disabled. To enable, run your JVM with -D{}=true\"",
ARG_OTEL_JAVAAGENT_ENABLED);
}
if (!isOpenTelemetryApiEnable()) {
LOGGER.debug(
"OpenTelemetry API is disabled. To enable, run your JVM with -D{}=true\"",
ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED);
}
if (propagatedSpanContext != null) {
Properties props = PropertiesGetter.extract(propagatedSpanContext);
Context parentSpanContext = openTelemetry.getPropagators().getTextMapPropagator()
.extract(Context.current(), props, GETTER);
SpanBuilder txLogSpanBuilder = tracer.spanBuilder(TX_LOG_WRITE_OPERATION_NAME)
.setSpanKind(SpanKind.INTERNAL)
.setParent(parentSpanContext);
if (source != null) {
Long eventTimestamp = source.getInt64(AbstractSourceInfo.TIMESTAMP_KEY);
if (Objects.nonNull(eventTimestamp)) {
txLogSpanBuilder.setStartTimestamp(eventTimestamp, TimeUnit.MILLISECONDS);
}
}
Span txLogSpan = txLogSpanBuilder.startSpan();
try (Scope ignored = txLogSpan.makeCurrent()) {
if (source != null) {
for (org.apache.kafka.connect.data.Field field : source.schema().fields()) {
addFieldToSpan(txLogSpan, source, field.name(), DB_FIELDS_PREFIX);
}
}
debeziumSpan(envelope, operationName);
TEXT_MAP_PROPAGATOR.inject(Context.current(), connectRecord.headers(), SETTER);
}
finally {
txLogSpan.end();
}
}
return connectRecord;
}
private void debeziumSpan(Struct envelope, String operationName) {
final Long processingTimestamp = envelope.getInt64(Envelope.FieldName.TIMESTAMP);
Span debeziumSpan = ActivateTracingSpanDelegate.tracer.spanBuilder(operationName)
.setStartTimestamp(processingTimestamp, TimeUnit.MILLISECONDS)
.startSpan();
try (Scope ignored = debeziumSpan.makeCurrent()) {
addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.OPERATION, "");
addFieldToSpan(debeziumSpan, envelope, Envelope.FieldName.TIMESTAMP, "");
}
finally {
debeziumSpan.end();
}
}
private void addFieldToSpan(Span span, Struct struct, String field, String prefix) {
final Object fieldValue = struct.get(field);
if (fieldValue != null) {
String targetFieldName = prefix + field;
if (DB_FIELDS_PREFIX.equals(prefix)) {
if ("db".equals(field)) {
targetFieldName = prefix + "instance";
}
else if ("connector".equals(field)) {
targetFieldName = prefix + "type";
}
else if ("name".equals(field)) {
targetFieldName = prefix + "cdc-name";
}
}
span.setAttribute(targetFieldName, fieldValue.toString());
}
}
public static class Builder {
private Builder() {
}
public static ActivateTracingSpanDelegate build() {
return new ActivateTracingSpanDelegate();
}
}
private static boolean is0penTelemetryJavaagentEnable() {
return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_JAVAAGENT_ENABLED, "true"));
}
private static boolean isOpenTelemetryApiEnable() {
return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_OPENTELEMETRY_API_ENABLED, "true"));
}
}

View File

@ -0,0 +1,60 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../debezium-parent/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-interceptor</artifactId>
<name>Debezium Interceptor</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-kafka-clients-2.6</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>15</source>
<target>15</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -3,12 +3,15 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.tracing;
package io.debezium.tracing;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.internal.ConfigUtil;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
@ -16,9 +19,12 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.TracingProducerInterceptor;
public class DebeziumTracingProducerInterceptor<K, V> extends TracingProducerInterceptor<K, V> {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumTracingProducerInterceptor.class);
public static final String ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED = "otel.instrumentation.kafka.enabled";
private static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
private static final Tracer tracer = openTelemetry.getTracer(DebeziumTracingProducerInterceptor.class.getName());
private static final TextMapPropagator TEXT_MAP_PROPAGATOR = openTelemetry.getPropagators().getTextMapPropagator();
@ -26,8 +32,14 @@ public class DebeziumTracingProducerInterceptor<K, V> extends TracingProducerInt
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
Context parentContext = TEXT_MAP_PROPAGATOR.extract(Context.current(), producerRecord, GETTER);
if (isInstrumentationKafkaEnabled()) {
LOGGER.warn(
"To enable end-to-end traceability with Debezium you need to disable automatic Kafka instrumentation. To disable, run your JVM with -D{}=false\"",
ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED);
return producerRecord;
}
Context parentContext = TEXT_MAP_PROPAGATOR.extract(Context.current(), producerRecord, GETTER);
Span interceptorSpan = tracer.spanBuilder("onSend")
.setSpanKind(SpanKind.INTERNAL)
.setParent(parentContext)
@ -39,6 +51,9 @@ public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
finally {
interceptorSpan.end();
}
}
private static boolean isInstrumentationKafkaEnabled() {
return Boolean.parseBoolean(ConfigUtil.getString(ARG_OTEL_INSTRUMENTATION_KAFKA_ENABLED, "true"));
}
}

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.tracing;
package io.debezium.tracing;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

View File

@ -176,6 +176,8 @@
<module>debezium-connect-rest-extension</module>
<module>debezium-schema-generator</module>
<module>debezium-storage</module>
<module>debezium-interceptor</module>
</modules>
<distributionManagement>