diff --git a/debezium-connector-mysql/pom.xml b/debezium-connector-mysql/pom.xml
index 700540296..6d92a5d19 100644
--- a/debezium-connector-mysql/pom.xml
+++ b/debezium-connector-mysql/pom.xml
@@ -77,6 +77,11 @@
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
        <dependency>
            <groupId>org.easytesting</groupId>
            <artifactId>fest-assert</artifactId>
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java
index e4b5fe12e..fdb6f69e1 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java
@@ -13,6 +13,7 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@@ -49,13 +50,19 @@ public abstract class AbstractReader implements Reader {
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final Duration pollInterval;
+ private final Predicate<SourceRecord> acceptAndContinue;
+
/**
* Create a snapshot reader.
*
* @param name the name of the reader
* @param context the task context in which this reader is running; may not be null
+ * @param acceptAndContinue a predicate that returns true if the tested {@link SourceRecord} should be accepted and
+ * false if the record and all subsequent records should be ignored. The reader will stop
+ * accepting records once {@link #enqueueRecord(SourceRecord)} is called with a record
+ * that tests as false. Can be null. If null, all records will be accepted.
*/
- public AbstractReader(String name, MySqlTaskContext context) {
+ public AbstractReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue) {
this.name = name;
this.context = context;
this.connectionContext = context.getConnectionContext();
@@ -63,6 +70,7 @@ public AbstractReader(String name, MySqlTaskContext context) {
this.maxBatchSize = context.getConnectorConfig().getMaxBatchSize();
this.pollInterval = context.getConnectorConfig().getPollInterval();
this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
+ this.acceptAndContinue = acceptAndContinue == null ? new AcceptAllPredicate() : acceptAndContinue;
}
@Override
@@ -300,14 +308,34 @@ protected void pollComplete(List<SourceRecord> batch) {
* queue is full.
*
* @param record the record to be enqueued
+ * @return true if the record was successfully enqueued, false if not.
* @throws InterruptedException if interrupted while waiting for the queue to have room for this record
*/
- protected void enqueueRecord(SourceRecord record) throws InterruptedException {
+ protected boolean enqueueRecord(SourceRecord record) throws InterruptedException {
if (record != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("Enqueuing source record: {}", record);
+ if (acceptAndContinue.test(record)) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Enqueuing source record: {}", record);
+ }
+ this.records.put(record);
+ return true;
+ } else {
+ // if we found a record we should not accept, we are done.
+ logger.info("predicate returned false; completing reader {}", this.name);
+ completeSuccessfully();
}
- this.records.put(record);
+ }
+ return false;
+ }
+
+ /**
+ * A predicate that returns true for all source records.
+ */
+ public static class AcceptAllPredicate implements Predicate<SourceRecord> {
+
+ @Override
+ public boolean test(SourceRecord sourceRecord) {
+ return true;
}
}
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 417af587b..da1e3ce32 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -144,9 +144,22 @@ public boolean equals(Object obj) {
*
* @param name the name of this reader; may not be null
* @param context the task context in which this reader is running; may not be null
+ * @param acceptAndContinue see {@link AbstractReader#AbstractReader(String, MySqlTaskContext, Predicate)}
*/
- public BinlogReader(String name, MySqlTaskContext context) {
- super(name, context);
+ public BinlogReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue) {
+ this(name, context, acceptAndContinue, context.serverId());
+ }
+
+ /**
+ * Create a binlog reader.
+ *
+ * @param name the name of this reader; may not be null
+ * @param context the task context in which this reader is running; may not be null
+ * @param acceptAndContinue see {@link AbstractReader#AbstractReader(String, MySqlTaskContext, Predicate)}
+ * @param serverId the server id to use for the {@link BinaryLogClient}
+ */
+ public BinlogReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue, long serverId) {
+ super(name, context, acceptAndContinue);
connectionContext = context.getConnectionContext();
source = context.source();
@@ -163,7 +176,7 @@ public BinlogReader(String name, MySqlTaskContext context) {
client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(), connectionContext.username(), connectionContext.password());
// BinaryLogClient will overwrite thread names later
client.setThreadFactory(Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false));
- client.setServerId(context.serverId());
+ client.setServerId(serverId);
client.setSSLMode(sslModeFor(connectionContext.sslMode()));
client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
client.setKeepAliveInterval(context.config().getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS));
@@ -238,6 +251,10 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
context.getConnectorConfig().getLogicalName());
}
+ public BinlogReader(String name, MySqlTaskContext context) {
+ this(name, context, null);
+ }
+
@Override
protected void doInitialize() {
metrics.register(logger);
@@ -360,16 +377,23 @@ protected void rewindBinaryLogClient(BinlogPosition position) {
}
}
+ /**
+ * @return a copy of the last offset of this reader, or null if this reader has not completed a poll.
+ */
+ public Map<String, ?> getLastOffset() {
+ return lastOffset == null ? null : new HashMap<>(lastOffset);
+ }
+
@Override
protected void doStop() {
try {
- if (isRunning()) {
- logger.debug("Stopping binlog reader, last recorded offset: {}", lastOffset);
+ if (client.isConnected()) {
+ logger.debug("Stopping binlog reader '{}', last recorded offset: {}", this.name(), lastOffset);
client.disconnect();
}
cleanupResources();
} catch (IOException e) {
- logger.error("Unexpected error when disconnecting from the MySQL binary log reader", e);
+ logger.error("Unexpected error when disconnecting from the MySQL binary log reader '{}'", this.name(), e);
}
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java
index 4fce2470b..113305255 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java
@@ -118,7 +118,7 @@ public synchronized void stop() {
Reader current = currentReader.get();
if (current != null) {
try {
- logger.info("Stopping the {} reader", current.name());
+ logger.info("ChainedReader: Stopping the {} reader", current.name());
current.stop();
} catch (Throwable t) {
logger.error("Unexpected error stopping the {} reader", current.name(), t);
@@ -196,7 +196,7 @@ private boolean startNextReader() {
// There is at least one more reader, so start it ...
Reader lastReader = currentReader.getAndSet(null);
if (lastReader != null) {
- logger.debug("Transitioning from the {} reader to the {} reader", lastReader.name(), reader.name());
+ logger.info("Transitioning from the {} reader to the {} reader", lastReader.name(), reader.name());
} else {
logger.debug("Starting the {} reader", reader.name());
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java
index e4f0cdea8..b49c0bfde 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java
@@ -7,6 +7,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -58,38 +59,16 @@ protected static List<String> withoutBuiltInDatabases(Collection<String> dbNames
private final Predicate<TableId> isBuiltInTable;
private final Predicate<ColumnId> columnFilter;
- /**
- * @param config the configuration; may not be null
- */
- public Filters(Configuration config) {
- this.isBuiltInDb = Filters::isBuiltInDatabase;
- this.isBuiltInTable = Filters::isBuiltInTable;
-
- // Define the filter used for database names ...
- Predicate<String> dbFilter = Selectors.databaseSelector()
- .includeDatabases(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST))
- .excludeDatabases(config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))
- .build();
-
- // Define the filter using the whitelists and blacklists for tables and database names ...
- Predicate<TableId> tableFilter = Selectors.tableSelector()
- .includeDatabases(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST))
- .excludeDatabases(config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))
- .includeTables(config.getString(MySqlConnectorConfig.TABLE_WHITELIST))
- .excludeTables(config.getString(MySqlConnectorConfig.TABLE_BLACKLIST))
- .build();
-
- // Ignore built-in databases and tables ...
- if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
- this.tableFilter = tableFilter.and(isBuiltInTable.negate());
- this.dbFilter = dbFilter.and(isBuiltInDb.negate());
- } else {
- this.tableFilter = tableFilter;
- this.dbFilter = dbFilter;
- }
-
- // Define the filter that excludes blacklisted columns, truncated columns, and masked columns ...
- this.columnFilter = Selectors.excludeColumns(config.getString(MySqlConnectorConfig.COLUMN_BLACKLIST));
+ private Filters(Predicate<String> dbFilter,
+ Predicate<TableId> tableFilter,
+ Predicate<String> isBuiltInDb,
+ Predicate<TableId> isBuiltInTable,
+ Predicate<ColumnId> columnFilter) {
+ this.dbFilter = dbFilter;
+ this.tableFilter = tableFilter;
+ this.isBuiltInDb = isBuiltInDb;
+ this.isBuiltInTable = isBuiltInTable;
+ this.columnFilter = columnFilter;
}
public Predicate<String> databaseFilter() {
@@ -117,4 +96,151 @@ public Predicate builtInDatabaseFilter() {
public Predicate<ColumnId> columnFilter() {
return columnFilter;
}
+
+ public static class Builder {
+
+ private Predicate<String> dbFilter;
+ private Predicate<TableId> tableFilter;
+ private Predicate<String> isBuiltInDb = Filters::isBuiltInDatabase;
+ private Predicate<TableId> isBuiltInTable = Filters::isBuiltInTable;
+ private Predicate<ColumnId> columnFilter;
+ private final Configuration config;
+
+ /**
+ * Create a Builder for a filter.
+ * Set the initial filter data to match the filter data in the given configuration.
+ * @param config the configuration of the connector.
+ */
+ public Builder(Configuration config) {
+ this.config = config;
+ setFiltersFromStrings(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),
+ config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),
+ config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
+ config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
+
+ // Define the filter that excludes blacklisted columns, truncated columns, and masked columns ...
+ this.columnFilter = Selectors.excludeColumns(config.getString(MySqlConnectorConfig.COLUMN_BLACKLIST));
+ }
+
+ /**
+ * Completely reset the filter to match the filter info in the given offsets.
+ * This will completely reset the filters to those passed in.
+ * @param offsets The offsets to set the filter info to.
+ * @return this
+ */
+ public Builder setFiltersFromOffsets(Map<String, ?> offsets) {
+ setFiltersFromStrings((String)offsets.get(SourceInfo.DATABASE_WHITELIST_KEY),
+ (String)offsets.get(SourceInfo.DATABASE_BLACKLIST_KEY),
+ (String)offsets.get(SourceInfo.TABLE_WHITELIST_KEY),
+ (String)offsets.get(SourceInfo.TABLE_BLACKLIST_KEY));
+ return this;
+ }
+
+ private void setFiltersFromStrings(String dbWhitelist,
+ String dbBlacklist,
+ String tableWhitelist,
+ String tableBlacklist) {
+ Predicate<String> dbFilter = Selectors.databaseSelector()
+ .includeDatabases(dbWhitelist)
+ .excludeDatabases(dbBlacklist)
+ .build();
+
+ // Define the filter using the whitelists and blacklists for tables and database names ...
+ Predicate<TableId> tableFilter = Selectors.tableSelector()
+ .includeDatabases(dbWhitelist)
+ .excludeDatabases(dbBlacklist)
+ .includeTables(tableWhitelist)
+ .excludeTables(tableBlacklist)
+ .build();
+
+ // Ignore built-in databases and tables ...
+ if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
+ this.tableFilter = tableFilter.and(isBuiltInTable.negate());
+ this.dbFilter = dbFilter.and(isBuiltInDb.negate());
+ } else {
+ this.tableFilter = tableFilter;
+ this.dbFilter = dbFilter;
+ }
+ }
+
+ /**
+ * Set the filter to match the given other filter.
+ * This will completely reset the filters to those passed in.
+ * @param filters The other filter
+ * @return this
+ */
+ public Builder setFiltersFromFilters(Filters filters) {
+ this.dbFilter = filters.dbFilter;
+ this.tableFilter = filters.tableFilter;
+ this.isBuiltInDb = filters.isBuiltInDb;
+ this.isBuiltInTable = filters.isBuiltInTable;
+ this.columnFilter = filters.columnFilter;
+ return this;
+ }
+
+ /**
+ * Exclude all those tables included by the given filter.
+ * @param otherFilter the filter
+ * @return this
+ */
+ public Builder excludeAllTables(Filters otherFilter) {
+ excludeDatabases(otherFilter.dbFilter);
+ excludeTables(otherFilter.tableFilter);
+ return this;
+ }
+
+ /**
+ * Exclude all the databases that the given predicate tests as true for.
+ * @param databases the databases to be excluded
+ * @return this
+ */
+ public Builder excludeDatabases(Predicate<String> databases) {
+ this.dbFilter = this.dbFilter.and(databases.negate());
+ return this;
+ }
+
+ /**
+ * Include all the databases that the given predicate tests as true for.
+ * All databases previously included will still be included.
+ * @param databases the databases to be included
+ * @return this
+ */
+ public Builder includeDatabases(Predicate<String> databases) {
+ this.dbFilter = this.dbFilter.or(databases);
+ return this;
+ }
+
+ /**
+ * Exclude all the tables that the given predicate tests as true for.
+ * @param tables the tables to be excluded.
+ * @return this
+ */
+ public Builder excludeTables(Predicate<TableId> tables) {
+ this.tableFilter = this.tableFilter.and(tables.negate());
+ return this;
+ }
+
+ /**
+ * Include the tables that the given predicate tests as true for.
+ * Tables previously included will still be included.
+ * @param tables the tables to be included.
+ * @return this
+ */
+ public Builder includeTables(Predicate<TableId> tables) {
+ this.tableFilter = this.tableFilter.or(tables);
+ return this;
+ }
+
+ /**
+ * Build the filters.
+ * @return the {@link Filters}
+ */
+ public Filters build() {
+ return new Filters(this.dbFilter,
+ this.tableFilter,
+ this.isBuiltInDb,
+ this.isBuiltInTable,
+ this.columnFilter);
+ }
+ }
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
index 8c2f45944..704aa34a7 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
@@ -191,6 +191,57 @@ public static SnapshotMode parse(String value, String defaultValue) {
}
}
+ public static enum SnapshotNewTables implements EnumeratedValue {
+ /**
+ * Do not snapshot new tables
+ */
+ OFF("off"),
+
+ /**
+ * Snapshot new tables in parallel to normal binlog reading.
+ */
+ PARALLEL("parallel");
+
+ private final String value;
+
+ private SnapshotNewTables(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @return the matching option, or null if no match is found
+ */
+ public static SnapshotNewTables parse(String value) {
+ if (value == null) return null;
+ value = value.trim();
+ for (SnapshotNewTables option : SnapshotNewTables.values()) {
+ if (option.getValue().equalsIgnoreCase(value)) return option;
+ }
+ return null;
+ }
+
+ /**
+ * Determine if the supplied value is one of the predefined options.
+ *
+ * @param value the configuration property value; may not be null
+ * @param defaultValue the default value; may be null
+ * @return the matching option, or null if no match is found and the non-null default is invalid
+ */
+ public static SnapshotNewTables parse(String value, String defaultValue) {
+ SnapshotNewTables snapshotNewTables = parse(value);
+ if (snapshotNewTables == null && defaultValue != null) snapshotNewTables = parse(defaultValue);
+ return snapshotNewTables;
+ }
+ }
+
/**
* The set of predefined Snapshot Locking Mode options.
*/
@@ -497,7 +548,9 @@ public static DdlParsingMode parse(String value, String defaultValue) {
}
private static final String DATABASE_WHITELIST_NAME = "database.whitelist";
+ private static final String DATABASE_BLACKLIST_NAME = "database.blacklist";
private static final String TABLE_WHITELIST_NAME = "table.whitelist";
+ private static final String TABLE_BLACKLIST_NAME = "table.blacklist";
private static final String TABLE_IGNORE_BUILTIN_NAME = "table.ignore.builtin";
/**
@@ -572,6 +625,17 @@ public static DdlParsingMode parse(String value, String defaultValue) {
+ "MySQL database cluster as another server (with this unique ID) so it can read "
+ "the binlog. By default, a random number is generated between 5400 and 6400.");
+ public static final Field SERVER_ID_OFFSET = Field.create("database.server.id.offset")
+ .withDisplayName("Cluster ID offset")
+ .withType(Type.LONG)
+ .withWidth(Width.LONG)
+ .withImportance(Importance.HIGH)
+ .withDefault(10000L)
+ .withDescription("Only relevant if parallel snapshotting is configured. During "
+ + "parallel snapshotting, multiple (4) connections are opened to the database "
+ + "server, and each needs its own unique connection ID. This offset is "
+ + "used to generate those IDs from the base configured cluster ID.");
+
public static final Field SSL_MODE = Field.create("database.ssl.mode")
.withDisplayName("SSL mode")
.withEnum(SecureConnectionMode.class, SecureConnectionMode.DISABLED)
@@ -649,7 +713,7 @@ public static DdlParsingMode parse(String value, String defaultValue) {
* A comma-separated list of regular expressions that match database names to be excluded from monitoring.
* May not be used with {@link #DATABASE_WHITELIST}.
*/
- public static final Field DATABASE_BLACKLIST = Field.create("database.blacklist")
+ public static final Field DATABASE_BLACKLIST = Field.create(DATABASE_BLACKLIST_NAME)
.withDisplayName("Exclude Databases")
.withType(Type.STRING)
.withWidth(Width.LONG)
@@ -677,7 +741,7 @@ public static DdlParsingMode parse(String value, String defaultValue) {
* monitoring. Fully-qualified names for tables are of the form {@code .} or
* {@code ..}. May not be used with {@link #TABLE_WHITELIST}.
*/
- public static final Field TABLE_BLACKLIST = Field.create("table.blacklist")
+ public static final Field TABLE_BLACKLIST = Field.create(TABLE_BLACKLIST_NAME)
.withDisplayName("Exclude Tables")
.withType(Type.STRING)
.withWidth(Width.LONG)
@@ -878,6 +942,19 @@ public static DdlParsingMode parse(String value, String defaultValue) {
+ "'schema_only_recovery' and is only safe to use if no schema changes are happening while the snapshot is taken.")
.withValidation(MySqlConnectorConfig::validateSnapshotLockingMode);
+ public static final Field SNAPSHOT_NEW_TABLES = Field.create("snapshot.new.tables")
+ .withDisplayName("Snapshot newly added tables")
+ .withEnum(SnapshotNewTables.class, SnapshotNewTables.OFF)
+ .withWidth(Width.SHORT)
+ .withImportance(Importance.LOW)
+ .withDescription("BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, "
+ + "and snapshot them. There are presently only two options: "
+ + "'off': Default behavior. Do not snapshot new tables. "
+ + "'parallel': The snapshot of the new tables will occur in parallel with the continued binlog reading of the old tables. When the snapshot "
+ + "completes, an independent binlog reader will begin reading the events for the new tables until it catches up to present time. At this "
+ + "point, both old and new binlog readers will be momentarily halted and a new binlog reader will start that will read the binlog for all "
+ + "configured tables. The parallel binlog reader will have a configured server id of 10000 + the primary binlog reader's server id.");
+
public static final Field TIME_PRECISION_MODE = Field.create("time.precision.mode")
.withDisplayName("Time Precision")
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
@@ -973,7 +1050,7 @@ public static final Field MASK_COLUMN(int length) {
/**
* The set of {@link Field}s defined as part of this configuration.
*/
- public static Field.Set ALL_FIELDS = Field.setOf(USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_ID,
+ public static Field.Set ALL_FIELDS = Field.setOf(USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_ID, SERVER_ID_OFFSET,
SERVER_NAME,
CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS,
CommonConnectorConfig.MAX_QUEUE_SIZE,
@@ -983,7 +1060,8 @@ public static final Field MASK_COLUMN(int length) {
Heartbeat.HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES, INCLUDE_SQL_QUERY,
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
- COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, SNAPSHOT_LOCKING_MODE,
+ COLUMN_BLACKLIST,
+ SNAPSHOT_MODE, SNAPSHOT_NEW_TABLES, SNAPSHOT_MINIMAL_LOCKING, SNAPSHOT_LOCKING_MODE,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
GTID_NEW_CHANNEL_POSITION,
@@ -1057,7 +1135,7 @@ public GtidNewChannelPosition gtidNewChannelPosition() {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
- Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID,
+ Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID, SERVER_ID_OFFSET,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
@@ -1071,7 +1149,7 @@ protected static ConfigDef configDef() {
CommonConnectorConfig.TOMBSTONES_ON_DELETE);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE,
CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
- SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
+ SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_NEW_TABLES, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, DDL_PARSER_MODE);
return config;
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index d7a4bd30d..f911bd145 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -7,10 +7,16 @@
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import io.debezium.util.LoggingContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@@ -22,6 +28,7 @@
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.schema.TopicSelector;
+import io.debezium.util.Collect;
import io.debezium.util.LoggingContext.PreviousContext;
/**
@@ -54,20 +61,21 @@ public String version() {
@Override
public synchronized void start(Configuration config) {
- // Create and start the task context ...
- this.taskContext = new MySqlTaskContext(config);
- this.connectionContext = taskContext.getConnectionContext();
+ final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
+ PreviousContext prevLoggingContext = LoggingContext.forConnector("MySQL", serverName, "task");
- PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
try {
- this.taskContext.start();
-
// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
- final SourceInfo source = taskContext.source();
- Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
+ Map<String, String> partition = Collect.hashMapOf(SourceInfo.SERVER_PARTITION_KEY, serverName);
+ Map<String, ?> offsets = getRestartOffset(context.offsetStorageReader().offset(partition));
+ final SourceInfo source;
if (offsets != null) {
+ Filters filters = SourceInfo.offsetsHaveFilterInfo(offsets) ? getOldFilters(offsets, config) : getAllFilters(config);
+ this.taskContext = createAndStartTaskContext(config, filters);
+ this.connectionContext = taskContext.getConnectionContext();
+ source = taskContext.source();
// Set the position in our source info ...
source.setOffset(offsets);
logger.info("Found existing offset: {}", offsets);
@@ -124,7 +132,11 @@ public synchronized void start(Configuration config) {
} else {
// We have no recorded offsets ...
+ this.taskContext = createAndStartTaskContext(config, getAllFilters(config));
taskContext.initializeHistoryStorage();
+ this.connectionContext = taskContext.getConnectionContext();
+ source = taskContext.source();
+
if (taskContext.isSnapshotNeverAllowed()) {
// We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
// full history of the database.
@@ -161,7 +173,6 @@ public synchronized void start(Configuration config) {
ChainedReader.Builder chainedReaderBuilder = new ChainedReader.Builder();
// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
- BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
// We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
@@ -183,15 +194,57 @@ public synchronized void start(Configuration config) {
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "row-level binlog and restart the connector.");
}
+ BinlogReader binlogReader = new BinlogReader("binlog", taskContext, null);
chainedReaderBuilder.addReader(binlogReader);
}
} else {
+ if (!source.hasFilterInfo()) {
+ // if we don't have filter info, then either
+ // 1. the snapshot was taken in a version of debezium before the filter info was stored in the offsets, or
+ // 2. this connector previously had no filter information.
+ // either way, we have to assume that the filter information currently in the config accurately reflects
+ // the current state of the connector.
+ source.maybeSetFilterDataFromConfig(config);
+ }
if (!rowBinlogEnabled) {
throw new ConnectException(
"The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
}
- // We're going to start by reading the binlog ...
- chainedReaderBuilder.addReader(binlogReader);
+
+
+ // if there are new tables
+ if (newTablesInConfig()) {
+ // and we are configured to run a parallel snapshot
+ if (taskContext.snapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
+ ServerIdGenerator serverIdGenerator =
+ new ServerIdGenerator(config.getLong(MySqlConnectorConfig.SERVER_ID),
+ config.getLong(MySqlConnectorConfig.SERVER_ID_OFFSET));
+ ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(config,
+ taskContext,
+ getNewFilters(offsets, config),
+ serverIdGenerator);
+
+ MySqlTaskContext unifiedTaskContext = createAndStartTaskContext(config, getAllFilters(config));
+ // we aren't completing a snapshot, but we need to make sure the "snapshot" flag is false for this new context.
+ unifiedTaskContext.source().completeSnapshot();
+ BinlogReader unifiedBinlogReader = new BinlogReader("binlog",
+ unifiedTaskContext,
+ null,
+ serverIdGenerator.getConfiguredServerId());
+ ReconcilingBinlogReader reconcilingBinlogReader = parallelSnapshotReader.createReconcilingBinlogReader(unifiedBinlogReader);
+
+ chainedReaderBuilder.addReader(parallelSnapshotReader);
+ chainedReaderBuilder.addReader(reconcilingBinlogReader);
+ chainedReaderBuilder.addReader(unifiedBinlogReader);
+
+ unifiedBinlogReader.uponCompletion(unifiedTaskContext::shutdown);
+ }
+ } else {
+ // We're going to start by reading the binlog ...
+ BinlogReader binlogReader = new BinlogReader("binlog", taskContext, null);
+ chainedReaderBuilder.addReader(binlogReader);
+ }
+
}
readers = chainedReaderBuilder.build();
@@ -222,6 +275,133 @@ public synchronized void start(Configuration config) {
}
}
+ public class ServerIdGenerator {
+
+ private final long configuredServerId;
+ private final long offset;
+ private int counter;
+
+ private ServerIdGenerator(long configuredServerId, long configuredOffset) {
+ this.configuredServerId = configuredServerId;
+ this.offset = configuredOffset;
+ this.counter = 0;
+ }
+
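+ // e.g. with database.server.id = 1000 and the default database.server.id.offset = 10000,
+ // successive calls return 11000, 21000, 31000, ...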
+ public long getNextServerId() {
+ counter++;
+ return configuredServerId + (counter * offset);
+ }
+
+ public long getConfiguredServerId() {
+ return configuredServerId;
+ }
+ }
+
+ /**
+ * Get the offset to restart the connector from. Normally, this is just the stored offset.
+ *
+ * However, if we were doing a parallel load with new tables, it's possible that the last
+ * committed offset is from reading the new tables, which could be beyond where we want to
+ * restart from (and restarting there could cause skipped events). To fix this, the new
+ * tables binlog reader records extra information in its offset to tell the connector where
+ * to restart from. If this extra information is present in the stored offset, that is the
+ * offset that is returned.
+ * @param storedOffset the stored offset.
+ * @return the offset to restart from.
+ * @see RecordMakers#RecordMakers(MySqlSchema, SourceInfo, TopicSelector, boolean, Map)
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, ?> getRestartOffset(Map<String, ?> storedOffset) {
+ Map<String, Object> restartOffset = new HashMap<>();
+ if (storedOffset != null) {
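+ // offsets written during a parallel load carry RESTART_PREFIX-ed copies of the true restart
+ // position (written by RecordMakers); strip the prefix to recover that position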
+ for (String key : storedOffset.keySet()){
+ if (key.startsWith(SourceInfo.RESTART_PREFIX)) {
+ String newKey = key.substring(SourceInfo.RESTART_PREFIX.length());
+ restartOffset.put(newKey, storedOffset.get(key));
+ }
+ }
+ }
+ return restartOffset.isEmpty()? storedOffset : restartOffset;
+ }
+
+ private static MySqlTaskContext createAndStartTaskContext(Configuration config,
+ Filters filters) {
+ MySqlTaskContext taskContext = new MySqlTaskContext(config, filters);
+ taskContext.start();
+ return taskContext;
+ }
+
+ /**
+ * @return true if new tables appear to have been added to the config, and false otherwise.
+ */
+ private boolean newTablesInConfig() {
+ final String elementSep = "\\s*,\\s*";
+
+ // take in two stringified lists, and return true if the first list contains elements that are not in the second list
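+ // e.g. hasExclusiveElements.apply("db1.orders,db1.customers", "db1.orders") is true, because "db1.customers" appears only in the first list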
+ BiFunction<String, String, Boolean> hasExclusiveElements = (String a, String b) -> {
+ if (a == null || a.isEmpty()) {
+ return false;
+ } else if (b == null || b.isEmpty()) {
+ return true;
+ }
+ Set<String> bSet = Stream.of(b.split(elementSep)).collect(Collectors.toSet());
+ return !Stream.of(a.split(elementSep)).filter((x) -> !bSet.contains(x)).collect(Collectors.toSet()).isEmpty();
+ };
+
+ final SourceInfo sourceInfo = taskContext.source();
+ final Configuration config = taskContext.config();
+ if (!sourceInfo.hasFilterInfo()) {
+ // if there was previously no filter info, then we either can't evaluate if there are new tables,
+ // or there aren't any new tables because we previously used no filter.
+ return false;
+ }
+ // otherwise, we have filter info
+ // if either whitelist has been added to, then we may have new tables
+
+ if (hasExclusiveElements.apply(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST), sourceInfo.getDatabaseWhitelist())) {
+ return true;
+ }
+ if (hasExclusiveElements.apply(config.getString(MySqlConnectorConfig.TABLE_WHITELIST), sourceInfo.getTableWhitelist())) {
+ return true;
+ }
+ // if either blacklist has been removed from, then we may have new tables
+ if (hasExclusiveElements.apply(sourceInfo.getDatabaseBlacklist(), config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))) {
+ return true;
+ }
+ if (hasExclusiveElements.apply(sourceInfo.getTableBlacklist(), config.getString(MySqlConnectorConfig.TABLE_BLACKLIST))) {
+ return true;
+ }
+ // otherwise, false.
+ return false;
+ }
+
+ /**
+ * Get the filters representing the tables that have been newly added to the config, but
+ * not those that previously existed in the config.
+ * @return {@link Filters}
+ */
+ private static Filters getNewFilters(Map<String, ?> offsets, Configuration config) {
+ Filters oldFilters = getOldFilters(offsets, config);
+ return new Filters.Builder(config).excludeAllTables(oldFilters).build();
+ }
+
+ /**
+ * Get the filters representing those tables that previously existed in the config, but
+ * not those newly added to the config.
+ * @return {@link Filters}
+ */
+ private static Filters getOldFilters(Map<String, ?> offsets, Configuration config) {
+ return new Filters.Builder(config).setFiltersFromOffsets(offsets).build();
+ }
+
+ /**
+ * Get the filters representing all tables represented by the config.
+ * @return {@link Filters}
+ */
+ private static Filters getAllFilters(Configuration config) {
+ return new Filters.Builder(config).build();
+ }
+
@Override
public List poll() throws InterruptedException {
Reader currentReader = readers;
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
index d234d078c..571473642 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java
@@ -82,12 +82,16 @@ public class MySqlSchema extends RelationalDatabaseSchema {
* may be null if not needed
* @param tableIdCaseInsensitive true if table lookup ignores letter case
*/
- public MySqlSchema(MySqlConnectorConfig configuration, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, TopicSelector<TableId> topicSelector) {
+ public MySqlSchema(MySqlConnectorConfig configuration,
+ Predicate<String> gtidFilter,
+ boolean tableIdCaseInsensitive,
+ TopicSelector<TableId> topicSelector,
+ Filters tableFilters) {
super(
configuration,
topicSelector,
- TableFilter.fromPredicate(new Filters(configuration.getConfig()).tableFilter()),
- new Filters(configuration.getConfig()).columnFilter(),
+ TableFilter.fromPredicate(new Filters.Builder(configuration.getConfig()).build().tableFilter()),
+ new Filters.Builder(configuration.getConfig()).build().columnFilter(),
new TableSchemaBuilder(
getValueConverters(configuration), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA)
,
@@ -96,7 +100,7 @@ public MySqlSchema(MySqlConnectorConfig configuration, Predicate gtidFil
Configuration config = configuration.getConfig();
- this.filters = new Filters(config);
+ this.filters = tableFilters;
// Do not remove the prefix from the subset of config properties ...
String connectorName = config.getString("name", configuration.getLogicalName());
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
index 8aa577bb9..39c2b3bc4 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java
@@ -17,6 +17,7 @@
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
+import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotNewTables;
import io.debezium.function.Predicates;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
@@ -49,12 +50,15 @@ public final class MySqlTaskContext extends CdcSourceTaskContext {
*/
private final boolean tableIdCaseInsensitive;
- public MySqlTaskContext(Configuration config) {
- this(config, null);
+ public MySqlTaskContext(Configuration config, Filters filters) {
+ this(config, filters, null, null);
}
- public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
- // MySQL now calculates JMX binlog reader metrics on its own
+ public MySqlTaskContext(Configuration config, Filters filters, Map<String, ?> restartOffset) {
+ this(config, filters, null, restartOffset);
+ }
+
+ public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCaseInsensitive, Map<String, ?> restartOffset) {
super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList);
this.config = config;
@@ -81,10 +85,10 @@ public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
}
// Set up the MySQL schema ...
- this.dbSchema = new MySqlSchema(connectorConfig, this.gtidSourceFilter, this.tableIdCaseInsensitive, topicSelector);
+ this.dbSchema = new MySqlSchema(connectorConfig, this.gtidSourceFilter, this.tableIdCaseInsensitive, topicSelector, filters);
// Set up the record processor ...
- this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector, config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE));
+ this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector, config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE), restartOffset);
// Set up the DDL filter
final String ddlFilter = config.getString(DatabaseHistory.DDL_FILTER);
@@ -242,6 +246,11 @@ protected SnapshotMode snapshotMode() {
return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());
}
+ protected SnapshotNewTables snapshotNewTables() {
+ String value = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
+ return SnapshotNewTables.parse(value, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString());
+ }
+
public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ParallelSnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ParallelSnapshotReader.java
new file mode 100644
index 000000000..b50e5f474
--- /dev/null
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ParallelSnapshotReader.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mysql;
+
+import io.debezium.config.Configuration;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+/**
+ * A reader that runs a {@link ChainedReader} consisting of a {@link SnapshotReader} and a {@link BinlogReader}
+ * for all tables newly added to the config in parallel with a {@link BinlogReader} for all the tables previously
+ * in the config.
+ */
+public class ParallelSnapshotReader implements Reader {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final BinlogReader oldTablesReader;
+ private final BinlogReader newTablesBinlogReader;
+ private final ChainedReader newTablesReader;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final AtomicBoolean completed = new AtomicBoolean(false);
+ private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
+
+ private final MySqlConnectorTask.ServerIdGenerator serverIdGenerator;
+
+ /**
+ * Create a ParallelSnapshotReader.
+ *
+ * @param config the current connector configuration.
+ * @param noSnapshotContext The context for those tables not undergoing a snapshot.
+ * @param snapshotFilters {@link Filters} matching the tables that should be snapshotted.
+ * @param serverIdGenerator a generator for creating unconflicting serverIds.
+ */
+ public ParallelSnapshotReader(Configuration config,
+ MySqlTaskContext noSnapshotContext,
+ Filters snapshotFilters,
+ MySqlConnectorTask.ServerIdGenerator serverIdGenerator) {
+ this.serverIdGenerator = serverIdGenerator;
+ AtomicBoolean oldTablesReaderNearEnd = new AtomicBoolean(false);
+ AtomicBoolean newTablesReaderNearEnd = new AtomicBoolean(false);
+ ParallelHaltingPredicate oldTablesReaderHaltingPredicate =
+ new ParallelHaltingPredicate(oldTablesReaderNearEnd, newTablesReaderNearEnd);
+ ParallelHaltingPredicate newTablesReaderHaltingPredicate =
+ new ParallelHaltingPredicate(newTablesReaderNearEnd, oldTablesReaderNearEnd);
+
+ this.oldTablesReader = new BinlogReader("oldBinlog",
+ noSnapshotContext,
+ oldTablesReaderHaltingPredicate,
+ serverIdGenerator.getNextServerId());
+
+ MySqlTaskContext newTablesContext = new MySqlTaskContext(config,
+ snapshotFilters,
+ noSnapshotContext.source().offset());
+ newTablesContext.start();
+ SnapshotReader newTablesSnapshotReader = new SnapshotReader("newSnapshot", newTablesContext);
+
+ this.newTablesBinlogReader = new BinlogReader("newBinlog",
+ newTablesContext,
+ newTablesReaderHaltingPredicate,
+ serverIdGenerator.getNextServerId());
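+ // the new-tables side is itself a chain (snapshot, then binlog) that runs alongside the old-tables binlog reader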
+ this.newTablesReader = new ChainedReader.Builder().addReader(newTablesSnapshotReader).addReader(newTablesBinlogReader).build();
+ }
+
+ // for testing purposes
+ /*package private*/ ParallelSnapshotReader(BinlogReader oldTablesBinlogReader,
+ SnapshotReader newTablesSnapshotReader,
+ BinlogReader newTablesBinlogReader) {
+ this.oldTablesReader = oldTablesBinlogReader;
+ this.newTablesBinlogReader = newTablesBinlogReader;
+ this.newTablesReader = new ChainedReader.Builder().addReader(newTablesSnapshotReader).addReader(newTablesBinlogReader).build();
+ this.serverIdGenerator = null;
+ }
+
+ /**
+ * Create and return a {@link ReconcilingBinlogReader} for the two binlog readers contained in this
+ * ParallelSnapshotReader.
+ * @return a {@link ReconcilingBinlogReader}
+ */
+ public ReconcilingBinlogReader createReconcilingBinlogReader(BinlogReader unifiedReader) {
+ return new ReconcilingBinlogReader(oldTablesReader,
+ newTablesBinlogReader,
+ unifiedReader,
+ serverIdGenerator.getNextServerId());
+ }
+
+ @Override
+ public void uponCompletion(Runnable handler) {
+ uponCompletion.set(handler);
+ }
+
+ @Override
+ public void initialize() {
+ oldTablesReader.initialize();
+ newTablesReader.initialize();
+ }
+
+ @Override
+ public void start() {
+ if (running.compareAndSet(false, true)) {
+ oldTablesReader.start();
+ newTablesReader.start();
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (running.compareAndSet(true, false)) {
+ try {
+ logger.info("Stopping the {} reader", oldTablesReader.name());
+ oldTablesReader.stop();
+ oldTablesReader.context.shutdown();
+ } catch (Throwable t) {
+ logger.error("Unexpected error stopping the {} reader", oldTablesReader.name());
+ }
+
+ try {
+ logger.info("Stopping the {} reader", newTablesReader.name());
+ newTablesReader.stop();
+ newTablesBinlogReader.context.shutdown();
+ } catch (Throwable t) {
+ logger.error("Unexpected error stopping the {} reader", newTablesReader.name());
+ }
+ }
+ }
+
+ @Override
+ public State state() {
+ if (running.get()) {
+ return State.RUNNING;
+ }
+ return State.STOPPED;
+ }
+
+
+ @Override
+ public List<SourceRecord> poll() throws InterruptedException {
+ // the old tables reader is a raw BinlogReader and will throw an exception if poll is called when it is not running.
+ List<SourceRecord> allRecords = oldTablesReader.isRunning() ? oldTablesReader.poll() : null;
+ List<SourceRecord> newTablesRecords = newTablesReader.poll();
+ if (newTablesRecords != null) {
+ if (allRecords == null) {
+ allRecords = newTablesRecords;
+ } else {
+ allRecords.addAll(newTablesRecords);
+ }
+ }
+ else {
+ // else newTableRecords == null
+ if (allRecords == null) {
+ // if both readers have stopped, we need to stop.
+ completeSuccessfully();
+ }
+ }
+ return allRecords;
+ }
+
+ private void completeSuccessfully() {
+ if (completed.compareAndSet(false, true)) {
+ stop();
+ Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
+ if (completionHandler != null) {
+ completionHandler.run();
+ }
+ }
+ }
+
+ @Override
+ public String name() {
+ return "parallelSnapshotReader";
+ }
+
+ /**
+ * A Halting Predicate for the parallel snapshot reader.
+ * Usage for this predicate assumes two readers using two ParallelHalting Predicates.
+ * The booleans are owned by the two predicates, and keep track of whether they or the
+ * other reader has reached the end of the binlog.
+ *
+ * Test returns false if both readers have been determined to be near the end of the
+ * binlog.
+ * Being near the end of the binlog is defined as this predicate having seen a record
+ * with a timestamp within {@link ParallelHaltingPredicate#DEFAULT_MIN_HALTING_DURATION} of
+ * the current time. Once a single record near the end of the binlog has been seen, we
+ * assume the reader will stay near the end of the binlog.
+ */
+ /*package local*/ static class ParallelHaltingPredicate implements Predicate<SourceRecord> {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private volatile AtomicBoolean thisReaderNearEnd;
+ private volatile AtomicBoolean otherReaderNearEnd;
+
+ // How close (in time) a record's timestamp must be to the current time before we consider
+ // this reader near the end of the binlog and may attempt to halt; hard coded to 5 minutes.
+ private final Duration minHaltingDuration;
+ private static final Duration DEFAULT_MIN_HALTING_DURATION = Duration.ofMinutes(5);
+
+ /*package local*/ ParallelHaltingPredicate(AtomicBoolean thisReaderNearEndRef,
+ AtomicBoolean otherReaderNearEndRef) {
+ this(thisReaderNearEndRef, otherReaderNearEndRef, DEFAULT_MIN_HALTING_DURATION);
+ }
+
+ /*package local*/ ParallelHaltingPredicate(AtomicBoolean thisReaderNearEndRef,
+ AtomicBoolean otherReaderNearEndRef,
+ Duration minHaltingDuration) {
+ this.otherReaderNearEnd = otherReaderNearEndRef;
+ this.thisReaderNearEnd = thisReaderNearEndRef;
+ this.minHaltingDuration = minHaltingDuration;
+ }
+
+ @Override
+ public boolean test(SourceRecord ourSourceRecord) {
+ // we assume if we ever end up near the end of the binlog, then we will remain there.
+ if (!thisReaderNearEnd.get()) {
+ Long sourceRecordTimestamp = (Long) ourSourceRecord.sourceOffset().get(SourceInfo.TIMESTAMP_KEY);
+ Instant recordTimestamp = Instant.ofEpochSecond(sourceRecordTimestamp);
+ Instant now = Instant.now();
+ Duration durationToEnd = Duration.between(recordTimestamp, now);
+ if (durationToEnd.compareTo(minHaltingDuration) <= 0) {
+ // we are within minHaltingDuration of the end
+ logger.debug("Parallel halting predicate: this reader near end");
+ thisReaderNearEnd.set(true);
+ }
+ }
+ // return false if both readers are near end, true otherwise.
+ return !(thisReaderNearEnd.get() && otherReaderNearEnd.get());
+ }
+ }
+}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ReconcilingBinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ReconcilingBinlogReader.java
new file mode 100644
index 000000000..4bf427cb9
--- /dev/null
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ReconcilingBinlogReader.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mysql;
+
+import io.debezium.document.Document;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+import static io.debezium.connector.mysql.SourceInfo.BINLOG_FILENAME_OFFSET_KEY;
+import static io.debezium.connector.mysql.SourceInfo.BINLOG_POSITION_OFFSET_KEY;
+
+/**
+ * A reader that unifies the binlog positions of two binlog readers.
+ *
+ * To do this, at start time we evaluate the (now completed) states of the two binlog
+ * readers we want to unify, and create a new {@link BinlogReader} duplicating the
+ * lagging reader, but with a halting predicate that will halt it once it has passed the
+ * final position of the leading reader.
+ */
+public class ReconcilingBinlogReader implements Reader {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private final BinlogReader binlogReaderA;
+ private final BinlogReader binlogReaderB;
+ private final BinlogReader unifiedReader;
+
+ private BinlogReader reconcilingReader;
+
+ private Boolean aReaderLeading = null;
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final AtomicBoolean completed = new AtomicBoolean(false);
+ private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
+
+ private final long serverId;
+
+ /**
+ * Create a reconciling Binlog Reader.
+ *
+ * @param binlogReaderA the first binlog reader to unify.
+ * @param binlogReaderB the second binlog reader to unify.
+ * @param unifiedReader the final, unified binlog reader that will run once the reconciliation is complete.
+ */
+ public ReconcilingBinlogReader(BinlogReader binlogReaderA,
+ BinlogReader binlogReaderB,
+ BinlogReader unifiedReader,
+ long serverId) {
+ this.binlogReaderA = binlogReaderA;
+ this.binlogReaderB = binlogReaderB;
+ this.unifiedReader = unifiedReader;
+ this.serverId = serverId;
+ }
+
+ @Override
+ public String name() {
+ return "reconcilingBinlogReader";
+ }
+
+ @Override
+ public State state() {
+ if (running.get()) {
+ return State.RUNNING;
+ }
+ return completed.get() ? State.STOPPED : State.STOPPING;
+ }
+
+ @Override
+ public void uponCompletion(Runnable handler) {
+ uponCompletion.set(handler);
+ }
+
+ @Override
+ public void start() {
+ if (running.compareAndSet(false, true)) {
+ completed.set(false);
+ determineLeadingReader();
+
+ MySqlTaskContext laggingReaderContext = getLaggingReader().context;
+ OffsetLimitPredicate offsetLimitPredicate =
+ new OffsetLimitPredicate(getLeadingReader().getLastOffset(),
+ laggingReaderContext.gtidSourceFilter());
+
+ // create our actual reader
+ reconcilingReader = new BinlogReader("innerReconcilingReader",
+ laggingReaderContext,
+ offsetLimitPredicate,
+ serverId);
+ reconcilingReader.start();
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (running.compareAndSet(true, false)){
+ try {
+ logger.info("Stopping the {} reader", reconcilingReader.name());
+ reconcilingReader.stop();
+ reconcilingReader.context.shutdown();
+ } catch (Throwable t) {
+ logger.error("Unexpected error stopping the {} reader", reconcilingReader.name());
+ }
+ }
+ }
+
+ @Override
+ public List<SourceRecord> poll() throws InterruptedException {
+ List<SourceRecord> innerReaderPoll = reconcilingReader.poll();
+ if (innerReaderPoll == null) {
+ completeSuccessfully();
+ }
+ return innerReaderPoll;
+ }
+
+ private void completeSuccessfully() {
+ if (completed.compareAndSet(false, true)){
+ stop();
+ setupUnifiedReader();
+ logger.info("Completed Reconciliation of Parallel Readers.");
+
+ Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
+ if (completionHandler != null) {
+ completionHandler.run();
+ }
+ }
+ }
+
+ private void setupUnifiedReader() {
+ unifiedReader.context.loadHistory(getLeadingReader().context.source());
+ unifiedReader.context.source().setFilterDataFromConfig(unifiedReader.context.config());
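+ // from here on the unified reader covers every configured table, so store the full filter config in its source info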
+ Map<String, ?> keyedOffset =
+ reconcilingReader.getLastOffset() == null ?
+ getLeadingReader().getLastOffset() :
+ reconcilingReader.getLastOffset();
+ unifiedReader.context.source()
+ .setBinlogStartPoint((String) keyedOffset.get(BINLOG_FILENAME_OFFSET_KEY),
+ (Long) keyedOffset.get(BINLOG_POSITION_OFFSET_KEY));
+ // note: this seems to dupe -one- event in my tests.
+ // I don't totally understand why that's happening (that is, I don't understand
+ // why the lastOffset seems to be before the actual last record) but this seems
+ // like a minor issue to me.
+ }
+
+ private void determineLeadingReader() {
+ Map<String, ?> aOffset = binlogReaderA.getLastOffset();
+ Map<String, ?> bOffset = binlogReaderB.getLastOffset();
+ boolean aNotStopped = binlogReaderA.state() != State.STOPPED;
+ boolean bNotStopped = binlogReaderB.state() != State.STOPPED;
+ boolean noOffsets = aOffset == null && bOffset == null;
+ if (noOffsets || aNotStopped || bNotStopped) {
+ throw new IllegalStateException("Cannot determine leading reader until both source readers have completed.");
+ }
+
+ // if one reader has not processed any events, its 'lastOffset' will be null.
+ // in this case, it must be the lagging reader.
+ if (aOffset == null) {
+ aReaderLeading = false;
+ } else if (bOffset == null) {
+ aReaderLeading = true;
+ } else {
+ Document aDocument = SourceInfo.createDocumentFromOffset(aOffset);
+ Document bDocument = SourceInfo.createDocumentFromOffset(bOffset);
+
+ aReaderLeading = SourceInfo.isPositionAtOrBefore(bDocument,
+ aDocument,
+ binlogReaderA.context.gtidSourceFilter());
+ }
+
+ if (aReaderLeading) {
+ logger.info("old tables leading; reading only from new tables");
+ } else {
+ logger.info("new tables leading; reading only from old tables");
+ }
+ }
+
+ /*package private*/ BinlogReader getLeadingReader() {
+ checkLaggingLeadingInfo();
+ return aReaderLeading? binlogReaderA : binlogReaderB;
+ }
+
+ /*package private*/ BinlogReader getLaggingReader() {
+ checkLaggingLeadingInfo();
+ return aReaderLeading? binlogReaderB : binlogReaderA;
+ }
+
+ private void checkLaggingLeadingInfo() {
+ if (aReaderLeading == null) {
+ throw new IllegalStateException("Cannot return leading or lagging readers until this reader has started.");
+ }
+ }
+
+ // package private for testing purposes
+ /**
+ * A Predicate that returns false for any record beyond a given offset.
+ */
+ /*package private*/ static class OffsetLimitPredicate implements Predicate<SourceRecord> {
+
+ private Document leadingReaderFinalOffsetDocument;
+ private Predicate<String> gtidFilter;
+
+ /*package private*/ OffsetLimitPredicate(Map<String, ?> leadingReaderFinalOffset,
+ Predicate<String> gtidFilter) {
+ this.leadingReaderFinalOffsetDocument = SourceInfo.createDocumentFromOffset(leadingReaderFinalOffset);
+ this.gtidFilter = gtidFilter;
+
+ }
+
+ @Override
+ public boolean test(SourceRecord sourceRecord) {
+ Document offsetDocument = SourceInfo.createDocumentFromOffset(sourceRecord.sourceOffset());
+ // .isPositionAtOrBefore is true IFF leadingReaderFinalOffsetDocument <= offsetDocument
+ // we should stop (return false) IFF leadingReaderFinalOffsetDocument <= offsetDocument
+ return
+ ! SourceInfo.isPositionAtOrBefore(leadingReaderFinalOffsetDocument,
+ offsetDocument,
+ gtidFilter);
+ }
+ }
+}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
index 373910a7e..88a0d7968 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java
@@ -46,6 +46,7 @@ public class RecordMakers {
private final Schema schemaChangeKeySchema;
private final Schema schemaChangeValueSchema;
private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger);
+ private Map<String, ?> restartOffset = null;
/**
* Create the record makers using the supplied components.
@@ -71,6 +72,24 @@ public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector,
+ boolean emitTombstoneOnDelete,
+ Map<String, ?> restartOffset) {
+ this(schema, source, topicSelector, emitTombstoneOnDelete);
+ this.restartOffset = restartOffset;
+ }
+
/**
* Obtain the record maker for the given table, using the specified columns and sending records to the given consumer.
*
@@ -160,6 +179,20 @@ public void regenerate() {
});
}
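+ /**
+ * Merge the restart offset (if any) into the given record offset, copying each restart entry under a
+ * {@link SourceInfo#RESTART_PREFIX}-prefixed key so both positions travel with the record.
+ * For illustration only (hypothetical values): an offset of {file=mysql-bin.000003, pos=154} combined with
+ * a restart offset of {file=mysql-bin.000001, pos=4} becomes
+ * {file=mysql-bin.000003, pos=154, RESTART_file=mysql-bin.000001, RESTART_pos=4}.
+ */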
+ private Map<String, Object> getSourceRecordOffset(Map<String, Object> sourceOffset) {
+ if (restartOffset == null) {
+ return sourceOffset;
+ }
+ else {
+ for (String key : restartOffset.keySet()) {
+ sourceOffset.put(SourceInfo.RESTART_PREFIX + key, restartOffset.get(key));
+ }
+ return sourceOffset;
+ }
+ }
+
/**
* Assign the given table number to the table with the specified {@link TableId table ID}.
*
@@ -196,7 +229,7 @@ public int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows
Map partition = source.partition();
Map offset = source.offsetForRow(rowNumber, numberOfRows);
Struct origin = source.struct(id);
- SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
+ SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.read(value, origin, ts));
consumer.accept(record);
return 1;
@@ -215,7 +248,7 @@ public int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRo
Map partition = source.partition();
Map offset = source.offsetForRow(rowNumber, numberOfRows);
Struct origin = source.struct(id);
- SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
+ SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(value, origin, ts));
consumer.accept(record);
return 1;
@@ -242,26 +275,26 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum
// The key has changed, so we need to deal with both the new key and old key.
// Consumers may push the events into a system that won't allow both records to exist at the same time,
// so we first want to send the delete event for the old key...
- SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
+ SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts));
consumer.accept(record);
++count;
if (emitTombstoneOnDelete) {
// Next send a tombstone event for the old key ...
- record = new SourceRecord(partition, offset, topicName, partitionNum, keySchema, oldKey, null, null);
+ record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum, keySchema, oldKey, null, null);
consumer.accept(record);
++count;
}
// And finally send the create event ...
- record = new SourceRecord(partition, offset, topicName, partitionNum,
+ record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));
consumer.accept(record);
++count;
} else {
// The key has not changed, so a simple update is fine ...
- SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
+ SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts));
consumer.accept(record);
++count;
@@ -283,14 +316,14 @@ public int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRo
Map offset = source.offsetForRow(rowNumber, numberOfRows);
Struct origin = source.struct(id);
// Send a delete message ...
- SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
+ SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.delete(value, origin, ts));
consumer.accept(record);
++count;
// And send a tombstone ...
if (emitTombstoneOnDelete) {
- record = new SourceRecord(partition, offset, topicName, partitionNum,
+ record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, null, null);
consumer.accept(record);
++count;
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java
index d8896d394..da7a33d75 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java
@@ -74,7 +74,7 @@ public class SnapshotReader extends AbstractReader {
* @param context the task context in which this reader is running; may not be null
*/
public SnapshotReader(String name, MySqlTaskContext context) {
- super(name, context);
+ super(name, context, null);
this.includeData = context.snapshotMode().includeData();
this.snapshotLockingMode = context.getConnectorConfig().getSnapshotLockingMode();
recorder = this::recordRowAsRead;
@@ -321,7 +321,9 @@ protected void execute() {
if (!isRunning()) return;
logger.info("Step {}: read list of available tables in each database", step++);
List tableIds = new ArrayList<>();
- final Map> tableIdsByDbName = new HashMap<>();
+ final Filters createTableFilters = getCreateTableFilters(filters);
+ final Map> createTablesMap = new HashMap<>();
final Set readableDatabaseNames = new HashSet<>();
for (String dbName : databaseNames) {
try {
@@ -331,9 +333,11 @@ protected void execute() {
mysql.query(sql.get(), rs -> {
while (rs.next() && isRunning()) {
TableId id = new TableId(dbName, null, rs.getString(1));
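+ // createTableFilters may be broader than the data-snapshot filters (see getCreateTableFilters),
+ // so a table can get a CREATE statement here even if its data is not snapshotted below.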
+ if (createTableFilters.tableFilter().test(id)) {
+ createTablesMap.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
+ }
if (filters.tableFilter().test(id)) {
tableIds.add(id);
- tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
logger.info("\t including '{}'", id);
} else {
logger.info("\t '{}' is filtered out, discarding", id);
@@ -427,7 +431,7 @@ protected void execute() {
this::enqueueSchemaChanges));
// Now process all of our tables for each database ...
- for (Map.Entry> entry : tableIdsByDbName.entrySet()) {
+ for (Map.Entry> entry : createTablesMap.entrySet()) {
if (!isRunning()) break;
String dbName = entry.getKey();
// First drop, create, and then use the named database ...
@@ -601,7 +605,7 @@ protected void execute() {
// We've copied all of the tables and we've not yet been stopped, but our buffer holds onto the
// very last record. First mark the snapshot as complete and then apply the updated offset to
// the buffered record ...
- source.markLastSnapshot();
+ source.markLastSnapshot(context.config());
long stop = clock.currentTimeInMillis();
try {
bufferedRecordQueue.close(this::replaceOffset);
@@ -615,7 +619,6 @@ protected void execute() {
interrupted.set(true);
}
} else {
- // source.markLastSnapshot(); Think we will not be needing this here it is used to mark last row entry?
logger.info("Step {}: encountered only schema based snapshot, skipping data snapshot", step);
}
step++;
@@ -756,6 +759,23 @@ protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection my
}
}
+ /**
+ * Get the filters for table creation. Depending on the configuration, this may not be the default filter set.
+ *
+ * @param filters the default filters of this {@link SnapshotReader}
+ * @return the {@link Filters} representing all tables for which this snapshot reader should generate CREATE statements
+ */
+ private Filters getCreateTableFilters(Filters filters) {
+ MySqlConnectorConfig.SnapshotNewTables snapshotNewTables = context.snapshotNewTables();
+ if (snapshotNewTables == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
+ // if we are snapshotting new tables in parallel, we need to make sure all the tables in the configuration
+ // are created.
+ return new Filters.Builder(context.config()).build();
+ } else {
+ return filters;
+ }
+ }
+
protected String quote(String dbOrTableName) {
return "`" + dbOrTableName + "`";
}
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java
index 703f105dd..3c468c384 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java
@@ -9,6 +9,7 @@
import java.util.Map;
import java.util.function.Predicate;
+import io.debezium.config.Configuration;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@@ -111,6 +112,11 @@ final class SourceInfo extends AbstractSourceInfo {
public static final String DB_NAME_KEY = "db";
public static final String TABLE_NAME_KEY = "table";
public static final String QUERY_KEY = "query";
+ public static final String DATABASE_WHITELIST_KEY = "database_whitelist";
+ public static final String DATABASE_BLACKLIST_KEY = "database_blacklist";
+ public static final String TABLE_WHITELIST_KEY = "table_whitelist";
+ public static final String TABLE_BLACKLIST_KEY = "table_blacklist";
+ public static final String RESTART_PREFIX = "RESTART_";
/**
* A {@link Schema} definition for a {@link Struct} used to store the {@link #partition()} and {@link #offset()} information.
@@ -151,6 +157,10 @@ final class SourceInfo extends AbstractSourceInfo {
private boolean lastSnapshot = true;
private boolean nextSnapshot = false;
private String currentQuery = null;
+ private String databaseWhitelist;
+ private String databaseBlacklist;
+ private String tableWhitelist;
+ private String tableBlacklist;
public SourceInfo() {
super(Module.version());
@@ -286,6 +296,12 @@ public void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes)
if (isSnapshotInEffect()) {
map.put(SNAPSHOT_KEY, true);
}
+ if (hasFilterInfo()) {
+ map.put(DATABASE_WHITELIST_KEY, databaseWhitelist);
+ map.put(DATABASE_BLACKLIST_KEY, databaseBlacklist);
+ map.put(TABLE_WHITELIST_KEY, tableWhitelist);
+ map.put(TABLE_BLACKLIST_KEY, tableBlacklist);
+ }
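+ // for illustration only (hypothetical values), the offset map may then also carry
+ // {database_whitelist=inventory, table_whitelist=inventory.customers, ...} alongside the binlog position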
return map;
}
@@ -343,6 +359,7 @@ public Struct struct(TableId tableId) {
result.put(BINLOG_ROW_IN_EVENT_OFFSET_KEY, currentRowNumber);
result.put(TIMESTAMP_KEY, binlogTimestampSeconds);
if (lastSnapshot) {
+ // if the snapshot has COMPLETED, lastSnapshot will already be false, so this flag is not set.
result.put(SNAPSHOT_KEY, true);
}
if (threadId >= 0) {
@@ -478,9 +495,10 @@ public void startSnapshot() {
/**
* Denote that a snapshot will be complete after one last record.
*/
- public void markLastSnapshot() {
+ public void markLastSnapshot(Configuration config) {
this.lastSnapshot = true;
this.nextSnapshot = false;
+ maybeSetFilterDataFromConfig(config);
}
/**
@@ -491,6 +509,58 @@ public void completeSnapshot() {
this.nextSnapshot = false;
}
+ /**
+ * Set the filter data for the offset from the given {@link Configuration}
+ * @param config the configuration
+ */
+ public void setFilterDataFromConfig(Configuration config) {
+ this.databaseWhitelist = config.getString(MySqlConnectorConfig.DATABASE_WHITELIST);
+ this.databaseBlacklist = config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST);
+ this.tableWhitelist = config.getString(MySqlConnectorConfig.TABLE_WHITELIST);
+ this.tableBlacklist = config.getString(MySqlConnectorConfig.TABLE_BLACKLIST);
+ }
+
+ /**
+ * Set filter data from config if and only if parallel snapshotting of new tables is turned on
+ * @param config the configuration.
+ */
+ public void maybeSetFilterDataFromConfig(Configuration config) {
+ if (config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES).equals(
+ MySqlConnectorConfig.SnapshotNewTables.PARALLEL.getValue())) {
+ setFilterDataFromConfig(config);
+ }
+ }
+
+ /**
+ * @return true if this offset has filter info, false otherwise.
+ */
+ public boolean hasFilterInfo() {
+ /*
+ * There are 2 possible cases for us not having filter info.
+ * 1. The connector does not use a filter. Creating a filter in such a connector could never add any tables.
+ * 2. The initial snapshot occurred in a version of Debezium that did not store the filter information in the
+ * offsets / the connector was not configured to store filter information.
+ */
+ return databaseWhitelist != null || databaseBlacklist != null ||
+ tableWhitelist != null || tableBlacklist != null;
+ }
+
+ public String getDatabaseWhitelist() {
+ return databaseWhitelist;
+ }
+
+ public String getDatabaseBlacklist() {
+ return databaseBlacklist;
+ }
+
+ public String getTableWhitelist() {
+ return tableWhitelist;
+ }
+
+ public String getTableBlacklist() {
+ return tableBlacklist;
+ }
+
/**
* Set the source offset, as read from Kafka Connect. This method does nothing if the supplied map is null.
*
@@ -511,9 +581,21 @@ public void setOffset(Map sourceOffset) {
this.restartRowsToSkip = (int) longOffsetValue(sourceOffset, BINLOG_ROW_IN_EVENT_OFFSET_KEY);
nextSnapshot = booleanOffsetValue(sourceOffset, SNAPSHOT_KEY);
lastSnapshot = nextSnapshot;
+ this.databaseWhitelist = (String) sourceOffset.get(DATABASE_WHITELIST_KEY);
+ this.databaseBlacklist = (String) sourceOffset.get(DATABASE_BLACKLIST_KEY);
+ this.tableWhitelist = (String) sourceOffset.get(TABLE_WHITELIST_KEY);
+ this.tableBlacklist = (String) sourceOffset.get(TABLE_BLACKLIST_KEY);
}
}
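+ /**
+ * @param sourceOffset the source offset, as read from Kafka Connect; may be null
+ * @return true if the given offset contains stored filter information, false otherwise
+ */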
+ public static boolean offsetsHaveFilterInfo(Map<String, ?> sourceOffset) {
+ return sourceOffset != null &&
+ (sourceOffset.containsKey(DATABASE_BLACKLIST_KEY) ||
+ sourceOffset.containsKey(DATABASE_WHITELIST_KEY) ||
+ sourceOffset.containsKey(TABLE_BLACKLIST_KEY) ||
+ sourceOffset.containsKey(TABLE_WHITELIST_KEY));
+ }
+
private long longOffsetValue(Map values, String key) {
Object obj = values.get(key);
if (obj == null) return 0L;
@@ -615,6 +697,24 @@ public String toString() {
}
return sb.toString();
}
+
+ /**
+ * Create a {@link Document} from the given offset.
+ *
+ * @param offset the offset to create the document from.
+ * @return a {@link Document} with the offset data.
+ */
+ public static Document createDocumentFromOffset(Map<String, ?> offset) {
+ Document offsetDocument = Document.create();
+ // all of the offset keys represent int, long, or string types, so we don't need to worry about references
+ // and information changing underneath us.
+
+ for (Map.Entry<String, ?> entry : offset.entrySet()) {
+ offsetDocument.set(entry.getKey(), entry.getValue());
+ }
+
+ return offsetDocument;
+ }
/**
* Determine whether the first {@link #offset() offset} is at or before the point in time of the second
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java
index bcb58d616..8fb82d314 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java
@@ -116,7 +116,8 @@ protected Configuration.Builder simpleConfig() {
@Test
public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
config = simpleConfig().build();
- context = new MySqlTaskContext(config);
+ Filters filters = new Filters.Builder(config).build();
+ context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
@@ -177,7 +178,8 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
- context = new MySqlTaskContext(config);
+ Filters filters = new Filters.Builder(config).build();
+ context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
@@ -248,7 +250,8 @@ public void shouldHandleTimestampTimezones() throws Exception {
.with(MySqlConnectorConfig.DATABASE_WHITELIST, REGRESSION_DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_WHITELIST, REGRESSION_DATABASE.qualifiedTableName(tableName))
.build();
- context = new MySqlTaskContext(config);
+ Filters filters = new Filters.Builder(config).build();
+ context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
@@ -291,11 +294,12 @@ public void shouldHandleMySQLTimeCorrectly() throws Exception {
.with(MySqlConnectorConfig.DATABASE_WHITELIST, REGRESSION_DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_WHITELIST, REGRESSION_DATABASE.qualifiedTableName(tableName))
.build();
- context = new MySqlTaskContext(config);
+ Filters filters = new Filters.Builder(config).build();
+ context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
- reader = new BinlogReader("binlog", context);
+ reader = new BinlogReader("binlog", context, null);
// Start reading the binlog ...
reader.start();
@@ -391,11 +395,12 @@ private void inconsistentSchema(EventProcessingFailureHandlingMode mode) throws
.build();
}
- context = new MySqlTaskContext(config);
+ Filters filters = new Filters.Builder(config).build();
+ context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
- reader = new BinlogReader("binlog", context);
+ reader = new BinlogReader("binlog", context, null);
// Start reading the binlog ...
reader.start();
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java
index 41d9a8814..abb3d614e 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/Configurator.java
@@ -16,78 +16,85 @@
*
* @author Randall Hauch
*/
-public class Configurator {
+/*package local*/ class Configurator {
private final Configuration.Builder configBuilder = Configuration.create();
- public Configurator with(Field field, String value) {
+ /*package local*/ Configurator with(Field field, String value) {
configBuilder.with(field, value);
return this;
}
- public Configurator with(Field field, boolean value) {
+ /*package local*/ Configurator with(Field field, boolean value) {
configBuilder.with(field, value);
return this;
}
- public Configurator serverName(String serverName) {
+ /*package local*/ Configurator serverName(String serverName) {
return with(MySqlConnectorConfig.SERVER_NAME, serverName);
}
- public Configurator includeDatabases(String regexList) {
+ /*package local*/ Configurator includeDatabases(String regexList) {
return with(MySqlConnectorConfig.DATABASE_WHITELIST, regexList);
}
- public Configurator excludeDatabases(String regexList) {
+ /*package local*/ Configurator excludeDatabases(String regexList) {
return with(MySqlConnectorConfig.DATABASE_BLACKLIST, regexList);
}
- public Configurator includeTables(String regexList) {
+ /*package local*/ Configurator includeTables(String regexList) {
return with(MySqlConnectorConfig.TABLE_WHITELIST, regexList);
}
- public Configurator excludeTables(String regexList) {
+ /*package local*/ Configurator excludeTables(String regexList) {
return with(MySqlConnectorConfig.TABLE_BLACKLIST, regexList);
}
- public Configurator excludeColumns(String regexList) {
+ /*package local*/ Configurator excludeColumns(String regexList) {
return with(MySqlConnectorConfig.COLUMN_BLACKLIST, regexList);
}
- public Configurator truncateColumns(int length, String fullyQualifiedTableNames) {
+ /*package local*/ Configurator truncateColumns(int length, String fullyQualifiedTableNames) {
return with(MySqlConnectorConfig.TRUNCATE_COLUMN(length), fullyQualifiedTableNames);
}
- public Configurator maskColumns(int length, String fullyQualifiedTableNames) {
+ /*package local*/ Configurator maskColumns(int length, String fullyQualifiedTableNames) {
return with(MySqlConnectorConfig.MASK_COLUMN(length), fullyQualifiedTableNames);
}
- public Configurator excludeBuiltInTables() {
+ /*package local*/ Configurator excludeBuiltInTables() {
return with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, true);
}
- public Configurator includeBuiltInTables() {
+ /*package local*/ Configurator includeBuiltInTables() {
return with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, false);
}
- public Configurator storeDatabaseHistoryInFile(Path path) {
+ /*package local*/ Configurator storeDatabaseHistoryInFile(Path path) {
with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class.getName());
with(FileDatabaseHistory.FILE_PATH,path.toAbsolutePath().toString());
return this;
}
- public Filters createFilters() {
- return new Filters(configBuilder.build());
+ /*package local*/ Filters createFilters() {
+ return new Filters.Builder(configBuilder.build()).build();
}
/**
* For tests use only
*/
- public MySqlSchema createSchemas() {
+ /*package local*/ MySqlSchema createSchemas() {
+ return createSchemasWithFilter(createFilters());
+ }
+
+ /*package local*/ MySqlSchema createSchemasWithFilter(Filters filters) {
Configuration config = configBuilder.build();
MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(config);
- return new MySqlSchema(connectorConfig, null, false,
- MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), "__debezium-heartbeat"));
+ return new MySqlSchema(connectorConfig,
+ null,
+ false,
+ MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), "__debezium-heartbeat"),
+ filters);
}
}
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
index 6c35a1181..b64f9ebce 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java
@@ -128,6 +128,7 @@ public void shouldFailToValidateInvalidConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);
+ assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
@@ -182,6 +183,7 @@ public void shouldValidateValidConfigurationWithSSL() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);
+ assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
@@ -232,6 +234,7 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);
+ assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
index 61b60b010..552a29c75 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextIT.java
@@ -21,7 +21,7 @@ public class MySqlTaskContextIT extends MySqlTaskContextTest {
@Test
public void shouldCreateTaskFromConfiguration() throws Exception {
config = simpleConfig().build();
- context = new MySqlTaskContext(config);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
assertThat(context.config()).isSameAs(config);
@@ -58,7 +58,7 @@ public void shouldCreateTaskFromConfiguration() throws Exception {
@Test
public void shouldCloseJdbcConnectionOnShutdown() throws Exception {
config = simpleConfig().build();
- context = new MySqlTaskContext(config);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
// JDBC connection is automatically created by MySqlTaskContext when it reads database variables
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java
index d98f810a6..2c681eb1e 100644
--- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java
@@ -94,7 +94,7 @@ protected Configuration.Builder simpleConfig() {
public void shouldCreateTaskFromConfigurationWithNeverSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.NEVER.getValue());
@@ -106,7 +106,7 @@ public void shouldCreateTaskFromConfigurationWithNeverSnapshotMode() throws Exce
public void shouldCreateTaskFromConfigurationWithWhenNeededSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.WHEN_NEEDED)
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.WHEN_NEEDED.getValue());
@@ -118,7 +118,7 @@ public void shouldCreateTaskFromConfigurationWithWhenNeededSnapshotMode() throws
public void shouldUseGtidSetIncludes() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, "a,b,c,d.*")
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
Predicate filter = context.gtidSourceFilter();
@@ -145,7 +145,7 @@ public void shouldUseGtidSetIncludesLiteralUuids() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c,7145bf69-d1ca-11e5-a588-0242ac110004")
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
Predicate filter = context.gtidSourceFilter();
@@ -175,7 +175,7 @@ public void shouldUseGtidSetxcludesLiteralUuids() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES,
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc")
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
Predicate filter = context.gtidSourceFilter();
@@ -204,7 +204,7 @@ public void shouldNotAllowBothGtidSetIncludesAndExcludes() throws Exception {
.with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES,
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41")
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
boolean valid = config.validateAndRecord(MySqlConnectorConfig.ALL_FIELDS, msg -> {});
assertThat(valid).isFalse();
}
@@ -221,7 +221,7 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c")
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.source().setCompletedGtidSet(gtidStr);
@@ -255,7 +255,7 @@ public void shouldMergeToFirstAvailableGtidSetPositions() throws Exception {
.with(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION, GtidNewChannelPosition.EARLIEST)
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.source().setCompletedGtidSet(gtidStr);
@@ -283,7 +283,7 @@ public void shouldComparePositionsWithDifferentFields() {
+ "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES, "96c2072e-e428-11e6-9590-42010a28002d")
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.source().setCompletedGtidSet(lastGtidStr);
HistoryRecordComparator comparator = context.dbSchema().historyComparator();
@@ -310,7 +310,7 @@ public void shouldComparePositionsWithDifferentFields() {
public void shouldIgnoreDatabaseHistoryProperties() throws Exception {
config = simpleConfig().with(KafkaDatabaseHistory.TOPIC, "dummytopic")
.build();
- context = new MySqlTaskContext(config, false);
+ context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.getConnectionContext().jdbc().config().forEach((k, v) -> {
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ParallelSnapshotReaderTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ParallelSnapshotReaderTest.java
new file mode 100644
index 000000000..448c39d6f
--- /dev/null
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ParallelSnapshotReaderTest.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mysql;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static io.debezium.connector.mysql.ParallelSnapshotReader.ParallelHaltingPredicate;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author Moira Tagle
+ */
+public class ParallelSnapshotReaderTest {
+
+ @Test
+ public void startStartsBothReaders() {
+ BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
+ SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
+ BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
+
+ ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
+
+ parallelSnapshotReader.start();
+
+ Assert.assertSame(parallelSnapshotReader.state(), Reader.State.RUNNING);
+
+ verify(mockOldBinlogReader).start();
+ verify(mockNewSnapshotReader).start();
+ // chained reader will only start the snapshot reader
+ }
+
+ @Test
+ public void pollCombinesBothReadersPolls() throws InterruptedException {
+ BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
+ SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
+ BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
+
+ ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
+
+ SourceRecord oldBinlogSourceRecord = mock(SourceRecord.class);
+ List<SourceRecord> oldBinlogRecords = new ArrayList<>();
+ oldBinlogRecords.add(oldBinlogSourceRecord);
+
+ SourceRecord newSnapshotSourceRecord = mock(SourceRecord.class);
+ List<SourceRecord> newSnapshotRecords = new ArrayList<>();
+ newSnapshotRecords.add(newSnapshotSourceRecord);
+
+ when(mockOldBinlogReader.isRunning()).thenReturn(true);
+ when(mockOldBinlogReader.poll()).thenReturn(oldBinlogRecords);
+ when(mockNewSnapshotReader.poll()).thenReturn(newSnapshotRecords);
+
+ // this needs to happen so that the chained reader can be polled.
+ parallelSnapshotReader.start();
+
+ List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
+
+ Assert.assertEquals(2, parallelRecords.size());
+ Assert.assertTrue(parallelRecords.contains(oldBinlogSourceRecord));
+ Assert.assertTrue(parallelRecords.contains(newSnapshotSourceRecord));
+ }
+
+ @Test
+ public void pollReturnsNewIfOldReaderIsStopped() throws InterruptedException {
+ BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
+ SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
+ BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
+
+ ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
+
+ SourceRecord newSnapshotSourceRecord = mock(SourceRecord.class);
+ List<SourceRecord> newSnapshotRecords = new ArrayList<>();
+ newSnapshotRecords.add(newSnapshotSourceRecord);
+
+ // if the old reader is polled when it's stopped it will throw an exception.
+ when(mockOldBinlogReader.isRunning()).thenReturn(false);
+ when(mockOldBinlogReader.poll()).thenThrow(new InterruptedException());
+
+ when(mockNewSnapshotReader.poll()).thenReturn(newSnapshotRecords);
+
+ // this needs to happen so that the chained reader runs correctly.
+ parallelSnapshotReader.start();
+
+ List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
+
+ Assert.assertEquals(1, parallelRecords.size());
+ Assert.assertTrue(parallelRecords.contains(newSnapshotSourceRecord));
+ }
+
+ // Note: this test and the next were observed not to halt at one point, possibly due to the chained reader.
+ @Test
+ public void pollReturnsOldIfNewReaderIsStopped() throws InterruptedException {
+ BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
+ SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
+ BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
+
+ ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
+
+ SourceRecord oldBinlogSourceRecord = mock(SourceRecord.class);
+ List<SourceRecord> oldBinlogRecords = new ArrayList<>();
+ oldBinlogRecords.add(oldBinlogSourceRecord);
+
+ when(mockOldBinlogReader.isRunning()).thenReturn(true);
+ when(mockOldBinlogReader.poll()).thenReturn(oldBinlogRecords);
+
+ // cheap way to have the new reader be stopped is to just not start it; so don't start the parallel reader
+
+ List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
+
+ Assert.assertEquals(1, parallelRecords.size());
+ Assert.assertTrue(parallelRecords.contains(oldBinlogSourceRecord));
+ }
+
+ @Test
+ public void pollReturnsNullIfBothReadersAreStopped() throws InterruptedException {
+ BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
+ SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
+ BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
+
+ ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
+
+ when(mockOldBinlogReader.isRunning()).thenReturn(false);
+ when(mockOldBinlogReader.poll()).thenThrow(new InterruptedException());
+
+ when(mockNewBinlogReader.poll()).thenReturn(null);
+
+ // cheap way to have the new reader be stopped is to just not start it; so don't start the parallel reader
+
+ List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
+
+ Assert.assertEquals(null, parallelRecords);
+ }
+
+ @Test
+ public void testStopStopsBothReaders() {
+ BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
+ SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
+ BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
+
+ ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
+
+ parallelSnapshotReader.start();
+ parallelSnapshotReader.stop();
+
+ Assert.assertTrue(parallelSnapshotReader.state() == Reader.State.STOPPED);
+
+ verify(mockOldBinlogReader).stop();
+ verify(mockNewSnapshotReader).stop();
+ }
+
+
+ @Test
+ public void testHaltingPredicateHonorsTimeRange() {
+ // verify that the halting predicate does nothing and changes no state if the
+ // record's timestamp is outside of the time range.
+
+ AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
+ AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
+
+ long durationSec = 5 * 60; // five minutes
+ Duration duration = Duration.ofSeconds(durationSec);
+
+ ParallelHaltingPredicate parallelHaltingPredicate = new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
+
+ boolean testResult = parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000 - (durationSec * 2)));
+
+ Assert.assertTrue(testResult);
+
+ Assert.assertFalse(thisReaderNearEnd.get());
+ Assert.assertFalse(otherReaderNearEnd.get());
+ }
+
+ @Test
+ public void testHaltingPredicateFlipsThisReaderNearEnd() {
+ // verify that the halting predicate flips the `this reader` boolean if the
+ // record's timestamp is within the time range, but still returns true because
+ // the other reader is not yet near the end of the binlog.
+
+
+ AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
+ AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
+
+ Duration duration = Duration.ofSeconds(5 * 60); // five minutes
+
+ ParallelHaltingPredicate parallelHaltingPredicate = new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
+
+ boolean testResult = parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000));
+
+ Assert.assertTrue(testResult);
+
+ Assert.assertTrue(thisReaderNearEnd.get());
+ Assert.assertFalse(otherReaderNearEnd.get());
+ }
+
+ @Test
+ public void testHaltingPredicateHalts() {
+ // verify that the halting predicate returns false (halting this reader) once both the
+ // 'this' and 'other' readers are near the end of the binlog.
+
+ AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
+ AtomicBoolean otherReaderNearEnd = new AtomicBoolean(true);
+
+ Duration duration = Duration.ofSeconds(5 * 60); // five minutes
+
+ ParallelHaltingPredicate parallelHaltingPredicate =
+ new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
+
+ boolean testResult =
+ parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000));
+
+ Assert.assertFalse(testResult);
+
+ Assert.assertTrue(thisReaderNearEnd.get());
+ Assert.assertTrue(otherReaderNearEnd.get());
+ }
+
+ /**
+ * Create an "offset" containing a single timestamp element with the given value.
+ * Needed because {@link ParallelSnapshotReader.ParallelHaltingPredicate} halts based on how
+ * close the record's timestamp is to the present time.
+ * @param tsSec the timestamp (in seconds) in the resulting offset.
+ * @return an "offset" containing the given timestamp.
+ */
+ private SourceRecord createSourceRecordWithTimestamp(long tsSec) {
+ Map<String, ?> offset = Collections.singletonMap(SourceInfo.TIMESTAMP_KEY, tsSec);
+ return new SourceRecord(null, offset, null, null, null);
+ }
+}
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReconcilingBinlogReaderTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReconcilingBinlogReaderTest.java
new file mode 100644
index 000000000..ffb334390
--- /dev/null
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReconcilingBinlogReaderTest.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mysql;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Moira Tagle
+ */
+public class ReconcilingBinlogReaderTest {
+
+ @Test
+ public void haltAfterPredicateTrue() {
+ List
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${version.mockito}</version>
+ </dependency>