DBZ-175 Initial implementation for support to whitelist/blacklist changes;

* The filter config is stored in offsets now, allowing to detect changes
to the config after a connector restart
* In that case, roughly the following steps are done:
- set up a binlog reader for the "old" tables
- set up a snapshot reader for the "new" tables
- if the snapshot is done, set up a binlog reader for the new tables
- if both binlog readers are in proximity of the logs head, stop them
and set up a single binlog reader for all tables
* That behavior is disabled by default for the purposes of testing and
can be enabled via new connector option "snapshot.new.tables"
* To facilitate connector restarts while whitelist/blacklist changes are
processed, separate restart offsets are added to the offsets
This commit is contained in:
Moira Tagle 2017-10-06 14:32:01 -07:00 committed by Gunnar Morling
parent 276e053f65
commit a6d791cd8c
24 changed files with 1587 additions and 136 deletions

View File

@ -77,6 +77,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>

View File

@ -13,6 +13,7 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@ -49,13 +50,19 @@ public abstract class AbstractReader implements Reader {
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final Duration pollInterval;
private final Predicate<SourceRecord> acceptAndContinue;
/**
* Create a snapshot reader.
*
* @param name the name of the reader
* @param context the task context in which this reader is running; may not be null
* @param acceptAndContinue a predicate that returns true if the tested {@link SourceRecord} should be accepted and
* false if the record and all subsequent records should be ignored. The reader will stop
* accepting records once {@link #enqueueRecord(SourceRecord)} is called with a record
* that tests as false. Can be null. If null, all records will be accepted.
*/
public AbstractReader(String name, MySqlTaskContext context) {
public AbstractReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue) {
this.name = name;
this.context = context;
this.connectionContext = context.getConnectionContext();
@ -63,6 +70,7 @@ public AbstractReader(String name, MySqlTaskContext context) {
this.maxBatchSize = context.getConnectorConfig().getMaxBatchSize();
this.pollInterval = context.getConnectorConfig().getPollInterval();
this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
this.acceptAndContinue = acceptAndContinue == null? new AcceptAllPredicate() : acceptAndContinue;
}
@Override
@ -300,14 +308,34 @@ protected void pollComplete(List<SourceRecord> batch) {
* queue is full.
*
* @param record the record to be enqueued
* @return true if the record was successfully enqueued, false if not.
* @throws InterruptedException if interrupted while waiting for the queue to have room for this record
*/
protected void enqueueRecord(SourceRecord record) throws InterruptedException {
protected boolean enqueueRecord(SourceRecord record) throws InterruptedException {
if (record != null) {
if (acceptAndContinue.test(record)) {
if (logger.isTraceEnabled()) {
logger.trace("Enqueuing source record: {}", record);
}
this.records.put(record);
return true;
} else {
// if we found a record we should not accept, we are done.
logger.info("predicate returned false; completing reader {}", this.name);
completeSuccessfully();
}
}
return false;
}
/**
* A predicate that returns true for all sourceRecords
*/
public static class AcceptAllPredicate implements Predicate<SourceRecord> {
@Override
public boolean test(SourceRecord sourceRecord) {
return true;
}
}
}

View File

@ -144,9 +144,22 @@ public boolean equals(Object obj) {
*
* @param name the name of this reader; may not be null
* @param context the task context in which this reader is running; may not be null
* @param acceptAndContinue see {@link AbstractReader#AbstractReader(String, MySqlTaskContext, Predicate)}
*/
public BinlogReader(String name, MySqlTaskContext context) {
super(name, context);
public BinlogReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue) {
this(name, context, acceptAndContinue, context.serverId());
}
/**
* Create a binlog reader.
*
* @param name the name of this reader; may not be null
* @param context the task context in which this reader is running; may not be null
* @param acceptAndContinue see {@link AbstractReader#AbstractReader(String, MySqlTaskContext, Predicate)}
* @param serverId the server id to use for the {@link BinaryLogClient}
*/
public BinlogReader(String name, MySqlTaskContext context, Predicate<SourceRecord> acceptAndContinue, long serverId) {
super(name, context, acceptAndContinue);
connectionContext = context.getConnectionContext();
source = context.source();
@ -163,7 +176,7 @@ public BinlogReader(String name, MySqlTaskContext context) {
client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(), connectionContext.username(), connectionContext.password());
// BinaryLogClient will overwrite thread names later
client.setThreadFactory(Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false));
client.setServerId(context.serverId());
client.setServerId(serverId);
client.setSSLMode(sslModeFor(connectionContext.sslMode()));
client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
client.setKeepAliveInterval(context.config().getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS));
@ -238,6 +251,10 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
context.getConnectorConfig().getLogicalName());
}
public BinlogReader(String name, MySqlTaskContext context) {
this(name, context, null);
}
@Override
protected void doInitialize() {
metrics.register(logger);
@ -360,16 +377,23 @@ protected void rewindBinaryLogClient(BinlogPosition position) {
}
}
/**
* @return a copy of the last offset of this reader, or null if this reader has not completed a poll.
*/
public Map<String, ?> getLastOffset() {
return lastOffset == null? null : new HashMap<>(lastOffset);
}
@Override
protected void doStop() {
try {
if (isRunning()) {
logger.debug("Stopping binlog reader, last recorded offset: {}", lastOffset);
if (client.isConnected()) {
logger.debug("Stopping binlog reader '{}', last recorded offset: {}", this.name(), lastOffset);
client.disconnect();
}
cleanupResources();
} catch (IOException e) {
logger.error("Unexpected error when disconnecting from the MySQL binary log reader", e);
logger.error("Unexpected error when disconnecting from the MySQL binary log reader '{}'", this.name(), e);
}
}

View File

@ -118,7 +118,7 @@ public synchronized void stop() {
Reader current = currentReader.get();
if (current != null) {
try {
logger.info("Stopping the {} reader", current.name());
logger.info("ChainedReader: Stopping the {} reader", current.name());
current.stop();
} catch (Throwable t) {
logger.error("Unexpected error stopping the {} reader", current.name(), t);
@ -196,7 +196,7 @@ private boolean startNextReader() {
// There is at least one more reader, so start it ...
Reader lastReader = currentReader.getAndSet(null);
if (lastReader != null) {
logger.debug("Transitioning from the {} reader to the {} reader", lastReader.name(), reader.name());
logger.info("Transitioning from the {} reader to the {} reader", lastReader.name(), reader.name());
} else {
logger.debug("Starting the {} reader", reader.name());
}

View File

@ -7,6 +7,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -58,38 +59,16 @@ protected static List<String> withoutBuiltInDatabases(Collection<String> dbNames
private final Predicate<TableId> isBuiltInTable;
private final Predicate<ColumnId> columnFilter;
/**
* @param config the configuration; may not be null
*/
public Filters(Configuration config) {
this.isBuiltInDb = Filters::isBuiltInDatabase;
this.isBuiltInTable = Filters::isBuiltInTable;
// Define the filter used for database names ...
Predicate<String> dbFilter = Selectors.databaseSelector()
.includeDatabases(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST))
.excludeDatabases(config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))
.build();
// Define the filter using the whitelists and blacklists for tables and database names ...
Predicate<TableId> tableFilter = Selectors.tableSelector()
.includeDatabases(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST))
.excludeDatabases(config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))
.includeTables(config.getString(MySqlConnectorConfig.TABLE_WHITELIST))
.excludeTables(config.getString(MySqlConnectorConfig.TABLE_BLACKLIST))
.build();
// Ignore built-in databases and tables ...
if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
this.tableFilter = tableFilter.and(isBuiltInTable.negate());
this.dbFilter = dbFilter.and(isBuiltInDb.negate());
} else {
this.tableFilter = tableFilter;
private Filters(Predicate<String> dbFilter,
Predicate<TableId> tableFilter,
Predicate<String> isBuiltInDb,
Predicate<TableId> isBuiltInTable,
Predicate<ColumnId> columnFilter) {
this.dbFilter = dbFilter;
}
// Define the filter that excludes blacklisted columns, truncated columns, and masked columns ...
this.columnFilter = Selectors.excludeColumns(config.getString(MySqlConnectorConfig.COLUMN_BLACKLIST));
this.tableFilter = tableFilter;
this.isBuiltInDb = isBuiltInDb;
this.isBuiltInTable = isBuiltInTable;
this.columnFilter = columnFilter;
}
public Predicate<String> databaseFilter() {
@ -117,4 +96,151 @@ public Predicate<String> builtInDatabaseFilter() {
public Predicate<ColumnId> columnFilter() {
return columnFilter;
}
public static class Builder {
private Predicate<String> dbFilter;
private Predicate<TableId> tableFilter;
private Predicate<String> isBuiltInDb = Filters::isBuiltInDatabase;
private Predicate<TableId> isBuiltInTable = Filters::isBuiltInTable;
private Predicate<ColumnId> columnFilter;
private final Configuration config;
/**
* Create a Builder for a filter.
* Set the initial filter data to match the filter data in the given configuration.
* @param config the configuration of the connector.
*/
public Builder(Configuration config) {
this.config = config;
setFiltersFromStrings(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),
config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),
config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
// Define the filter that excludes blacklisted columns, truncated columns, and masked columns ...
this.columnFilter = Selectors.excludeColumns(config.getString(MySqlConnectorConfig.COLUMN_BLACKLIST));
}
/**
* Completely reset the filter to match the filter info in the given offsets.
* This will completely reset the filters to those passed in.
* @param offsets The offsets to set the filter info to.
* @return this
*/
public Builder setFiltersFromOffsets(Map<String, ?> offsets) {
setFiltersFromStrings((String)offsets.get(SourceInfo.DATABASE_WHITELIST_KEY),
(String)offsets.get(SourceInfo.DATABASE_BLACKLIST_KEY),
(String)offsets.get(SourceInfo.TABLE_WHITELIST_KEY),
(String)offsets.get(SourceInfo.TABLE_BLACKLIST_KEY));
return this;
}
private void setFiltersFromStrings(String dbWhitelist,
String dbBlacklist,
String tableWhitelist,
String tableBlacklist) {
Predicate<String> dbFilter = Selectors.databaseSelector()
.includeDatabases(dbWhitelist)
.excludeDatabases(dbBlacklist)
.build();
// Define the filter using the whitelists and blacklists for tables and database names ...
Predicate<TableId> tableFilter = Selectors.tableSelector()
.includeDatabases(dbWhitelist)
.excludeDatabases(dbBlacklist)
.includeTables(tableWhitelist)
.excludeTables(tableBlacklist)
.build();
// Ignore built-in databases and tables ...
if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
this.tableFilter = tableFilter.and(isBuiltInTable.negate());
this.dbFilter = dbFilter.and(isBuiltInDb.negate());
} else {
this.tableFilter = tableFilter;
this.dbFilter = dbFilter;
}
}
/**
* Set the filter to match the given other filter.
* This will completely reset the filters to those passed in.
* @param filters The other filter
* @return this
*/
public Builder setFiltersFromFilters(Filters filters) {
this.dbFilter = filters.dbFilter;
this.tableFilter = filters.tableFilter;
this.isBuiltInDb = filters.isBuiltInDb;
this.isBuiltInTable = filters.isBuiltInTable;
this.columnFilter = filters.columnFilter;
return this;
}
/**
* Exclude all those tables included by the given filter.
* @param otherFilter the filter
* @return this
*/
public Builder excludeAllTables(Filters otherFilter) {
excludeDatabases(otherFilter.dbFilter);
excludeTables(otherFilter.tableFilter);
return this;
}
/**
* Exclude all the databases that the given predicate tests as true for.
* @param databases the databases to excluded
* @return
*/
public Builder excludeDatabases(Predicate<String> databases) {
this.dbFilter = this.dbFilter.and(databases.negate());
return this;
}
/**
* Include all the databases that the given predicate tests as true for.
* All databases previously included will still be included.
* @param databases the databases to be included
* @return
*/
public Builder includeDatabases(Predicate<String> databases) {
this.dbFilter = this.dbFilter.or(databases);
return this;
}
/**
* Exclude all the tables that the given predicate tests as true for.
* @param tables the tables to be excluded.
* @return this
*/
public Builder excludeTables(Predicate<TableId> tables) {
this.tableFilter = this.tableFilter.and(tables.negate());
return this;
}
/**
* Include the tables that the given predicate tests as true for.
* Tables previously included will still be included.
* @param tables the tables to be included.
* @return this
*/
public Builder includeTables(Predicate<TableId> tables) {
this.tableFilter = this.tableFilter.or(tables);
return this;
}
/**
* Build the filters.
* @return the {@link Filters}
*/
public Filters build() {
return new Filters(this.dbFilter,
this.tableFilter,
this.isBuiltInDb,
this.isBuiltInTable,
this.columnFilter);
}
}
}

View File

@ -191,6 +191,57 @@ public static SnapshotMode parse(String value, String defaultValue) {
}
}
public static enum SnapshotNewTables implements EnumeratedValue {
/**
* Do not snapshot new tables
*/
OFF("off"),
/**
* Snapshot new tables in parallel to normal binlog reading.
*/
PARALLEL("parallel");
private final String value;
private SnapshotNewTables(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static SnapshotNewTables parse(String value) {
if (value == null) return null;
value = value.trim();
for (SnapshotNewTables option : SnapshotNewTables.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotNewTables parse(String value, String defaultValue) {
SnapshotNewTables snapshotNewTables = parse(value);
if (snapshotNewTables == null && defaultValue != null) snapshotNewTables = parse(defaultValue);
return snapshotNewTables;
}
}
/**
* The set of predefined Snapshot Locking Mode options.
*/
@ -497,7 +548,9 @@ public static DdlParsingMode parse(String value, String defaultValue) {
}
private static final String DATABASE_WHITELIST_NAME = "database.whitelist";
private static final String DATABASE_BLACKLIST_NAME = "database.blacklist";
private static final String TABLE_WHITELIST_NAME = "table.whitelist";
private static final String TABLE_BLACKLIST_NAME = "table.blacklist";
private static final String TABLE_IGNORE_BUILTIN_NAME = "table.ignore.builtin";
/**
@ -572,6 +625,17 @@ public static DdlParsingMode parse(String value, String defaultValue) {
+ "MySQL database cluster as another server (with this unique ID) so it can read "
+ "the binlog. By default, a random number is generated between 5400 and 6400.");
public static final Field SERVER_ID_OFFSET = Field.create("database.server.id.offset")
.withDisplayName("Cluster ID offset")
.withType(Type.LONG)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDefault(10000L)
.withDescription("Only relevant in parallel snapshotting is configured. During "
+ "parallel snapshotting, multiple (4) connections open to the database "
+ "client, and they each need their own unique connection ID. This offset is "
+ "used to generate those IDs from the base configured cluster ID.");
public static final Field SSL_MODE = Field.create("database.ssl.mode")
.withDisplayName("SSL mode")
.withEnum(SecureConnectionMode.class, SecureConnectionMode.DISABLED)
@ -649,7 +713,7 @@ public static DdlParsingMode parse(String value, String defaultValue) {
* A comma-separated list of regular expressions that match database names to be excluded from monitoring.
* May not be used with {@link #DATABASE_WHITELIST}.
*/
public static final Field DATABASE_BLACKLIST = Field.create("database.blacklist")
public static final Field DATABASE_BLACKLIST = Field.create(DATABASE_BLACKLIST_NAME)
.withDisplayName("Exclude Databases")
.withType(Type.STRING)
.withWidth(Width.LONG)
@ -677,7 +741,7 @@ public static DdlParsingMode parse(String value, String defaultValue) {
* monitoring. Fully-qualified names for tables are of the form {@code <databaseName>.<tableName>} or
* {@code <databaseName>.<schemaName>.<tableName>}. May not be used with {@link #TABLE_WHITELIST}.
*/
public static final Field TABLE_BLACKLIST = Field.create("table.blacklist")
public static final Field TABLE_BLACKLIST = Field.create(TABLE_BLACKLIST_NAME)
.withDisplayName("Exclude Tables")
.withType(Type.STRING)
.withWidth(Width.LONG)
@ -878,6 +942,19 @@ public static DdlParsingMode parse(String value, String defaultValue) {
+ "'schema_only_recovery' and is only safe to use if no schema changes are happening while the snapshot is taken.")
.withValidation(MySqlConnectorConfig::validateSnapshotLockingMode);
public static final Field SNAPSHOT_NEW_TABLES = Field.create("snapshot.new.tables")
.withDisplayName("Snapshot newly added tables")
.withEnum(SnapshotNewTables.class, SnapshotNewTables.OFF)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, "
+ "and snapshot them. There is presently only two options:"
+ "'off': Default behavior. Do not snapshot new tables."
+ "'parallel': The snapshot of the new tables will occur in parallel to the continued binlog reading of the old tables. When the snapshot "
+ "completes, an independent binlog reader will begin reading the events for the new tables until it catches up to present time. At this "
+ "point, both old and new binlog readers will be momentarily halted and new binlog reader will start that will read the binlog for all "
+ "configured tables. The parallel binlog reader will have a configured server id of 10000 + the primary binlog reader's server id.");
public static final Field TIME_PRECISION_MODE = Field.create("time.precision.mode")
.withDisplayName("Time Precision")
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
@ -973,7 +1050,7 @@ public static final Field MASK_COLUMN(int length) {
/**
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_ID,
public static Field.Set ALL_FIELDS = Field.setOf(USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_ID, SERVER_ID_OFFSET,
SERVER_NAME,
CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS,
CommonConnectorConfig.MAX_QUEUE_SIZE,
@ -983,7 +1060,8 @@ public static final Field MASK_COLUMN(int length) {
Heartbeat.HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES, INCLUDE_SQL_QUERY,
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, SNAPSHOT_LOCKING_MODE,
COLUMN_BLACKLIST,
SNAPSHOT_MODE, SNAPSHOT_NEW_TABLES, SNAPSHOT_MINIMAL_LOCKING, SNAPSHOT_LOCKING_MODE,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
GTID_NEW_CHANNEL_POSITION,
@ -1057,7 +1135,7 @@ public GtidNewChannelPosition gtidNewChannelPosition() {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID,
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID, SERVER_ID_OFFSET,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
@ -1071,7 +1149,7 @@ protected static ConfigDef configDef() {
CommonConnectorConfig.TOMBSTONES_ON_DELETE);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE,
CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_NEW_TABLES, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, DDL_PARSER_MODE);
return config;
}

View File

@ -7,10 +7,16 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import io.debezium.util.LoggingContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@ -22,6 +28,7 @@
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext.PreviousContext;
/**
@ -54,20 +61,21 @@ public String version() {
@Override
public synchronized void start(Configuration config) {
// Create and start the task context ...
this.taskContext = new MySqlTaskContext(config);
this.connectionContext = taskContext.getConnectionContext();
final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
PreviousContext prevLoggingContext = LoggingContext.forConnector("MySQL", serverName, "task");
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
try {
this.taskContext.start();
// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
Map<String, String> partition = Collect.hashMapOf(SourceInfo.SERVER_PARTITION_KEY, serverName);
Map<String, ?> offsets = getRestartOffset(context.offsetStorageReader().offset(partition));
final SourceInfo source;
if (offsets != null) {
Filters filters = SourceInfo.offsetsHaveFilterInfo(offsets) ? getOldFilters(offsets, config) : getAllFilters(config);
this.taskContext = createAndStartTaskContext(config, filters);
this.connectionContext = taskContext.getConnectionContext();
source = taskContext.source();
// Set the position in our source info ...
source.setOffset(offsets);
logger.info("Found existing offset: {}", offsets);
@ -124,7 +132,11 @@ public synchronized void start(Configuration config) {
} else {
// We have no recorded offsets ...
this.taskContext = createAndStartTaskContext(config, getAllFilters(config));
taskContext.initializeHistoryStorage();
this.connectionContext = taskContext.getConnectionContext();
source = taskContext.source();
if (taskContext.isSnapshotNeverAllowed()) {
// We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
// full history of the database.
@ -161,7 +173,6 @@ public synchronized void start(Configuration config) {
ChainedReader.Builder chainedReaderBuilder = new ChainedReader.Builder();
// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
// We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
@ -183,17 +194,59 @@ public synchronized void start(Configuration config) {
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "row-level binlog and restart the connector.");
}
BinlogReader binlogReader = new BinlogReader("binlog", taskContext, null);
chainedReaderBuilder.addReader(binlogReader);
}
} else {
if (!source.hasFilterInfo()) {
// if we don't have filter info, then either
// 1. the snapshot was taken in a version of debezium before the filter info was stored in the offsets, or
// 2. this connector previously had no filter information.
// either way, we have to assume that the filter information currently in the config accurately reflects
// the current state of the connector.
source.maybeSetFilterDataFromConfig(config);
}
if (!rowBinlogEnabled) {
throw new ConnectException(
"The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
}
// if there are new tables
if (newTablesInConfig()) {
// and we are configured to run a parallel snapshot
if (taskContext.snapshotNewTables() == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
ServerIdGenerator serverIdGenerator =
new ServerIdGenerator(config.getLong(MySqlConnectorConfig.SERVER_ID),
config.getLong(MySqlConnectorConfig.SERVER_ID_OFFSET));
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(config,
taskContext,
getNewFilters(offsets, config),
serverIdGenerator);
MySqlTaskContext unifiedTaskContext = createAndStartTaskContext(config, getAllFilters(config));
// we aren't completing a snapshot, but we need to make sure the "snapshot" flag is false for this new context.
unifiedTaskContext.source().completeSnapshot();
BinlogReader unifiedBinlogReader = new BinlogReader("binlog",
unifiedTaskContext,
null,
serverIdGenerator.getConfiguredServerId());
ReconcilingBinlogReader reconcilingBinlogReader = parallelSnapshotReader.createReconcilingBinlogReader(unifiedBinlogReader);
chainedReaderBuilder.addReader(parallelSnapshotReader);
chainedReaderBuilder.addReader(reconcilingBinlogReader);
chainedReaderBuilder.addReader(unifiedBinlogReader);
unifiedBinlogReader.uponCompletion(unifiedTaskContext::shutdown);
}
} else {
// We're going to start by reading the binlog ...
BinlogReader binlogReader = new BinlogReader("binlog", taskContext, null);
chainedReaderBuilder.addReader(binlogReader);
}
}
readers = chainedReaderBuilder.build();
readers.uponCompletion(this::completeReaders);
@ -222,6 +275,133 @@ public synchronized void start(Configuration config) {
}
}
public class ServerIdGenerator {
private final long configuredServerId;
private final long offset;
private int counter;
private ServerIdGenerator(long configuredServerId, long configuredOffset) {
this.configuredServerId = configuredServerId;
this.offset = configuredOffset;
this.counter = 0;
}
public long getNextServerId() {
counter++;
return configuredServerId + (counter * offset);
}
public long getConfiguredServerId() {
return configuredServerId;
}
}
/**
* Get the offset to restart the connector from. Normally, this is just the stored offset.
*
* However, if we were doing a parallel load with new tables, it's possible that the last
* committed offset is from reading the new tables, which could be beyond where we want to
* restart from (and restarting there could cause skipped events). To fix this, the new
* tables binlog reader records extra information in its offset to tell the connector where
* to restart from. If this extra information is present in the stored offset, that is the
* offset that is returned.
* @param storedOffset the stored offset.
* @return the offset to restart from.
* @see RecordMakers#RecordMakers(MySqlSchema, SourceInfo, TopicSelector, boolean, Map)
*/
@SuppressWarnings("unchecked")
private Map<String, ?> getRestartOffset(Map<String, ?> storedOffset) {
Map<String, Object> restartOffset = new HashMap<>();
if (storedOffset != null) {
for (String key : storedOffset.keySet()){
if (key.startsWith(SourceInfo.RESTART_PREFIX)) {
String newKey = key.substring(SourceInfo.RESTART_PREFIX.length());
restartOffset.put(newKey, storedOffset.get(key));
}
}
}
return restartOffset.isEmpty()? storedOffset : restartOffset;
}
private static MySqlTaskContext createAndStartTaskContext(Configuration config,
Filters filters) {
MySqlTaskContext taskContext = new MySqlTaskContext(config, filters);
taskContext.start();
return taskContext;
}
/**
* @return true if new tables appear to have been added to the config, and false otherwise.
*/
private boolean newTablesInConfig() {
final String elementSep = "/s*,/s*";
// take in two stringified lists, and return true if the first list contains elements that are not in the second list
BiFunction<String, String, Boolean> hasExclusiveElements = (String a, String b) -> {
if (a == null || a.isEmpty()) {
return false;
} else if (b == null || b.isEmpty()) {
return true;
}
Set<String> bSet = Stream.of(b.split(elementSep)).collect(Collectors.toSet());
return !Stream.of(a.split(elementSep)).filter((x) -> !bSet.contains(x)).collect(Collectors.toSet()).isEmpty();
};
final SourceInfo sourceInfo = taskContext.source();
final Configuration config = taskContext.config();
if (!sourceInfo.hasFilterInfo()) {
// if there was previously no filter info, then we either can't evaluate if there are new tables,
// or there aren't any new tables because we previously used no filter.
return false;
}
// otherwise, we have filter info
// if either whitelist has been added to, then we may have new tables
if (hasExclusiveElements.apply(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST), sourceInfo.getDatabaseWhitelist())) {
return true;
}
if (hasExclusiveElements.apply(config.getString(MySqlConnectorConfig.TABLE_WHITELIST), sourceInfo.getTableWhitelist())) {
return true;
}
// if either blacklist has been removed from, then we may have new tables
if (hasExclusiveElements.apply(sourceInfo.getDatabaseBlacklist(), config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))) {
return true;
}
if (hasExclusiveElements.apply(sourceInfo.getTableBlacklist(), config.getString(MySqlConnectorConfig.TABLE_BLACKLIST))) {
return true;
}
// otherwise, false.
return false;
}
/**
* Get the filters representing the tables that have been newly added to the config, but
* not those that previously existed in the config.
* @return {@link Filters}
*/
private static Filters getNewFilters(Map<String, ?> offsets, Configuration config) {
Filters oldFilters = getOldFilters(offsets, config);
return new Filters.Builder(config).excludeAllTables(oldFilters).build();
}
/**
* Get the filters representing those tables that previously existed in the config, but
* not those newly added to the config.
* @return {@link Filters}
*/
private static Filters getOldFilters(Map<String, ?> offsets, Configuration config) {
return new Filters.Builder(config).setFiltersFromOffsets(offsets).build();
}
/**
* Get the filters representing all tables represented by the config.
* @return {@link Filters}
*/
private static Filters getAllFilters(Configuration config) {
return new Filters.Builder(config).build();
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
Reader currentReader = readers;

View File

@ -82,12 +82,16 @@ public class MySqlSchema extends RelationalDatabaseSchema {
* may be null if not needed
* @param tableIdCaseInsensitive true if table lookup ignores letter case
*/
public MySqlSchema(MySqlConnectorConfig configuration, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, TopicSelector<TableId> topicSelector) {
public MySqlSchema(MySqlConnectorConfig configuration,
Predicate<String> gtidFilter,
boolean tableIdCaseInsensitive,
TopicSelector<TableId> topicSelector,
Filters tableFilters) {
super(
configuration,
topicSelector,
TableFilter.fromPredicate(new Filters(configuration.getConfig()).tableFilter()),
new Filters(configuration.getConfig()).columnFilter(),
TableFilter.fromPredicate(new Filters.Builder(configuration.getConfig()).build().tableFilter()),
new Filters.Builder(configuration.getConfig()).build().columnFilter(),
new TableSchemaBuilder(
getValueConverters(configuration), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA)
,
@ -96,7 +100,7 @@ public MySqlSchema(MySqlConnectorConfig configuration, Predicate<String> gtidFil
Configuration config = configuration.getConfig();
this.filters = new Filters(config);
this.filters = tableFilters;
// Do not remove the prefix from the subset of config properties ...
String connectorName = config.getString("name", configuration.getLogicalName());

View File

@ -17,6 +17,7 @@
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotNewTables;
import io.debezium.function.Predicates;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
@ -49,12 +50,15 @@ public final class MySqlTaskContext extends CdcSourceTaskContext {
*/
private final boolean tableIdCaseInsensitive;
public MySqlTaskContext(Configuration config) {
this(config, null);
public MySqlTaskContext(Configuration config, Filters filters) {
this(config, filters, null, null);
}
public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
// MySQL now calculates JMX binlog reader metrics on its own
public MySqlTaskContext(Configuration config, Filters filters, Map<String, ?> restartOffset) {
this(config, filters, null, restartOffset);
}
public MySqlTaskContext(Configuration config, Filters filters, Boolean tableIdCaseInsensitive, Map<String, ?> restartOffset) {
super("MySQL", config.getString(MySqlConnectorConfig.SERVER_NAME), Collections::emptyList);
this.config = config;
@ -81,10 +85,10 @@ public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
}
// Set up the MySQL schema ...
this.dbSchema = new MySqlSchema(connectorConfig, this.gtidSourceFilter, this.tableIdCaseInsensitive, topicSelector);
this.dbSchema = new MySqlSchema(connectorConfig, this.gtidSourceFilter, this.tableIdCaseInsensitive, topicSelector, filters);
// Set up the record processor ...
this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector, config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE));
this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector, config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE), restartOffset);
// Set up the DDL filter
final String ddlFilter = config.getString(DatabaseHistory.DDL_FILTER);
@ -242,6 +246,11 @@ protected SnapshotMode snapshotMode() {
return SnapshotMode.parse(value, MySqlConnectorConfig.SNAPSHOT_MODE.defaultValueAsString());
}
protected SnapshotNewTables snapshotNewTables() {
String value = config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
return SnapshotNewTables.parse(value, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES.defaultValueAsString());
}
public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}

View File

@ -0,0 +1,243 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import io.debezium.config.Configuration;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
/**
* A reader that runs a {@link ChainedReader} consisting of a {@link SnapshotReader} and a {@link BinlogReader}
* for all tables newly added to the config in parallel with a {@link BinlogReader} for all the tables previously
* in the config.
*/
public class ParallelSnapshotReader implements Reader {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final BinlogReader oldTablesReader;
private final BinlogReader newTablesBinlogReader;
private final ChainedReader newTablesReader;
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final MySqlConnectorTask.ServerIdGenerator serverIdGenerator;
/**
* Create a ParallelSnapshotReader.
*
* @param config the current connector configuration.
* @param noSnapshotContext The context for those tables not undergoing a snapshot.
* @param snapshotFilters {@link Filters} matching the tables that should be snapshotted.
* @param serverIdGenerator a generator for creating unconflicting serverIds.
*/
public ParallelSnapshotReader(Configuration config,
MySqlTaskContext noSnapshotContext,
Filters snapshotFilters,
MySqlConnectorTask.ServerIdGenerator serverIdGenerator) {
this.serverIdGenerator = serverIdGenerator;
AtomicBoolean oldTablesReaderNearEnd = new AtomicBoolean(false);
AtomicBoolean newTablesReaderNearEnd = new AtomicBoolean(false);
ParallelHaltingPredicate oldTablesReaderHaltingPredicate =
new ParallelHaltingPredicate(oldTablesReaderNearEnd, newTablesReaderNearEnd);
ParallelHaltingPredicate newTablesReaderHaltingPredicate =
new ParallelHaltingPredicate(newTablesReaderNearEnd, oldTablesReaderNearEnd);
this.oldTablesReader = new BinlogReader("oldBinlog",
noSnapshotContext,
oldTablesReaderHaltingPredicate,
serverIdGenerator.getNextServerId());
MySqlTaskContext newTablesContext = new MySqlTaskContext(config,
snapshotFilters,
noSnapshotContext.source().offset());
newTablesContext.start();
SnapshotReader newTablesSnapshotReader = new SnapshotReader("newSnapshot", newTablesContext);
this.newTablesBinlogReader = new BinlogReader("newBinlog",
newTablesContext,
newTablesReaderHaltingPredicate,
serverIdGenerator.getNextServerId());
this.newTablesReader = new ChainedReader.Builder().addReader(newTablesSnapshotReader).addReader(newTablesBinlogReader).build();
}
// for testing purposes
/*package private*/ ParallelSnapshotReader(BinlogReader oldTablesBinlogReader,
SnapshotReader newTablesSnapshotReader,
BinlogReader newTablesBinlogReader) {
this.oldTablesReader = oldTablesBinlogReader;
this.newTablesBinlogReader = newTablesBinlogReader;
this.newTablesReader = new ChainedReader.Builder().addReader(newTablesSnapshotReader).addReader(newTablesBinlogReader).build();
this.serverIdGenerator = null;
}
/**
* Create and return a {@link ReconcilingBinlogReader} for the two binlog readers contained in this
* ParallelSnapshotReader.
* @return a {@link ReconcilingBinlogReader}
*/
public ReconcilingBinlogReader createReconcilingBinlogReader(BinlogReader unifiedReader) {
return new ReconcilingBinlogReader(oldTablesReader,
newTablesBinlogReader,
unifiedReader,
serverIdGenerator.getNextServerId());
}
@Override
public void uponCompletion(Runnable handler) {
uponCompletion.set(handler);
}
@Override
public void initialize() {
oldTablesReader.initialize();
newTablesReader.initialize();
}
@Override
public void start() {
if (running.compareAndSet(false, true)) {
oldTablesReader.start();
newTablesReader.start();
}
}
@Override
public void stop() {
if (running.compareAndSet(true, false)) {
try {
logger.info("Stopping the {} reader", oldTablesReader.name());
oldTablesReader.stop();
oldTablesReader.context.shutdown();
} catch (Throwable t) {
logger.error("Unexpected error stopping the {} reader", oldTablesReader.name());
}
try {
logger.info("Stopping the {} reader", newTablesReader.name());
newTablesReader.stop();
oldTablesReader.context.shutdown();
} catch (Throwable t) {
logger.error("Unexpected error stopping the {} reader", newTablesReader.name());
}
}
}
@Override
public State state() {
if (running.get()) {
return State.RUNNING;
}
return State.STOPPED;
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
// the old tables reader is a raw BinlogReader and will throw an exception of poll is called when it is not running.
List<SourceRecord> allRecords = oldTablesReader.isRunning()? oldTablesReader.poll() : null;
List<SourceRecord> newTablesRecords = newTablesReader.poll();
if (newTablesRecords != null) {
if (allRecords == null) {
allRecords = newTablesRecords;
} else {
allRecords.addAll(newTablesRecords);
}
}
else {
// else newTableRecords == null
if (allRecords == null) {
// if both readers have stopped, we need to stop.
completeSuccessfully();
}
}
return allRecords;
}
private void completeSuccessfully() {
if (completed.compareAndSet(false, true)) {
stop();
Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
if (completionHandler != null) {
completionHandler.run();
}
}
}
@Override
public String name() {
return "parallelSnapshotReader";
}
/**
* A Halting Predicate for the parallel snapshot reader.
* Usage for this predicate assumes two readers using two ParallelHalting Predicates.
* The booleans are owned by the two predicates, and keep track of whether they or the
* other reader has reached the end of the binlog.
*
* Test returns false if both both readers have been determined to be near the end of the
* binlog.
* Being near the end of the binlog is determined to be this predicate having seen a record
* with a timestamp within {@link ParallelHaltingPredicate#DEFAULT_MIN_HALTING_DURATION} of
* the current time. Once a single record near the end of the binlog has been seen, we
* we assume the reader will stay near the end of the binlog.
*/
/*package local*/ static class ParallelHaltingPredicate implements Predicate<SourceRecord> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private volatile AtomicBoolean thisReaderNearEnd;
private volatile AtomicBoolean otherReaderNearEnd;
// The minimum duration we must be within before we attempt to halt.
private final Duration minHaltingDuration;
// is hard coded in as 5 minutes.
private static final Duration DEFAULT_MIN_HALTING_DURATION = Duration.ofMinutes(5);
/*package local*/ ParallelHaltingPredicate(AtomicBoolean thisReaderNearEndRef,
AtomicBoolean otherReaderNearEndRef) {
this(thisReaderNearEndRef, otherReaderNearEndRef, DEFAULT_MIN_HALTING_DURATION);
}
/*package local*/ ParallelHaltingPredicate(AtomicBoolean thisReaderNearEndRef,
AtomicBoolean otherReaderNearEndRef,
Duration minHaltingDuration) {
this.otherReaderNearEnd = otherReaderNearEndRef;
this.thisReaderNearEnd = thisReaderNearEndRef;
this.minHaltingDuration = minHaltingDuration;
}
@Override
public boolean test(SourceRecord ourSourceRecord) {
// we assume if we ever end up near the end of the binlog, then we will remain there.
if (!thisReaderNearEnd.get()) {
Long sourceRecordTimestamp = (Long) ourSourceRecord.sourceOffset().get(SourceInfo.TIMESTAMP_KEY);
Instant recordTimestamp = Instant.ofEpochSecond(sourceRecordTimestamp);
Instant now = Instant.now();
Duration durationToEnd =
Duration.between(recordTimestamp,
now);
if (durationToEnd.compareTo(minHaltingDuration) <= 0) {
// we are within minHaltingDuration of the end
logger.debug("Parallel halting predicate: this reader near end");
thisReaderNearEnd.set(true);
}
}
// return false if both readers are near end, true otherwise.
return !(thisReaderNearEnd.get() && otherReaderNearEnd.get());
}
}
}

View File

@ -0,0 +1,229 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import io.debezium.document.Document;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import static io.debezium.connector.mysql.SourceInfo.BINLOG_FILENAME_OFFSET_KEY;
import static io.debezium.connector.mysql.SourceInfo.BINLOG_POSITION_OFFSET_KEY;
/**
* A reader that unifies the binlog positions of two binlog readers.
*
* To do this, at start time we evaluate the (now completed) states of the two binlog
* readers we want to unify, and create a new {@link BinlogReader} duplicating the
* lagging reader, but with a halting predicate that will halt it once it has passed the
* final position of the leading reader.
*/
public class ReconcilingBinlogReader implements Reader {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final BinlogReader binlogReaderA;
private final BinlogReader binlogReaderB;
private final BinlogReader unifiedReader;
private BinlogReader reconcilingReader;
private Boolean aReaderLeading = null;
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final long serverId;
/**
* Create a reconciling Binlog Reader.
*
* @param binlogReaderA the first binlog reader to unify.
* @param binlogReaderB the second binlog reader to unify.
* @param unifiedReader the final, unified binlog reader that will run once the reconciliation is complete.
*/
public ReconcilingBinlogReader(BinlogReader binlogReaderA,
BinlogReader binlogReaderB,
BinlogReader unifiedReader,
long serverId) {
this.binlogReaderA = binlogReaderA;
this.binlogReaderB = binlogReaderB;
this.unifiedReader = unifiedReader;
this.serverId = serverId;
}
@Override
public String name() {
return "reconcilingBinlogReader";
}
@Override
public State state() {
if (running.get()) {
return State.RUNNING;
}
return completed.get() ? State.STOPPED : State.STOPPING;
}
@Override
public void uponCompletion(Runnable handler) {
uponCompletion.set(handler);
}
@Override
public void start() {
if (running.compareAndSet(false, true)) {
completed.set(false);
determineLeadingReader();
MySqlTaskContext laggingReaderContext = getLaggingReader().context;
OffsetLimitPredicate offsetLimitPredicate =
new OffsetLimitPredicate(getLeadingReader().getLastOffset(),
laggingReaderContext.gtidSourceFilter());
// create our actual reader
reconcilingReader = new BinlogReader("innerReconcilingReader",
laggingReaderContext,
offsetLimitPredicate,
serverId);
reconcilingReader.start();
}
}
@Override
public void stop() {
if (running.compareAndSet(true, false)){
try {
logger.info("Stopping the {} reader", reconcilingReader.name());
reconcilingReader.stop();
reconcilingReader.context.shutdown();
} catch (Throwable t) {
logger.error("Unexpected error stopping the {} reader", reconcilingReader.name());
}
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> innerReaderPoll = reconcilingReader.poll();
if (innerReaderPoll == null) {
completeSuccessfully();
}
return innerReaderPoll;
}
private void completeSuccessfully() {
if (completed.compareAndSet(false, true)){
stop();
setupUnifiedReader();
logger.info("Completed Reconciliation of Parallel Readers.");
Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once
if (completionHandler != null) {
completionHandler.run();
}
}
}
private void setupUnifiedReader() {
unifiedReader.context.loadHistory(getLeadingReader().context.source());
unifiedReader.context.source().setFilterDataFromConfig(unifiedReader.context.config());
Map<String, ?> keyedOffset =
reconcilingReader.getLastOffset() == null ?
getLeadingReader().getLastOffset() :
reconcilingReader.getLastOffset();
unifiedReader.context.source()
.setBinlogStartPoint((String) keyedOffset.get(BINLOG_FILENAME_OFFSET_KEY),
(Long) keyedOffset.get(BINLOG_POSITION_OFFSET_KEY));
// note: this seems to dupe -one- event in my tests.
// I don't totally understand why that's happening (that is, I don't understand
// why the lastOffset seems to be before the actual last record) but this seems
// like a minor issue to me.
}
private void determineLeadingReader() {
Map<String, ?> aOffset = binlogReaderA.getLastOffset();
Map<String, ?> bOffset = binlogReaderB.getLastOffset();
boolean aNotStopped = binlogReaderA.state() != State.STOPPED;
boolean bNotStopped = binlogReaderB.state() != State.STOPPED;
boolean noOffsets = aOffset == null && bOffset == null;
if (noOffsets || aNotStopped || bNotStopped) {
throw new IllegalStateException("Cannot determine leading reader until both source readers have completed.");
}
// if one reader has not processed any events, its 'lastOffset' will be null.
// in this case, it must the be the lagging reader.
if (aOffset == null) {
aReaderLeading = false;
} else if (bOffset == null) {
aReaderLeading = true;
} else {
Document aDocument = SourceInfo.createDocumentFromOffset(aOffset);
Document bDocument = SourceInfo.createDocumentFromOffset(bOffset);
aReaderLeading = SourceInfo.isPositionAtOrBefore(bDocument,
aDocument,
binlogReaderA.context.gtidSourceFilter());
}
if (aReaderLeading) {
logger.info("old tables leading; reading only from new tables");
} else {
logger.info("new tables leading; reading only from old tables");
}
}
/*package private*/ BinlogReader getLeadingReader() {
checkLaggingLeadingInfo();
return aReaderLeading? binlogReaderA : binlogReaderB;
}
/*package private*/ BinlogReader getLaggingReader() {
checkLaggingLeadingInfo();
return aReaderLeading? binlogReaderB : binlogReaderA;
}
private void checkLaggingLeadingInfo() {
if (aReaderLeading == null) {
throw new IllegalStateException("Cannot return leading or lagging readers until this reader has started.");
}
}
// package private for testing purposes
/**
* A Predicate that returns false for any record beyond a given offset.
*/
/*package private*/ static class OffsetLimitPredicate implements Predicate<SourceRecord> {
private Document leadingReaderFinalOffsetDocument;
private Predicate<String> gtidFilter;
/*package private*/ OffsetLimitPredicate(Map<String, ?> leadingReaderFinalOffset,
Predicate<String> gtidFilter) {
this.leadingReaderFinalOffsetDocument = SourceInfo.createDocumentFromOffset(leadingReaderFinalOffset);
this.gtidFilter = gtidFilter;
}
@Override
public boolean test(SourceRecord sourceRecord) {
Document offsetDocument = SourceInfo.createDocumentFromOffset(sourceRecord.sourceOffset());
// .isPositionAtOrBefore is true IFF leadingReaderFinalOffsetDocument <= offsetDocument
// we should stop (return false) IFF leadingReaderFinalOffsetDocument <= offsetDocument
return
! SourceInfo.isPositionAtOrBefore(leadingReaderFinalOffsetDocument,
offsetDocument,
gtidFilter);
}
}
}

View File

@ -46,6 +46,7 @@ public class RecordMakers {
private final Schema schemaChangeKeySchema;
private final Schema schemaChangeValueSchema;
private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger);
private Map<String, ?> restartOffset = null;
/**
* Create the record makers using the supplied components.
@ -71,6 +72,24 @@ public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector<TableId
.build();
}
/**
* @param restartOffset the offset to publish with the {@link SourceInfo#RESTART_PREFIX} prefix
* as additional information in the offset. If the connector attempts to
* restart from an offset with information with this prefix it will
* create an offset from the prefixed information rather than restarting
* from the base offset.
* @see RecordMakers#RecordMakers(MySqlSchema, SourceInfo, TopicSelector, boolean)
* @see MySqlConnectorTask#getRestartOffset(Map)
*/
public RecordMakers(MySqlSchema schema,
SourceInfo source,
TopicSelector<TableId> topicSelector,
boolean emitTombstoneOnDelete,
Map<String, ?> restartOffset) {
this(schema, source, topicSelector, emitTombstoneOnDelete);
this.restartOffset = restartOffset;
}
/**
* Obtain the record maker for the given table, using the specified columns and sending records to the given consumer.
*
@ -160,6 +179,20 @@ public void regenerate() {
});
}
private Map<String, ?> getSourceRecordOffset(Map<String, ?> sourceOffset) {
if (restartOffset == null) {
return sourceOffset;
}
else {
Map<String, Object> offset = (Map<String, Object>) sourceOffset;
for(String key : restartOffset.keySet()){
StringBuilder sb = new StringBuilder(SourceInfo.RESTART_PREFIX);
offset.put(sb.append(key).toString(), restartOffset.get(key));
}
return offset;
}
}
/**
* Assign the given table number to the table with the specified {@link TableId table ID}.
*
@ -196,7 +229,7 @@ public int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows
Map<String, ?> partition = source.partition();
Map<String, ?> offset = source.offsetForRow(rowNumber, numberOfRows);
Struct origin = source.struct(id);
SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.read(value, origin, ts));
consumer.accept(record);
return 1;
@ -215,7 +248,7 @@ public int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRo
Map<String, ?> partition = source.partition();
Map<String, ?> offset = source.offsetForRow(rowNumber, numberOfRows);
Struct origin = source.struct(id);
SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(value, origin, ts));
consumer.accept(record);
return 1;
@ -242,26 +275,26 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum
// The key has changed, so we need to deal with both the new key and old key.
// Consumers may push the events into a system that won't allow both records to exist at the same time,
// so we first want to send the delete event for the old key...
SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts));
consumer.accept(record);
++count;
if (emitTombstoneOnDelete) {
// Next send a tombstone event for the old key ...
record = new SourceRecord(partition, offset, topicName, partitionNum, keySchema, oldKey, null, null);
record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum, keySchema, oldKey, null, null);
consumer.accept(record);
++count;
}
// And finally send the create event ...
record = new SourceRecord(partition, offset, topicName, partitionNum,
record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.create(valueAfter, origin, ts));
consumer.accept(record);
++count;
} else {
// The key has not changed, so a simple update is fine ...
SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts));
consumer.accept(record);
++count;
@ -283,14 +316,14 @@ public int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRo
Map<String, ?> offset = source.offsetForRow(rowNumber, numberOfRows);
Struct origin = source.struct(id);
// Send a delete message ...
SourceRecord record = new SourceRecord(partition, offset, topicName, partitionNum,
SourceRecord record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, envelope.schema(), envelope.delete(value, origin, ts));
consumer.accept(record);
++count;
// And send a tombstone ...
if (emitTombstoneOnDelete) {
record = new SourceRecord(partition, offset, topicName, partitionNum,
record = new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
keySchema, key, null, null);
consumer.accept(record);
++count;

View File

@ -74,7 +74,7 @@ public class SnapshotReader extends AbstractReader {
* @param context the task context in which this reader is running; may not be null
*/
public SnapshotReader(String name, MySqlTaskContext context) {
super(name, context);
super(name, context, null);
this.includeData = context.snapshotMode().includeData();
this.snapshotLockingMode = context.getConnectorConfig().getSnapshotLockingMode();
recorder = this::recordRowAsRead;
@ -321,7 +321,9 @@ protected void execute() {
if (!isRunning()) return;
logger.info("Step {}: read list of available tables in each database", step++);
List<TableId> tableIds = new ArrayList<>();
final Map<String, List<TableId>> tableIdsByDbName = new HashMap<>();
//List<TableId> allTableIds = new ArrayList<>();
final Filters createTableFilters = getCreateTableFilters(filters);
final Map<String, List<TableId>> createTablesMap = new HashMap<>();
final Set<String> readableDatabaseNames = new HashSet<>();
for (String dbName : databaseNames) {
try {
@ -331,9 +333,11 @@ protected void execute() {
mysql.query(sql.get(), rs -> {
while (rs.next() && isRunning()) {
TableId id = new TableId(dbName, null, rs.getString(1));
if (createTableFilters.tableFilter().test(id)){
createTablesMap.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
}
if (filters.tableFilter().test(id)) {
tableIds.add(id);
tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
logger.info("\t including '{}'", id);
} else {
logger.info("\t '{}' is filtered out, discarding", id);
@ -427,7 +431,7 @@ protected void execute() {
this::enqueueSchemaChanges));
// Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
for (Map.Entry<String, List<TableId>> entry : createTablesMap.entrySet()) {
if (!isRunning()) break;
String dbName = entry.getKey();
// First drop, create, and then use the named database ...
@ -601,7 +605,7 @@ protected void execute() {
// We've copied all of the tables and we've not yet been stopped, but our buffer holds onto the
// very last record. First mark the snapshot as complete and then apply the updated offset to
// the buffered record ...
source.markLastSnapshot();
source.markLastSnapshot(context.config());
long stop = clock.currentTimeInMillis();
try {
bufferedRecordQueue.close(this::replaceOffset);
@ -615,7 +619,6 @@ protected void execute() {
interrupted.set(true);
}
} else {
// source.markLastSnapshot(); Think we will not be needing this here it is used to mark last row entry?
logger.info("Step {}: encountered only schema based snapshot, skipping data snapshot", step);
}
step++;
@ -756,6 +759,23 @@ protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection my
}
}
/**
* Get the filters for table creation. Depending on the configuration, this may not be the default filter set.
*
* @param filters the default filters of this {@link SnapshotReader}
* @return {@link Filters} that represent all the tables that this snapshot reader should CREATE
*/
private Filters getCreateTableFilters(Filters filters) {
MySqlConnectorConfig.SnapshotNewTables snapshotNewTables = context.snapshotNewTables();
if (snapshotNewTables == MySqlConnectorConfig.SnapshotNewTables.PARALLEL) {
// if we are snapshotting new tables in parallel, we need to make sure all the tables in the configuration
// are created.
return new Filters.Builder(context.config()).build();
} else {
return filters;
}
}
protected String quote(String dbOrTableName) {
return "`" + dbOrTableName + "`";
}

View File

@ -9,6 +9,7 @@
import java.util.Map;
import java.util.function.Predicate;
import io.debezium.config.Configuration;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@ -111,6 +112,11 @@ final class SourceInfo extends AbstractSourceInfo {
public static final String DB_NAME_KEY = "db";
public static final String TABLE_NAME_KEY = "table";
public static final String QUERY_KEY = "query";
public static final String DATABASE_WHITELIST_KEY = "database_whitelist";
public static final String DATABASE_BLACKLIST_KEY = "database_blacklist";
public static final String TABLE_WHITELIST_KEY = "table_whitelist";
public static final String TABLE_BLACKLIST_KEY = "table_blacklist";
public static final String RESTART_PREFIX = "RESTART_";
/**
* A {@link Schema} definition for a {@link Struct} used to store the {@link #partition()} and {@link #offset()} information.
@ -151,6 +157,10 @@ final class SourceInfo extends AbstractSourceInfo {
private boolean lastSnapshot = true;
private boolean nextSnapshot = false;
private String currentQuery = null;
private String databaseWhitelist;
private String databaseBlacklist;
private String tableWhitelist;
private String tableBlacklist;
public SourceInfo() {
super(Module.version());
@ -286,6 +296,12 @@ public void setEventPosition(long positionOfCurrentEvent, long eventSizeInBytes)
if (isSnapshotInEffect()) {
map.put(SNAPSHOT_KEY, true);
}
if(hasFilterInfo()) {
map.put(DATABASE_WHITELIST_KEY, databaseWhitelist);
map.put(DATABASE_BLACKLIST_KEY, databaseBlacklist);
map.put(TABLE_WHITELIST_KEY, tableWhitelist);
map.put(TABLE_BLACKLIST_KEY, tableBlacklist);
}
return map;
}
@ -343,6 +359,7 @@ public Struct struct(TableId tableId) {
result.put(BINLOG_ROW_IN_EVENT_OFFSET_KEY, currentRowNumber);
result.put(TIMESTAMP_KEY, binlogTimestampSeconds);
if (lastSnapshot) {
// if the snapshot is COMPLETED, then this will not happen.
result.put(SNAPSHOT_KEY, true);
}
if (threadId >= 0) {
@ -478,9 +495,10 @@ public void startSnapshot() {
/**
* Denote that a snapshot will be complete after one last record.
*/
public void markLastSnapshot() {
public void markLastSnapshot(Configuration config) {
this.lastSnapshot = true;
this.nextSnapshot = false;
maybeSetFilterDataFromConfig(config);
}
/**
@ -491,6 +509,58 @@ public void completeSnapshot() {
this.nextSnapshot = false;
}
/**
* Set the filter data for the offset from the given {@link Configuration}
* @param config the configuration
*/
public void setFilterDataFromConfig(Configuration config) {
this.databaseWhitelist = config.getString(MySqlConnectorConfig.DATABASE_WHITELIST);
this.databaseBlacklist = config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST);
this.tableWhitelist = config.getString(MySqlConnectorConfig.TABLE_WHITELIST);
this.tableBlacklist = config.getString(MySqlConnectorConfig.TABLE_BLACKLIST);
}
/**
* Set filter data from config if and only if parallel snapshotting of new tables is turned on
* @param config the configuration.
*/
public void maybeSetFilterDataFromConfig(Configuration config) {
if (config.getString(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES).equals(
MySqlConnectorConfig.SnapshotNewTables.PARALLEL.getValue())) {
setFilterDataFromConfig(config);
}
}
/**
* @return true if this offset has filter info, false otherwise.
*/
public boolean hasFilterInfo() {
/*
* There are 2 possible cases for us not having filter info.
* 1. The connector does not use a filter. Creating a filter in such a connector could never add any tables.
* 2. The initial snapshot occurred in a version of Debezium that did not store the filter information in the
* offsets / the connector was not configured to store filter information.
*/
return databaseWhitelist != null || databaseBlacklist != null ||
tableWhitelist != null || tableBlacklist != null;
}
public String getDatabaseWhitelist() {
return databaseWhitelist;
}
public String getDatabaseBlacklist() {
return databaseBlacklist;
}
public String getTableWhitelist() {
return tableWhitelist;
}
public String getTableBlacklist() {
return tableBlacklist;
}
/**
* Set the source offset, as read from Kafka Connect. This method does nothing if the supplied map is null.
*
@ -511,9 +581,21 @@ public void setOffset(Map<String, ?> sourceOffset) {
this.restartRowsToSkip = (int) longOffsetValue(sourceOffset, BINLOG_ROW_IN_EVENT_OFFSET_KEY);
nextSnapshot = booleanOffsetValue(sourceOffset, SNAPSHOT_KEY);
lastSnapshot = nextSnapshot;
this.databaseWhitelist = (String) sourceOffset.get(DATABASE_WHITELIST_KEY);
this.databaseBlacklist = (String) sourceOffset.get(DATABASE_BLACKLIST_KEY);
this.tableWhitelist = (String) sourceOffset.get(TABLE_WHITELIST_KEY);
this.tableBlacklist = (String) sourceOffset.get(TABLE_BLACKLIST_KEY);
}
}
public static boolean offsetsHaveFilterInfo(Map<String, ?> sourceOffset) {
return sourceOffset != null &&
sourceOffset.containsKey(DATABASE_BLACKLIST_KEY) ||
sourceOffset.containsKey(DATABASE_WHITELIST_KEY) ||
sourceOffset.containsKey(TABLE_BLACKLIST_KEY) ||
sourceOffset.containsKey(TABLE_WHITELIST_KEY);
}
private long longOffsetValue(Map<String, ?> values, String key) {
Object obj = values.get(key);
if (obj == null) return 0L;
@ -616,6 +698,24 @@ public String toString() {
return sb.toString();
}
/**
* Create a {@link Document} from the given offset.
*
* @param offset the offset to create the document from.
* @return a {@link Document} with the offset data.
*/
public static Document createDocumentFromOffset(Map<String, ?> offset) {
Document offsetDocument = Document.create();
// all of the offset keys represent int, long, or string types, so we don't need to worry about references
// and information changing underneath us.
for (Map.Entry<String, ?> entry : offset.entrySet()) {
offsetDocument.set(entry.getKey(), entry.getValue());
}
return offsetDocument;
}
/**
* Determine whether the first {@link #offset() offset} is at or before the point in time of the second
* offset, where the offsets are given in JSON representation of the maps returned by {@link #offset()}.

View File

@ -116,7 +116,8 @@ protected Configuration.Builder simpleConfig() {
@Test
public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
config = simpleConfig().build();
context = new MySqlTaskContext(config);
Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
@ -177,7 +178,8 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
context = new MySqlTaskContext(config);
Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
@ -248,7 +250,8 @@ public void shouldHandleTimestampTimezones() throws Exception {
.with(MySqlConnectorConfig.DATABASE_WHITELIST, REGRESSION_DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_WHITELIST, REGRESSION_DATABASE.qualifiedTableName(tableName))
.build();
context = new MySqlTaskContext(config);
Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
@ -291,11 +294,12 @@ public void shouldHandleMySQLTimeCorrectly() throws Exception {
.with(MySqlConnectorConfig.DATABASE_WHITELIST, REGRESSION_DATABASE.getDatabaseName())
.with(MySqlConnectorConfig.TABLE_WHITELIST, REGRESSION_DATABASE.qualifiedTableName(tableName))
.build();
context = new MySqlTaskContext(config);
Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
reader = new BinlogReader("binlog", context);
reader = new BinlogReader("binlog", context, null);
// Start reading the binlog ...
reader.start();
@ -391,11 +395,12 @@ private void inconsistentSchema(EventProcessingFailureHandlingMode mode) throws
.build();
}
context = new MySqlTaskContext(config);
Filters filters = new Filters.Builder(config).build();
context = new MySqlTaskContext(config, filters);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
reader = new BinlogReader("binlog", context);
reader = new BinlogReader("binlog", context, null);
// Start reading the binlog ...
reader.start();

View File

@ -16,78 +16,85 @@
*
* @author Randall Hauch
*/
public class Configurator {
/*package local*/ class Configurator {
private final Configuration.Builder configBuilder = Configuration.create();
public Configurator with(Field field, String value) {
/*package local*/ Configurator with(Field field, String value) {
configBuilder.with(field, value);
return this;
}
public Configurator with(Field field, boolean value) {
/*package local*/ Configurator with(Field field, boolean value) {
configBuilder.with(field, value);
return this;
}
public Configurator serverName(String serverName) {
/*package local*/ Configurator serverName(String serverName) {
return with(MySqlConnectorConfig.SERVER_NAME, serverName);
}
public Configurator includeDatabases(String regexList) {
/*package local*/ Configurator includeDatabases(String regexList) {
return with(MySqlConnectorConfig.DATABASE_WHITELIST, regexList);
}
public Configurator excludeDatabases(String regexList) {
/*package local*/ Configurator excludeDatabases(String regexList) {
return with(MySqlConnectorConfig.DATABASE_BLACKLIST, regexList);
}
public Configurator includeTables(String regexList) {
/*package local*/ Configurator includeTables(String regexList) {
return with(MySqlConnectorConfig.TABLE_WHITELIST, regexList);
}
public Configurator excludeTables(String regexList) {
/*package local*/ Configurator excludeTables(String regexList) {
return with(MySqlConnectorConfig.TABLE_BLACKLIST, regexList);
}
public Configurator excludeColumns(String regexList) {
/*package local*/ Configurator excludeColumns(String regexList) {
return with(MySqlConnectorConfig.COLUMN_BLACKLIST, regexList);
}
public Configurator truncateColumns(int length, String fullyQualifiedTableNames) {
/*package local*/ Configurator truncateColumns(int length, String fullyQualifiedTableNames) {
return with(MySqlConnectorConfig.TRUNCATE_COLUMN(length), fullyQualifiedTableNames);
}
public Configurator maskColumns(int length, String fullyQualifiedTableNames) {
/*package local*/ Configurator maskColumns(int length, String fullyQualifiedTableNames) {
return with(MySqlConnectorConfig.MASK_COLUMN(length), fullyQualifiedTableNames);
}
public Configurator excludeBuiltInTables() {
/*package local*/ Configurator excludeBuiltInTables() {
return with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, true);
}
public Configurator includeBuiltInTables() {
/*package local*/ Configurator includeBuiltInTables() {
return with(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN, false);
}
public Configurator storeDatabaseHistoryInFile(Path path) {
/*package local*/ Configurator storeDatabaseHistoryInFile(Path path) {
with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class.getName());
with(FileDatabaseHistory.FILE_PATH,path.toAbsolutePath().toString());
return this;
}
public Filters createFilters() {
return new Filters(configBuilder.build());
/*package local*/ Filters createFilters() {
return new Filters.Builder(configBuilder.build()).build();
}
/**
* For tests use only
*/
public MySqlSchema createSchemas() {
/*package local*/ MySqlSchema createSchemas() {
return createSchemasWithFilter(createFilters());
}
/*package local*/ MySqlSchema createSchemasWithFilter(Filters filters) {
Configuration config = configBuilder.build();
MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(config);
return new MySqlSchema(connectorConfig, null, false,
MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), "__debezium-heartbeat"));
return new MySqlSchema(connectorConfig,
null,
false,
MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), "__debezium-heartbeat"),
filters);
}
}

View File

@ -128,6 +128,7 @@ public void shouldFailToValidateInvalidConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
@ -182,6 +183,7 @@ public void shouldValidateValidConfigurationWithSSL() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
@ -232,6 +234,7 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_NEW_TABLES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);

View File

@ -21,7 +21,7 @@ public class MySqlTaskContextIT extends MySqlTaskContextTest {
@Test
public void shouldCreateTaskFromConfiguration() throws Exception {
config = simpleConfig().build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
assertThat(context.config()).isSameAs(config);
@ -58,7 +58,7 @@ public void shouldCreateTaskFromConfiguration() throws Exception {
@Test
public void shouldCloseJdbcConnectionOnShutdown() throws Exception {
config = simpleConfig().build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
// JDBC connection is automatically created by MySqlTaskContext when it reads database variables

View File

@ -94,7 +94,7 @@ protected Configuration.Builder simpleConfig() {
public void shouldCreateTaskFromConfigurationWithNeverSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.NEVER.getValue());
@ -106,7 +106,7 @@ public void shouldCreateTaskFromConfigurationWithNeverSnapshotMode() throws Exce
public void shouldCreateTaskFromConfigurationWithWhenNeededSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.WHEN_NEEDED)
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.WHEN_NEEDED.getValue());
@ -118,7 +118,7 @@ public void shouldCreateTaskFromConfigurationWithWhenNeededSnapshotMode() throws
public void shouldUseGtidSetIncludes() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, "a,b,c,d.*")
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
Predicate<String> filter = context.gtidSourceFilter();
@ -145,7 +145,7 @@ public void shouldUseGtidSetIncludesLiteralUuids() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c,7145bf69-d1ca-11e5-a588-0242ac110004")
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
Predicate<String> filter = context.gtidSourceFilter();
@ -175,7 +175,7 @@ public void shouldUseGtidSetxcludesLiteralUuids() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES,
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc")
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
Predicate<String> filter = context.gtidSourceFilter();
@ -204,7 +204,7 @@ public void shouldNotAllowBothGtidSetIncludesAndExcludes() throws Exception {
.with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES,
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41")
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
boolean valid = config.validateAndRecord(MySqlConnectorConfig.ALL_FIELDS, msg -> {});
assertThat(valid).isFalse();
}
@ -221,7 +221,7 @@ public void shouldFilterAndMergeGtidSet() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c")
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.source().setCompletedGtidSet(gtidStr);
@ -255,7 +255,7 @@ public void shouldMergeToFirstAvailableGtidSetPositions() throws Exception {
.with(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION, GtidNewChannelPosition.EARLIEST)
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.source().setCompletedGtidSet(gtidStr);
@ -283,7 +283,7 @@ public void shouldComparePositionsWithDifferentFields() {
+ "d079cbb3-750f-11e6-954e-42010af00c28:1-11544291:11544293-11885648";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES, "96c2072e-e428-11e6-9590-42010a28002d")
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.source().setCompletedGtidSet(lastGtidStr);
HistoryRecordComparator comparator = context.dbSchema().historyComparator();
@ -310,7 +310,7 @@ public void shouldComparePositionsWithDifferentFields() {
public void shouldIgnoreDatabaseHistoryProperties() throws Exception {
config = simpleConfig().with(KafkaDatabaseHistory.TOPIC, "dummytopic")
.build();
context = new MySqlTaskContext(config, false);
context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null);
context.start();
context.getConnectionContext().jdbc().config().forEach((k, v) -> {

View File

@ -0,0 +1,240 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.debezium.connector.mysql.ParallelSnapshotReader.ParallelHaltingPredicate;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author Moira Tagle
*/
public class ParallelSnapshotReaderTest {
@Test
public void startStartsBothReaders() {
BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
parallelSnapshotReader.start();
Assert.assertSame(parallelSnapshotReader.state(), Reader.State.RUNNING);
verify(mockOldBinlogReader).start();
verify(mockNewSnapshotReader).start();
// chained reader will only start the snapshot reader
}
@Test
public void pollCombinesBothReadersPolls() throws InterruptedException {
BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
SourceRecord oldBinlogSourceRecord = mock(SourceRecord.class);
List<SourceRecord> oldBinlogRecords = new ArrayList<>();
oldBinlogRecords.add(oldBinlogSourceRecord);
SourceRecord newSnapshotSourceRecord = mock(SourceRecord.class);
List<SourceRecord> newSnapshotRecords = new ArrayList<>();
newSnapshotRecords.add(newSnapshotSourceRecord);
when(mockOldBinlogReader.isRunning()).thenReturn(true);
when(mockOldBinlogReader.poll()).thenReturn(oldBinlogRecords);
when(mockNewSnapshotReader.poll()).thenReturn(newSnapshotRecords);
// this needs to happen so that the chained reader can be polled.
parallelSnapshotReader.start();
List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
Assert.assertEquals(2, parallelRecords.size());
Assert.assertTrue(parallelRecords.contains(oldBinlogSourceRecord));
Assert.assertTrue(parallelRecords.contains(newSnapshotSourceRecord));
}
@Test
public void pollReturnsNewIfOldReaderIsStopped() throws InterruptedException {
BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
SourceRecord newSnapshotSourceRecord = mock(SourceRecord.class);
List<SourceRecord> newSnapshotRecords = new ArrayList<>();
newSnapshotRecords.add(newSnapshotSourceRecord);
// if the old reader is polled when it's stopped it will throw an exception.
when(mockOldBinlogReader.isRunning()).thenReturn(false);
when(mockOldBinlogReader.poll()).thenThrow(new InterruptedException());
when(mockNewSnapshotReader.poll()).thenReturn(newSnapshotRecords);
// this needs to happen so that the chained reader runs correctly.
parallelSnapshotReader.start();
List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
Assert.assertEquals(1, parallelRecords.size());
Assert.assertTrue(parallelRecords.contains(newSnapshotSourceRecord));
}
// this test and the next don't appear to be halting. Something with the chained reader maybe.
@Test
public void pollReturnsOldIfNewReaderIsStopped() throws InterruptedException {
BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
SourceRecord oldBinlogSourceRecord = mock(SourceRecord.class);
List<SourceRecord> oldBinlogRecords = new ArrayList<>();
oldBinlogRecords.add(oldBinlogSourceRecord);
when(mockOldBinlogReader.isRunning()).thenReturn(true);
when(mockOldBinlogReader.poll()).thenReturn(oldBinlogRecords);
// cheap way to have the new reader be stopped is to just not start it; so don't start the parallel reader
List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
Assert.assertEquals(1, parallelRecords.size());
Assert.assertTrue(parallelRecords.contains(oldBinlogSourceRecord));
}
@Test
public void pollReturnsNullIfBothReadersAreStopped() throws InterruptedException {
BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
when(mockOldBinlogReader.isRunning()).thenReturn(false);
when(mockOldBinlogReader.poll()).thenThrow(new InterruptedException());
when(mockNewBinlogReader.poll()).thenReturn(null);
// cheap way to have the new reader be stopped is to just not start it; so don't start the parallel reader
List<SourceRecord> parallelRecords = parallelSnapshotReader.poll();
Assert.assertEquals(null, parallelRecords);
}
@Test
public void testStopStopsBothReaders() {
BinlogReader mockOldBinlogReader = mock(BinlogReader.class);
SnapshotReader mockNewSnapshotReader = mock(SnapshotReader.class);
BinlogReader mockNewBinlogReader = mock(BinlogReader.class);
ParallelSnapshotReader parallelSnapshotReader = new ParallelSnapshotReader(mockOldBinlogReader, mockNewSnapshotReader, mockNewBinlogReader);
parallelSnapshotReader.start();
parallelSnapshotReader.stop();
Assert.assertTrue(parallelSnapshotReader.state() == Reader.State.STOPPED);
verify(mockOldBinlogReader).stop();
verify(mockNewSnapshotReader).stop();
}
@Test
public void testHaltingPredicateHonorsTimeRange() {
// verify that halting predicate does nothing and changes no state if the
// document's timestamp is outside of the time range.
AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
long durationSec = 5 * 60; // five minutes
Duration duration = Duration.ofSeconds(durationSec);
ParallelHaltingPredicate parallelHaltingPredicate = new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
boolean testResult = parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000 - (durationSec * 2)));
Assert.assertTrue(testResult);
Assert.assertFalse(thisReaderNearEnd.get());
Assert.assertFalse(otherReaderNearEnd.get());
}
@Test
public void testHaltingPredicateFlipsthisReaderNearEnd() {
// verify that the halting predicate flips the `this reader` boolean if the
// document's timestamp is within the time range, but still returns false.
AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
AtomicBoolean otherReaderNearEnd = new AtomicBoolean(false);
Duration duration = Duration.ofSeconds(5 * 60); // five minutes
ParallelHaltingPredicate parallelHaltingPredicate = new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
boolean testResult = parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000));
Assert.assertTrue(testResult);
Assert.assertTrue(thisReaderNearEnd.get());
Assert.assertFalse(otherReaderNearEnd.get());
}
@Test
public void testHaltingPredicateHalts() {
// verify that the halting predicate returns false if both the 'this' and
// 'other' reader are near the end of the binlog.
AtomicBoolean thisReaderNearEnd = new AtomicBoolean(false);
AtomicBoolean otherReaderNearEnd = new AtomicBoolean(true);
Duration duration = Duration.ofSeconds(5 * 60); // five minutes
ParallelHaltingPredicate parallelHaltingPredicate =
new ParallelHaltingPredicate(thisReaderNearEnd, otherReaderNearEnd, duration);
boolean testResult =
parallelHaltingPredicate.test(createSourceRecordWithTimestamp(System.currentTimeMillis()/1000));
Assert.assertFalse(testResult);
Assert.assertTrue(thisReaderNearEnd.get());
Assert.assertTrue(otherReaderNearEnd.get());
}
/**
* Create an "offset" containing a single timestamp element with the given value.
* Needed because {@link ParallelSnapshotReader.ParallelHaltingPredicate} halts based on how
* close the record's timestamp is to the present time.
* @param tsSec the timestamp (in seconds) in the resulting offset.
* @return an "offset" containing the given timestamp.
*/
private SourceRecord createSourceRecordWithTimestamp(long tsSec) {
Map<String, ?> offset = Collections.singletonMap(SourceInfo.TIMESTAMP_KEY, tsSec);
return new SourceRecord(null, offset, null, null, null);
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author Moira Tagle
*/
public class ReconcilingBinlogReaderTest {
@Test
public void haltAfterPredicateTrue() {
List<Map<String, ?>> offsets = createOrderedOffsets(2);
ReconcilingBinlogReader.OffsetLimitPredicate offsetLimitPredicate =
new ReconcilingBinlogReader.OffsetLimitPredicate(offsets.get(1), (x) -> true);
SourceRecord testSourceRecord = createSourceRecordWithOffset(offsets.get(0));
// tested record (0) is before limit (1), so we should return true.
Assert.assertTrue(offsetLimitPredicate.test(testSourceRecord));
}
@Test
public void haltAfterPredicateFalse() {
List<Map<String, ?>> offsets = createOrderedOffsets(2);
ReconcilingBinlogReader.OffsetLimitPredicate offsetLimitPredicate =
new ReconcilingBinlogReader.OffsetLimitPredicate(offsets.get(0), (x) -> true);
SourceRecord testSourceRecord = createSourceRecordWithOffset(offsets.get(1));
// tested record (1) is beyond limit (0), so we should return false.
Assert.assertFalse(offsetLimitPredicate.test(testSourceRecord));
}
private final int SERVER_ID = 0;
private final String BINLOG_FILENAME = "bin.log1";
private final int STARTING_BINLOG_POSTION = 20;
/**
* Create an ordered list of offsets from earliest to latest.
* @param size the number of offsets to create.
* @return
*/
private List<Map<String, ?>> createOrderedOffsets(int size) {
List<Map<String, ?>> orderedDocuments = new ArrayList<>(size);
// using non-gtids because SourceInfo.isPositionAtOrBefore
// doesn't seem to function as expected when comparing gtids
for (int i = 0 ; i < size; i++) {
Map<String, Object> offset = new HashMap<>(3);
offset.put(SourceInfo.SERVER_ID_KEY, SERVER_ID);
offset.put(SourceInfo.BINLOG_FILENAME_OFFSET_KEY, BINLOG_FILENAME);
offset.put(SourceInfo.BINLOG_POSITION_OFFSET_KEY, STARTING_BINLOG_POSTION + i);
orderedDocuments.add(offset);
}
return orderedDocuments;
}
private SourceRecord createSourceRecordWithOffset(Map<String, ?> offset) {
return new SourceRecord(null, offset, null, null, null);
}
}

View File

@ -89,7 +89,7 @@ protected Configuration.Builder simpleConfig() {
public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
config = simpleConfig()
.build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
@ -186,7 +186,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
@Test
public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_(.*)_" + DATABASE.getIdentifier()).build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
@ -290,7 +290,7 @@ private String productsTableName() {
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
@ -389,7 +389,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep
@Test(expected = ConnectException.class)
public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY).build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
@ -418,7 +418,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception
@Test
public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY).build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
context.source().setBinlogStartPoint("binlog1", 555); // manually set for happy path testing
reader = new SnapshotReader("snapshot", context);
@ -464,7 +464,7 @@ public void shouldSnapshotTablesInOrderSpecifiedInTablesWhitelist() throws Excep
config = simpleConfig()
.with(MySqlConnectorConfig.TABLE_WHITELIST, "connector_test_ro_(.*).orders,connector_test_ro_(.*).Products,connector_test_ro_(.*).products_on_hand,connector_test_ro_(.*).dbz_342_timetest")
.build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
@ -489,7 +489,7 @@ public void shouldSnapshotTablesInOrderSpecifiedInTablesWhitelist() throws Excep
public void shouldSnapshotTablesInLexicographicalOrder() throws Exception{
config = simpleConfig()
.build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
@ -527,7 +527,7 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(Heartbeat.HEARTBEAT_INTERVAL, 300_000)
.build();
context = new MySqlTaskContext(config);
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
@ -13,6 +14,7 @@
import java.util.Set;
import java.util.function.Predicate;
import io.debezium.config.Configuration;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.GenericAssert;
@ -149,6 +151,43 @@ public void shouldRecoverSourceInfoFromOffsetWithNonZeroBinlogCoordinatesAndNonZ
assertThat(source.isSnapshotInEffect()).isTrue();
}
@Test
public void shouldRecoverSourceInfoFromOffsetWithFilterData() {
final String databaseWhitelist = "a,b";
final String tableWhitelist = "c.foo,d.bar,d.baz";
Map<String, String> offset = offset(10, 10);
offset.put(SourceInfo.DATABASE_WHITELIST_KEY, databaseWhitelist);
offset.put(SourceInfo.TABLE_WHITELIST_KEY, tableWhitelist);
sourceWith(offset);
assertThat(source.hasFilterInfo()).isTrue();
assertEquals(databaseWhitelist, source.getDatabaseWhitelist());
assertEquals(tableWhitelist, source.getTableWhitelist());
// confirm other filter info is null
assertThat(source.getDatabaseBlacklist()).isNull();
assertThat(source.getTableBlacklist()).isNull();
}
@Test
public void setOffsetFilterFromFilter() {
final String databaseBlacklist = "a,b";
final String tableBlacklist = "c.foo, d.bar, d.baz";
Map<String, String> offset = offset(10, 10);
sourceWith(offset);
assertThat(!source.hasFilterInfo());
final Configuration configuration = Configuration.create()
.with(MySqlConnectorConfig.DATABASE_BLACKLIST, databaseBlacklist)
.with(MySqlConnectorConfig.TABLE_BLACKLIST, tableBlacklist)
.build();
source.setFilterDataFromConfig(configuration);
assertThat(source.hasFilterInfo()).isTrue();
assertEquals(databaseBlacklist, source.getDatabaseBlacklist());
assertEquals(tableBlacklist, source.getTableBlacklist());
}
@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinates() {
sourceWith(offset(GTID_SET, 0, 0, false));

View File

@ -84,6 +84,7 @@
<version.junit>4.12</version.junit>
<version.fest>1.4</version.fest>
<version.jmh>1.21</version.jmh>
<version.mockito>2.13.0</version.mockito>
<!-- Maven Plugins -->
<version.resources.plugin>2.7</version.resources.plugin>
@ -319,6 +320,11 @@
<artifactId>jmh-generator-annprocess</artifactId>
<version>${version.jmh}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${version.mockito}</version>
</dependency>
<!-- Debezium artifacts -->
<dependency>