DBZ-4809 Add task id and partition to the logging context for

multi-partition connectors.

Co-authored-by: Sergei Morozov <morozov@tut.by>
This commit is contained in:
Josh Ribera 2022-03-08 12:19:42 -07:00 committed by Gunnar Morling
parent b46773e5e4
commit 020e845f44
7 changed files with 68 additions and 30 deletions

View File

@ -67,6 +67,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
SqlServerPartition partition = entry.getKey();
SqlServerOffsetContext previousOffset = entry.getValue();
previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition));
SnapshotResult<SqlServerOffsetContext> snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset);
if (snapshotResult.isCompletedOrSkipped()) {
@ -75,11 +76,6 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
}
previousLogContext.set(taskContext.configureLoggingContext("streaming"));
streamEvents(context, streamingOffsets);
}
private void streamEvents(ChangeEventSourceContext context, Offsets<SqlServerPartition, SqlServerOffsetContext> streamingOffsets)
throws InterruptedException {
// TODO: Determine how to do incremental snapshots with multiple partitions
for (Map.Entry<SqlServerPartition, SqlServerOffsetContext> entry : streamingOffsets) {
@ -96,6 +92,8 @@ private void streamEvents(ChangeEventSourceContext context, Offsets<SqlServerPar
SqlServerPartition partition = entry.getKey();
SqlServerOffsetContext previousOffset = entry.getValue();
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
if (context.isRunning()) {
if (streamingSource.executeIteration(context, partition, previousOffset)) {
streamedEvents = true;

View File

@ -19,6 +19,7 @@
import io.debezium.config.Configuration;
import io.debezium.pipeline.spi.Partition;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
public class SqlServerPartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";
@ -46,6 +47,11 @@ public Map<String, String> getSourcePartition() {
return sourcePartition;
}
@Override
public Map<String, String> getLoggingContext() {
return Collections.singletonMap(LoggingContext.DATABASE_NAME, databaseName);
}
/**
* Returns the SQL Server database name corresponding to the partition.
*/

View File

@ -3,7 +3,7 @@
<appender name="CONSOLE"
class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n</pattern>
<pattern>%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.taskId}|%X{dbz.connectorContext}|%X{dbz.databaseName} %m [%c]%n</pattern>
</encoder>
</appender>

View File

@ -12,6 +12,7 @@
import org.apache.kafka.connect.source.SourceTask;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
@ -57,6 +58,10 @@ public LoggingContext.PreviousContext configureLoggingContext(String contextName
return LoggingContext.forConnector(connectorType, connectorName, contextName);
}
public LoggingContext.PreviousContext configureLoggingContext(String contextName, Partition partition) {
return LoggingContext.forConnector(connectorType, connectorName, taskId, contextName, partition);
}
/**
* Run the supplied function in the temporary connector MDC context, and when complete always return the MDC context to its
* state before this method was called.

View File

@ -133,10 +133,11 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
final P partition = previousOffsets.getTheOnlyPartition();
final O previousOffset = previousOffsets.getTheOnlyOffset();
previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition));
SnapshotResult<O> snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset);
if (running && snapshotResult.isCompletedOrSkipped()) {
previousLogContext.set(taskContext.configureLoggingContext("streaming"));
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
streamEvents(context, partition, snapshotResult.getOffset());
}
}

View File

@ -5,6 +5,7 @@
*/
package io.debezium.pipeline.spi;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@ -15,6 +16,13 @@
public interface Partition {
Map<String, String> getSourcePartition();
/**
* Returns the partition representation in the logging context.
*/
default Map<String, String> getLoggingContext() {
return Collections.emptyMap();
}
/**
* Implementations provide a set of connector-specific partitions based on the connector task configuration.
*/

View File

@ -7,9 +7,12 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.slf4j.MDC;
import io.debezium.pipeline.spi.Partition;
/**
* A utility that provides a consistent set of properties for the Mapped Diagnostic Context (MDC) properties used by Debezium
* components.
@ -31,6 +34,14 @@ public class LoggingContext {
* The key for the connector context name MDC property.
*/
public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";
/**
* The key for the task id MDC property.
*/
public static final String TASK_ID = "dbz.taskId";
/**
* The key for the database name MDC property.
*/
public static final String DATABASE_NAME = "dbz.databaseName";
private LoggingContext() {
}
@ -62,19 +73,35 @@ public void restore() {
* @param connectorName the name of the connector; may not be null
* @param contextName the name of the context; may not be null
* @return the previous MDC context; never null
* @throws IllegalArgumentException if any of the parameters are null
* @throws NullPointerException if any of the parameters are null
*/
public static PreviousContext forConnector(String connectorType, String connectorName, String contextName) {
if (connectorType == null) {
throw new IllegalArgumentException("The MDC value for the connector type may not be null");
}
if (connectorName == null) {
throw new IllegalArgumentException("The MDC value for the connector name may not be null");
}
if (contextName == null) {
throw new IllegalArgumentException("The MDC value for the connector context may not be null");
}
return forConnector(connectorType, connectorName, null, contextName, null);
}
/**
* Configure for a connector the logger's Mapped Diagnostic Context (MDC) properties for the thread making this call.
*
* @param connectorType the type of connector; may not be null
* @param connectorName the name of the connector; may not be null
* @param taskId the task id; may be null
* @param contextName the name of the context; may not be null
* @param partition the partition; may be null
* @return the previous MDC context; never null
* @throws NullPointerException if connectorType, connectorName, or contextName parameters are null
*/
public static PreviousContext forConnector(String connectorType, String connectorName, String taskId, String contextName, Partition partition) {
Objects.requireNonNull(connectorType, "The MDC value for the connector type may not be null");
Objects.requireNonNull(connectorName, "The MDC value for the connector name may not be null");
Objects.requireNonNull(contextName, "The MDC value for the connector context may not be null");
PreviousContext previous = new PreviousContext();
if (taskId != null) {
MDC.put(TASK_ID, taskId);
}
if (partition != null) {
partition.getLoggingContext().forEach(MDC::put);
}
MDC.put(CONNECTOR_TYPE, connectorType);
MDC.put(CONNECTOR_NAME, connectorName);
MDC.put(CONNECTOR_CONTEXT, contextName);
@ -89,21 +116,14 @@ public static PreviousContext forConnector(String connectorType, String connecto
* @param connectorName the logical name of the connector; may not be null
* @param contextName the name of the context; may not be null
* @param operation the function to run in the new MDC context; may not be null
* @throws IllegalArgumentException if any of the parameters are null
* @throws NullPointerException if any of the parameters are null
*/
public static void temporarilyForConnector(String connectorType, String connectorName, String contextName, Runnable operation) {
if (connectorType == null) {
throw new IllegalArgumentException("The MDC value for the connector type may not be null");
}
if (connectorName == null) {
throw new IllegalArgumentException("The MDC value for the connector name may not be null");
}
if (contextName == null) {
throw new IllegalArgumentException("The MDC value for the connector context may not be null");
}
if (operation == null) {
throw new IllegalArgumentException("The operation may not be null");
}
Objects.requireNonNull(connectorType, "The MDC value for the connector type may not be null");
Objects.requireNonNull(connectorName, "The MDC value for the connector name may not be null");
Objects.requireNonNull(contextName, "The MDC value for the connector context may not be null");
Objects.requireNonNull(operation, "The operation may not be null");
PreviousContext previous = new PreviousContext();
try {
forConnector(connectorType, connectorName, contextName);