diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java index 7e1b9bfc4..a6a8db9fb 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java @@ -30,6 +30,10 @@ public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory { private final Clock clock; private final MySqlTaskContext taskContext; private final MySqlStreamingChangeEventSourceMetrics streamingMetrics; + // MySQL snapshot requires buffering to modify the last record in the snapshot as sometimes it is + // impossible to detect it till the snapshot is ended. Mainly when the last snapshotted table is empty. + // Based on the DBZ-3113 the code can change in the future and it will be handled not in MySQL + // but in the core shared code. private final ChangeEventQueue queue; public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MySqlConnection connection, diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java index e30722f68..fce9d7b82 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java @@ -8,11 +8,11 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.connect.data.Schema; import org.slf4j.Logger; @@ -77,7 +77,7 @@ public class MySqlDatabaseSchema extends HistorizedRelationalDatabaseSchema { private final DdlParser ddlParser; private final RelationalTableFilters filters; private final DdlChanges ddlChanges; - private final Map tableIdsByTableNumber = new HashMap<>(); + private final Map tableIdsByTableNumber = new ConcurrentHashMap<>(); private boolean storageInitialiationExecuted = false; /** @@ -244,7 +244,7 @@ private List parseDdl(String ddlStatements, String databaseNa events.forEach(event -> { final TableId tableId = getTableId(event); offset.tableEvent(dbName, tableIds, sourceTime); - // For SET with multpile parameters + // For SET with multiple parameters if (event instanceof TableCreatedEvent) { emitChangeEvent(offset, schemaChangeEvents, sanitizedDbName, event, tableId, SchemaChangeEventType.CREATE, snapshot); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index dde51b11a..35da9ecb7 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -216,7 +216,7 @@ protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext snapshotCont // would implicitly commit our active transaction, and this would break our consistent snapshot logic. // Therefore, we cannot unlock the tables here! // https://dev.mysql.com/doc/refman/5.7/en/flush.html - LOGGER.info("Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables."); + LOGGER.warn("Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables."); } } } @@ -250,19 +250,7 @@ protected void releaseDataSnapshotLocks(RelationalSnapshotContext snapshotContex super.lastSnapshotRecord(snapshotContext); } - try { - dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> { - try { - receiver.schemaChangeEvent(event); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> receiver.schemaChangeEvent(event)); } // Make schema available for snapshot source @@ -656,19 +644,7 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source if (!snapshottingTask.snapshotData() && !i.hasNext()) { lastSnapshotRecord(snapshotContext); } - try { - dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> { - try { - receiver.schemaChangeEvent(event); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + dispatcher.dispatchSchemaChangeEvent(tableId, (receiver) -> receiver.schemaChangeEvent(event)); } // Make schema available for snapshot source diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java index 312d9c806..a25ea8647 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlMetricsIT.java @@ -124,6 +124,9 @@ public void testSnapshotOnlyMetrics() throws Exception { .build()); assertSnapshotMetrics(); + // The legacy implementation did not exposed streaming metrics when only snapshot was executed. + // All other connectors based on new framework exposes streaming metrics always so we are + // following the same behaviour in the new implementation if (isLegacy()) { assertNoStreamingMetricsExist(); } diff --git a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java index 561fbf0f6..dbfaa4c39 100644 --- a/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java +++ b/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java @@ -68,6 +68,11 @@ public class ChangeEventQueue implements ChangeEventQueueMetrics { private final Supplier loggingContextSupplier; private AtomicLong currentQueueSizeInBytes = new AtomicLong(0); private Map objectMap = new ConcurrentHashMap<>(); + // Sometimes it is necessary to update the record before it is delivered depending on the content + // of the following record. In that cases the easiest solution is to provide a single cell buffer + // that will allow the modification of it during the explicit flush. + // Typical example is MySQL connector when sometimes it is impossible to detect when the record + // in process is the last one. In this case the snapshot flags are set during the explicit flush. private boolean buffering; private T bufferedEvent; diff --git a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java index ea15b42c9..220c21730 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java @@ -6,6 +6,7 @@ package io.debezium.connector.common; import java.time.Duration; +import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.List; @@ -80,7 +81,7 @@ protected static enum State { private final ElapsedTimeStrategy pollOutputDelay; private final Clock clock = Clock.system(); private long recordCounter = 0L; - private long previousOutputMillis = 0L; + private Instant previousOutputInstant; protected BaseSourceTask() { // Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour... @@ -88,7 +89,7 @@ protected BaseSourceTask() { // Initial our poll output delay logic ... pollOutputDelay.hasElapsed(); - previousOutputMillis = clock.currentTimeInMillis(); + previousOutputInstant = clock.currentTimeAsInstant(); } @Override @@ -162,7 +163,7 @@ public final List poll() throws InterruptedException { } void logStatistics(final List records) { - if (records == null) { + if (records == null || !LOGGER.isInfoEnabled()) { return; } int batchSize = records.size(); @@ -172,17 +173,11 @@ void logStatistics(final List records) { lastOffset = lastRecord.sourceOffset(); if (pollOutputDelay.hasElapsed()) { // We want to record the status ... - long millisSinceLastOutput = clock.currentTimeInMillis() - previousOutputMillis; - try { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("{} records sent during previous {}, last recorded offset: {}", recordCounter, - Strings.duration(millisSinceLastOutput), lastOffset); - } - } - finally { - recordCounter = 0; - previousOutputMillis += millisSinceLastOutput; - } + final Instant currentTime = clock.currentTime(); + LOGGER.info("{} records sent during previous {}, last recorded offset: {}", recordCounter, + Strings.duration(Duration.between(previousOutputInstant, currentTime).toMillis()), lastOffset); + recordCounter = 0; + previousOutputInstant = currentTime; } } } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 56da607f0..83ebe5894 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -285,7 +285,7 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source */ protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext snapshotContext, Table table) throws Exception; - private void createDataEvents(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext) throws InterruptedException { + private void createDataEvents(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext) throws Exception { SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); tryStartingSnapshot(snapshotContext); @@ -305,16 +305,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, Relational createDataEventsForTable(sourceContext, snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId), tableOrder++, tableCount); } - try { - releaseDataSnapshotLocks(snapshotContext); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; - } - catch (Exception e) { - throw new DebeziumException(e); - } + releaseDataSnapshotLocks(snapshotContext); snapshotContext.offset.preSnapshotCompletion(); snapshotReceiver.completeSnapshot(); snapshotContext.offset.postSnapshotCompletion(); diff --git a/debezium-core/src/main/java/io/debezium/util/Collect.java b/debezium-core/src/main/java/io/debezium/util/Collect.java index f4074754b..ba572e89b 100644 --- a/debezium-core/src/main/java/io/debezium/util/Collect.java +++ b/debezium-core/src/main/java/io/debezium/util/Collect.java @@ -17,7 +17,6 @@ import java.util.Properties; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; /** * A set of utilities for more easily creating various kinds of collections. @@ -288,9 +287,9 @@ public static void set(List list, int index, T value, T defaultValue) { * @param minuend the elements to be removed */ public static Set minus(Set subtrahend, Set minuend) { - return subtrahend.stream() - .filter(x -> !minuend.contains(x)) - .collect(Collectors.toSet()); + final Set r = new HashSet(subtrahend); + r.removeAll(minuend); + return r; } private Collect() {