DBZ-129 MySQL connector can now use subset of GTID set when reconnecting to MySQL

When a connector is originally connected to a MySQL server, it will record the GTID set that identifies the position in the binlog. When all of the interesting transactions originate on a different server (i.e., the server we're listening to is a replica), the server we're listening to will still include some transactions in the binlog (e.g., for the information schema, performance, or other internal databases), and so the GTID set will include a GTID range for our server. If we stop the connector and want to point it to a different MySQL server, asking MySQL to position the binlog using the complete GTID set (including the GTID range for our old replica) will cause an error, since the new server does not have any GTID ranges from the old replica. Therefore, the connector needs to be able to exclude some GTID ranges that originated on the original replica, using the `server_uuid` property of the replica server.

This change adds two configuration properties: `gtid.source.includes` and `gtid.source.excludes`. Both are optional, but at most only one of these can be used. These properties contain a comma-separated list of GTID sources (i.e., the `server_uuid` value for the server where the transaction originated) or regular expressions matching GTID sources, and upon connector startup the connector uses the list to filter the previously-recorded GTID set against the available GTID set in the current MySQL server. By including specific GTID sources, an administrator can control the subset of GTID ranges that govern the binlog position.

These properties will not be useful in some topologies, especially when the MySQL server from which the binlog is being read is the originating server for some of the transactions. However, these properties may be very useful in any topology where the connector is _only_ reading from replicas, so that the connector can be switched to another replica at any time. In some cases it may be easier to exclude all of the replicas' `server_uuid` values, while in other cases it may be easier to include all of the `server_uuid` values where transactions can originate.
This commit is contained in:
Randall Hauch 2016-10-07 12:50:28 -05:00
parent 1a99f5bbc7
commit 305c4c5ac6
8 changed files with 355 additions and 106 deletions

View File

@ -16,6 +16,7 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
@ -151,9 +152,21 @@ protected void doStart() {
// The 'source' object holds the starting point in the binlog where we should start reading, // The 'source' object holds the starting point in the binlog where we should start reading,
// set set the client to start from that point ... // set set the client to start from that point ...
client.setGtidSet(source.gtidSet()); // may be null String gtidSetStr = source.gtidSet();
client.setBinlogFilename(source.binlogFilename()); if (gtidSetStr != null) {
client.setBinlogPosition(source.nextBinlogPosition()); logger.info("GTID set from previous recorded offset: {}", gtidSetStr);
// Remove any of the GTID sources that are not required/acceptable ...
Predicate<String> gtidSourceFilter = context.gtidSourceFilter();
if ( gtidSourceFilter != null) {
GtidSet gtidSet = new GtidSet(gtidSetStr).retainAll(gtidSourceFilter);
gtidSetStr = gtidSet.toString();
logger.info("GTID set after applying GTID source includes/excludes: {}", gtidSetStr);
}
client.setGtidSet(gtidSetStr);
} else {
client.setBinlogFilename(source.binlogFilename());
client.setBinlogPosition(source.nextBinlogPosition());
}
// Set the starting row number, which is the next row number to be read ... // Set the starting row number, which is the next row number to be read ...
startingRowNumber = source.nextEventRowNumber(); startingRowNumber = source.nextEventRowNumber();
@ -329,6 +342,14 @@ protected void handleRotateLogsEvent(Event event) {
/** /**
* Handle the supplied event with a {@link GtidEventData} that signals the beginning of a GTID transaction. * Handle the supplied event with a {@link GtidEventData} that signals the beginning of a GTID transaction.
* We don't yet know whether this transaction contains any events we're interested in, but we have to record
* it so that we know the position of this event and know we've processed the binlog to this point.
* <p>
* Note that this captures the current GTID and complete GTID set, regardless of whether the connector is
* {@link MySqlTaskContext#gtidSourceFilter() filtering} the GTID set upon connection. We do this because
* we actually want to capture all GTID set values found in the binlog, whether or not we process them.
* However, only when we connect do we actually want to pass to MySQL only those GTID ranges that are applicable
* per the configuration.
* *
* @param event the GTID event to be processed; may not be null * @param event the GTID event to be processed; may not be null
*/ */

View File

@ -13,6 +13,8 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import io.debezium.annotation.Immutable; import io.debezium.annotation.Immutable;
@ -27,6 +29,10 @@ public final class GtidSet {
private final Map<String, UUIDSet> uuidSetsByServerId = new TreeMap<>(); // sorts on keys private final Map<String, UUIDSet> uuidSetsByServerId = new TreeMap<>(); // sorts on keys
protected GtidSet(Map<String, UUIDSet> uuidSetsByServerId) {
this.uuidSetsByServerId.putAll(uuidSetsByServerId);
}
/** /**
* @param gtids the string representation of the GTIDs. * @param gtids the string representation of the GTIDs.
*/ */
@ -42,6 +48,22 @@ public GtidSet(String gtids) {
}); });
} }
/**
* Obtain a copy of this {@link GtidSet} except with only the GTID ranges that have server UUIDs that match the given
* predicate.
*
* @param sourceFilter the predicate that returns whether a server UUID is to be included
* @return the new GtidSet, or this object if {@code sourceFilter} is null; never null
*/
public GtidSet retainAll(Predicate<String> sourceFilter) {
if (sourceFilter == null) return this;
Map<String, UUIDSet> newSets = this.uuidSetsByServerId.entrySet()
.stream()
.filter(entry -> sourceFilter.test(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new GtidSet(newSets);
}
/** /**
* Get an immutable collection of the {@link UUIDSet range of GTIDs for a single server}. * Get an immutable collection of the {@link UUIDSet range of GTIDs for a single server}.
* *
@ -99,7 +121,7 @@ public String toString() {
for (UUIDSet uuidSet : uuidSetsByServerId.values()) { for (UUIDSet uuidSet : uuidSetsByServerId.values()) {
gtids.add(uuidSet.toString()); gtids.add(uuidSet.toString());
} }
return String.join(",",gtids); return String.join(",", gtids);
} }
/** /**
@ -117,13 +139,13 @@ protected UUIDSet(com.github.shyiko.mysql.binlog.GtidSet.UUIDSet uuidSet) {
intervals.add(new Interval(interval.getStart(), interval.getEnd())); intervals.add(new Interval(interval.getStart(), interval.getEnd()));
}); });
Collections.sort(this.intervals); Collections.sort(this.intervals);
if ( this.intervals.size() > 1 ) { if (this.intervals.size() > 1) {
// Collapse adjacent intervals ... // Collapse adjacent intervals ...
for ( int i=intervals.size()-1; i!=0; --i) { for (int i = intervals.size() - 1; i != 0; --i) {
Interval before = this.intervals.get(i-1); Interval before = this.intervals.get(i - 1);
Interval after = this.intervals.get(i); Interval after = this.intervals.get(i);
if ( (before.getEnd() + 1) == after.getStart() ) { if ((before.getEnd() + 1) == after.getStart()) {
this.intervals.set(i-1,new Interval(before.getStart(),after.getEnd())); this.intervals.set(i - 1, new Interval(before.getStart(), after.getEnd()));
this.intervals.remove(i); this.intervals.remove(i);
} }
} }
@ -202,8 +224,8 @@ public String toString() {
if (sb.length() != 0) sb.append(','); if (sb.length() != 0) sb.append(',');
sb.append(uuid).append(':'); sb.append(uuid).append(':');
Iterator<Interval> iter = intervals.iterator(); Iterator<Interval> iter = intervals.iterator();
if ( iter.hasNext() ) sb.append(iter.next()); if (iter.hasNext()) sb.append(iter.next());
while ( iter.hasNext() ) { while (iter.hasNext()) {
sb.append(':'); sb.append(':');
sb.append(iter.next()); sb.append(iter.next());
} }
@ -224,6 +246,7 @@ public Interval(long start, long end) {
/** /**
* Get the starting transaction number in this interval. * Get the starting transaction number in this interval.
*
* @return this interval's first transaction number * @return this interval's first transaction number
*/ */
public long getStart() { public long getStart() {
@ -232,6 +255,7 @@ public long getStart() {
/** /**
* Get the ending transaction number in this interval. * Get the ending transaction number in this interval.
*
* @return this interval's last transaction number * @return this interval's last transaction number
*/ */
public long getEnd() { public long getEnd() {
@ -254,10 +278,10 @@ public boolean isContainedWithin(Interval other) {
@Override @Override
public int compareTo(Interval that) { public int compareTo(Interval that) {
if ( that == this ) return 0; if (that == this) return 0;
long diff = this.start - that.start; long diff = this.start - that.start;
if ( diff > Integer.MAX_VALUE ) return Integer.MAX_VALUE; if (diff > Integer.MAX_VALUE) return Integer.MAX_VALUE;
if ( diff < Integer.MIN_VALUE ) return Integer.MIN_VALUE; if (diff < Integer.MIN_VALUE) return Integer.MIN_VALUE;
return (int) diff; return (int) diff;
} }

View File

@ -403,6 +403,36 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
.withValidation(MySqlConnectorConfig::validateColumnBlacklist) .withValidation(MySqlConnectorConfig::validateColumnBlacklist)
.withDescription(""); .withDescription("");
/**
* A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog
* position in the MySQL server. Only the GTID ranges that have sources matching one of these include patterns will
* be used.
* May not be used with {@link #GTID_SOURCE_EXCLUDES}.
*/
public static final Field GTID_SOURCE_INCLUDES = Field.create("gtid.source.includes")
.withDisplayName("Include GTID sources")
.withType(Type.LIST)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withRecommender(DATABASE_LIST_RECOMMENDER)
.withDependents(TABLE_WHITELIST_NAME)
.withDescription("The source UUIDs used to include GTID ranges when determine the starting position in the MySQL server's binlog.");
/**
* A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog
* position in the MySQL server. Only the GTID ranges that have sources matching none of these exclude patterns will
* be used.
* May not be used with {@link #GTID_SOURCE_INCLUDES}.
*/
public static final Field GTID_SOURCE_EXCLUDES = Field.create("gtid.source.excludes")
.withDisplayName("Exclude GTID sources")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(MySqlConnectorConfig::validateGtidSetExcludes)
.withInvisibleRecommender()
.withDescription("The source UUIDs used to exclude GTID ranges when determine the starting position in the MySQL server's binlog.");
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms") public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms")
.withDisplayName("Connection Timeout (ms)") .withDisplayName("Connection Timeout (ms)")
.withType(Type.INT) .withType(Type.INT)
@ -561,6 +591,7 @@ public static final Field MASK_COLUMN(int length) {
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN, TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST, DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
TIME_PRECISION_MODE, TIME_PRECISION_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD); SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD);
@ -584,7 +615,8 @@ protected static ConfigDef configDef() {
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY); KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST, Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST); COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS, Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE); SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE);
return config; return config;
@ -682,6 +714,16 @@ private static int validateTableBlacklist(Configuration config, Field field, Val
return 0; return 0;
} }
private static int validateGtidSetExcludes(Configuration config, Field field, ValidationOutput problems) {
String includes = config.getString(GTID_SOURCE_INCLUDES);
String excludes = config.getString(GTID_SOURCE_EXCLUDES);
if (includes != null && excludes != null) {
problems.accept(GTID_SOURCE_EXCLUDES, excludes, "Included GTID source UUIDs are already specified");
return 1;
}
return 0;
}
private static int validateColumnBlacklist(Configuration config, Field field, ValidationOutput problems) { private static int validateColumnBlacklist(Configuration config, Field field, ValidationOutput problems) {
// String blacklist = config.getString(COLUMN_BLACKLIST); // String blacklist = config.getString(COLUMN_BLACKLIST);
return 0; return 0;

View File

@ -249,8 +249,9 @@ protected boolean isBinlogAvailable() {
logger.info("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled"); logger.info("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
return false; return false;
} }
// GTIDs are enabled, and we used them previously ... // GTIDs are enabled, and we used them previously, but retain only those GTID ranges for the allowed source UUIDs ...
GtidSet gtidSet = new GtidSet(gtidStr); GtidSet gtidSet = new GtidSet(gtidStr).retainAll(taskContext.gtidSourceFilter());
// Get the GTID set that is available in the server ...
GtidSet availableGtidSet = new GtidSet(knownGtidSet()); GtidSet availableGtidSet = new GtidSet(knownGtidSet());
if (gtidSet.isContainedWithin(availableGtidSet)) { if (gtidSet.isContainedWithin(availableGtidSet)) {
logger.info("MySQL current GTID set {} does contain the GTID set required by the connector {}", availableGtidSet, gtidSet); logger.info("MySQL current GTID set {} does contain the GTID set required by the connector {}", availableGtidSet, gtidSet);

View File

@ -6,9 +6,11 @@
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.util.Map; import java.util.Map;
import java.util.function.Predicate;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.function.Predicates;
import io.debezium.util.Clock; import io.debezium.util.Clock;
import io.debezium.util.LoggingContext; import io.debezium.util.LoggingContext;
import io.debezium.util.LoggingContext.PreviousContext; import io.debezium.util.LoggingContext.PreviousContext;
@ -26,6 +28,7 @@ public final class MySqlTaskContext extends MySqlJdbcContext {
private final MySqlSchema dbSchema; private final MySqlSchema dbSchema;
private final TopicSelector topicSelector; private final TopicSelector topicSelector;
private final RecordMakers recordProcessor; private final RecordMakers recordProcessor;
private final Predicate<String> gtidSourceFilter;
private final Clock clock = Clock.system(); private final Clock clock = Clock.system();
public MySqlTaskContext(Configuration config) { public MySqlTaskContext(Configuration config) {
@ -43,6 +46,11 @@ public MySqlTaskContext(Configuration config) {
// Set up the record processor ... // Set up the record processor ...
this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector); this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector);
String gtidSetIncludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_INCLUDES);
String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES);
this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includes(gtidSetIncludes)
: (gtidSetExcludes != null ? Predicates.excludes(gtidSetExcludes) : null);
} }
public TopicSelector topicSelector() { public TopicSelector topicSelector() {
@ -61,6 +69,16 @@ public RecordMakers makeRecord() {
return recordProcessor; return recordProcessor;
} }
/**
* Get the predicate function that will return {@code true} if a GTID source is to be included, or {@code false} if
* a GTID source is to be excluded.
*
* @return the GTID source predicate function; never null
*/
public Predicate<String> gtidSourceFilter() {
return gtidSourceFilter;
}
/** /**
* Initialize the database history with any server-specific information. This should be done only upon connector startup * Initialize the database history with any server-specific information. This should be done only upon connector startup
* when the connector has no prior history. * when the connector has no prior history.

View File

@ -6,7 +6,10 @@
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Test; import org.junit.Test;
@ -14,6 +17,7 @@
import io.debezium.connector.mysql.GtidSet.Interval; import io.debezium.connector.mysql.GtidSet.Interval;
import io.debezium.connector.mysql.GtidSet.UUIDSet; import io.debezium.connector.mysql.GtidSet.UUIDSet;
import io.debezium.util.Collect;
/** /**
* @author Randall Hauch * @author Randall Hauch
@ -109,6 +113,25 @@ public void shouldCorrectlyDetermineIfComplexGtidSetWithVariousLineSeparatorsIsC
}); });
} }
@Test
public void shouldFilterServerUuids() {
String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41";
Collection<String> keepers = Collect.arrayListOf("036d85a9-64e5-11e6-9b48-42010af0000c",
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc",
"wont-be-found");
GtidSet original = new GtidSet(gtidStr);
assertThat(original.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c")).isNotNull();
assertThat(original.forServerWithId("7c1de3f2-3fd2-11e6-9cdc-42010af000bc")).isNotNull();
assertThat(original.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004")).isNotNull();
GtidSet filtered = original.retainAll(keepers::contains);
List<String> actualUuids = filtered.getUUIDSets().stream().map(UUIDSet::getUUID).collect(Collectors.toList());
assertThat(keepers.containsAll(actualUuids)).isTrue();
assertThat(filtered.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004")).isNull();
}
protected void asertIntervalCount(String uuid, int count) { protected void asertIntervalCount(String uuid, int count) {
UUIDSet set = gtids.forServerWithId(uuid); UUIDSet set = gtids.forServerWithId(uuid);
assertThat(set.getIntervals().size()).isEqualTo(count); assertThat(set.getIntervals().size()).isEqualTo(count);

View File

@ -5,78 +5,18 @@
*/ */
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.nio.file.Path;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/** /**
* @author Randall Hauch * @author Randall Hauch
* *
*/ */
public class MySqlTaskContextIT { public class MySqlTaskContextIT extends MySqlTaskContextTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-context.txt").toAbsolutePath();
private String hostname;
private int port;
private String username;
private String password;
private int serverId;
private String serverName;
private String databaseName;
private Configuration config;
private MySqlTaskContext context;
@Before
public void beforeEach() {
hostname = System.getProperty("database.hostname");
port = Integer.parseInt(System.getProperty("database.port"));
username = "snapper";
password = "snapperpass";
serverId = 18965;
serverName = "logical_server_name";
databaseName = "connector_test_ro";
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
public void afterEach() {
if (context != null) {
try {
context.shutdown();
} finally {
context = null;
Testing.Files.delete(DB_HISTORY_PATH);
}
}
}
protected Configuration.Builder simpleConfig() {
return Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, hostname)
.with(MySqlConnectorConfig.PORT, port)
.with(MySqlConnectorConfig.USER, username)
.with(MySqlConnectorConfig.PASSWORD, password)
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, serverId)
.with(MySqlConnectorConfig.SERVER_NAME, serverName)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, databaseName)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
}
@Test @Test
public void shouldCreateTaskFromConfiguration() throws Exception { public void shouldCreateTaskFromConfiguration() throws Exception {
@ -113,30 +53,6 @@ public void shouldCreateTaskFromConfiguration() throws Exception {
assertNotConnectedToJdbc(); assertNotConnectedToJdbc();
} }
@Test
public void shouldCreateTaskFromConfigurationWithNeverSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.build();
context = new MySqlTaskContext(config);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.NEVER.getValue());
assertThat(context.isSnapshotAllowedWhenNeeded()).isEqualTo(false);
assertThat(context.isSnapshotNeverAllowed()).isEqualTo(true);
}
@Test
public void shouldCreateTaskFromConfigurationWithWhenNeededSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.WHEN_NEEDED.getValue())
.build();
context = new MySqlTaskContext(config);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.WHEN_NEEDED.getValue());
assertThat(context.isSnapshotAllowedWhenNeeded()).isEqualTo(true);
assertThat(context.isSnapshotNeverAllowed()).isEqualTo(false);
}
@Test @Test
public void shouldCloseJdbcConnectionOnShutdown() throws Exception { public void shouldCloseJdbcConnectionOnShutdown() throws Exception {
config = simpleConfig().build(); config = simpleConfig().build();

View File

@ -0,0 +1,204 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.nio.file.Path;
import java.util.function.Predicate;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*
*/
public class MySqlTaskContextTest {
protected static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-context.txt").toAbsolutePath();
protected String hostname;
protected int port;
protected String username;
protected String password;
protected int serverId;
protected String serverName;
protected String databaseName;
protected Configuration config;
protected MySqlTaskContext context;
@Before
public void beforeEach() {
hostname = System.getProperty("database.hostname");
if (hostname == null) hostname = "localhost";
String portStr = System.getProperty("database.port");
if (portStr != null) {
port = Integer.parseInt(portStr);
} else {
port = (Integer) MySqlConnectorConfig.PORT.defaultValue();
}
username = "snapper";
password = "snapperpass";
serverId = 18965;
serverName = "logical_server_name";
databaseName = "connector_test_ro";
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
public void afterEach() {
if (context != null) {
try {
context.shutdown();
} finally {
context = null;
Testing.Files.delete(DB_HISTORY_PATH);
}
}
}
protected Configuration.Builder simpleConfig() {
return Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, hostname)
.with(MySqlConnectorConfig.PORT, port)
.with(MySqlConnectorConfig.USER, username)
.with(MySqlConnectorConfig.PASSWORD, password)
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, serverId)
.with(MySqlConnectorConfig.SERVER_NAME, serverName)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, databaseName)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
}
@Test
public void shouldCreateTaskFromConfigurationWithNeverSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.build();
context = new MySqlTaskContext(config);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.NEVER.getValue());
assertThat(context.isSnapshotAllowedWhenNeeded()).isEqualTo(false);
assertThat(context.isSnapshotNeverAllowed()).isEqualTo(true);
}
@Test
public void shouldCreateTaskFromConfigurationWithWhenNeededSnapshotMode() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.WHEN_NEEDED.getValue())
.build();
context = new MySqlTaskContext(config);
context.start();
assertThat("" + context.snapshotMode().getValue()).isEqualTo(SnapshotMode.WHEN_NEEDED.getValue());
assertThat(context.isSnapshotAllowedWhenNeeded()).isEqualTo(true);
assertThat(context.isSnapshotNeverAllowed()).isEqualTo(false);
}
@Test
public void shouldUseGtidSetIncludes() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, "a,b,c,d.*")
.build();
context = new MySqlTaskContext(config);
context.start();
Predicate<String> filter = context.gtidSourceFilter();
assertThat(filter).isNotNull();
assertThat(filter.test("a")).isTrue();
assertThat(filter.test("b")).isTrue();
assertThat(filter.test("c")).isTrue();
assertThat(filter.test("d")).isTrue();
assertThat(filter.test("d1")).isTrue();
assertThat(filter.test("d2")).isTrue();
assertThat(filter.test("d1234xdgfe")).isTrue();
assertThat(filter.test("a1")).isFalse();
assertThat(filter.test("a2")).isFalse();
assertThat(filter.test("b1")).isFalse();
assertThat(filter.test("c1")).isFalse();
assertThat(filter.test("e")).isFalse();
}
@Test
public void shouldUseGtidSetIncludesLiteralUuids() throws Exception {
String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c,7145bf69-d1ca-11e5-a588-0242ac110004")
.build();
context = new MySqlTaskContext(config);
context.start();
Predicate<String> filter = context.gtidSourceFilter();
assertThat(filter).isNotNull();
assertThat(filter.test("036d85a9-64e5-11e6-9b48-42010af0000c")).isTrue();
assertThat(filter.test("7145bf69-d1ca-11e5-a588-0242ac110004")).isTrue();
assertThat(filter.test("036d85a9-64e5-11e6-9b48-42010af0000c-extra")).isFalse();
assertThat(filter.test("7145bf69-d1ca-11e5-a588-0242ac110004-extra")).isFalse();
assertThat(filter.test("7c1de3f2-3fd2-11e6-9cdc-42010af000bc")).isFalse();
GtidSet original = new GtidSet(gtidStr);
assertThat(original.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c")).isNotNull();
assertThat(original.forServerWithId("7c1de3f2-3fd2-11e6-9cdc-42010af000bc")).isNotNull();
assertThat(original.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004")).isNotNull();
GtidSet filtered = original.retainAll(filter);
assertThat(filtered.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c")).isNotNull();
assertThat(filtered.forServerWithId("7c1de3f2-3fd2-11e6-9cdc-42010af000bc")).isNull();
assertThat(filtered.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004")).isNotNull();
}
@Test
public void shouldUseGtidSetxcludesLiteralUuids() throws Exception {
String gtidStr = "036d85a9-64e5-11e6-9b48-42010af0000c:1-2,"
+ "7145bf69-d1ca-11e5-a588-0242ac110004:1-3200,"
+ "7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41";
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES,
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc")
.build();
context = new MySqlTaskContext(config);
context.start();
Predicate<String> filter = context.gtidSourceFilter();
assertThat(filter).isNotNull();
assertThat(filter.test("036d85a9-64e5-11e6-9b48-42010af0000c")).isTrue();
assertThat(filter.test("7145bf69-d1ca-11e5-a588-0242ac110004")).isTrue();
assertThat(filter.test("036d85a9-64e5-11e6-9b48-42010af0000c-extra")).isTrue();
assertThat(filter.test("7145bf69-d1ca-11e5-a588-0242ac110004-extra")).isTrue();
assertThat(filter.test("7c1de3f2-3fd2-11e6-9cdc-42010af000bc")).isFalse();
GtidSet original = new GtidSet(gtidStr);
assertThat(original.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c")).isNotNull();
assertThat(original.forServerWithId("7c1de3f2-3fd2-11e6-9cdc-42010af000bc")).isNotNull();
assertThat(original.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004")).isNotNull();
GtidSet filtered = original.retainAll(filter);
assertThat(filtered.forServerWithId("036d85a9-64e5-11e6-9b48-42010af0000c")).isNotNull();
assertThat(filtered.forServerWithId("7c1de3f2-3fd2-11e6-9cdc-42010af000bc")).isNull();
assertThat(filtered.forServerWithId("7145bf69-d1ca-11e5-a588-0242ac110004")).isNotNull();
}
@Test
public void shouldNotAllowBothGtidSetIncludesAndExcludes() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES,
"036d85a9-64e5-11e6-9b48-42010af0000c,7145bf69-d1ca-11e5-a588-0242ac110004")
.with(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES,
"7c1de3f2-3fd2-11e6-9cdc-42010af000bc:1-41")
.build();
context = new MySqlTaskContext(config);
boolean valid = config.validateAndRecord(MySqlConnectorConfig.ALL_FIELDS,msg->{});
assertThat(valid).isFalse();
}
}