DBZ-349 Better support for large append-only tables by making the snapshotting process restartable
This commit is contained in:
parent
4e8cedd094
commit
631c518d8e
@ -38,6 +38,11 @@
|
||||
<groupId>mysql</groupId>
|
||||
<artifactId>mysql-connector-java</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- Testing -->
|
||||
<dependency>
|
||||
|
@ -735,6 +735,15 @@ public static EventDeserializationFailureHandlingMode parse(String value) {
|
||||
+ "'warn' the problematic event and its binlog position will be logged and the event will be skipped;"
|
||||
+ "'ignore' the problematic event will be skipped.");
|
||||
|
||||
/**
 * Configuration field {@code snapshot.select.statement.overrides}: a JSON-encoded map from
 * table name to the SELECT statement the snapshot process should run for that table instead
 * of the default {@code SELECT * FROM <table>}. Intended for large append-only tables, where
 * an override can restart an interrupted snapshot from a specific point.
 */
public static final Field SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE = Field.create("snapshot.select.statement.overrides")
        .withDisplayName("Overrides for the default select statement used during snapshotting")
        .withType(Type.STRING)
        .withWidth(Width.LONG)
        .withImportance(Importance.MEDIUM)
        .withDescription("The value is a JSON-encoded map, where the key is the name of the table, the value is the select statement to use when retrieving data from the " +
                "specific table during snapshotting. A possible use case for large append-only tables is setting a specific point where to start snapshotting, in case " +
                "a previous snapshotting was interrupted.");
|
||||
|
||||
/**
|
||||
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
|
||||
* have their values truncated to be no longer than the specified number of characters.
|
||||
|
@ -5,12 +5,17 @@
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
|
||||
import io.debezium.function.Predicates;
|
||||
@ -35,6 +40,8 @@ public final class MySqlTaskContext extends MySqlJdbcContext {
|
||||
private final Predicate<String> gtidSourceFilter;
|
||||
private final Predicate<String> ddlFilter;
|
||||
private final Clock clock = Clock.system();
|
||||
private final ObjectMapper jsonObjectMapper = new ObjectMapper();
|
||||
private Map<String, String> snapshotSelectOverridesByTable;
|
||||
|
||||
public MySqlTaskContext(Configuration config) {
|
||||
super(config);
|
||||
@ -195,6 +202,24 @@ public boolean useMinimalSnapshotLocking() {
|
||||
return config.getBoolean(MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING);
|
||||
}
|
||||
|
||||
public Optional<String> getSnapshotSelectOverride(String tableId) {
|
||||
if (snapshotSelectOverridesByTable == null) {
|
||||
snapshotSelectOverridesByTable = new HashMap<>();
|
||||
String overridesInJson = config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, ()->null);
|
||||
if (overridesInJson != null) {
|
||||
TypeReference<HashMap<String, String>> typeRef = new TypeReference<HashMap<String, String>>() {};
|
||||
try {
|
||||
snapshotSelectOverridesByTable = jsonObjectMapper.readValue(overridesInJson, typeRef);
|
||||
} catch (IOException ioe) {
|
||||
logger.warn(String.format("Failed to parse value of %s as JSON. Value is: %s",
|
||||
MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
|
||||
overridesInJson));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Optional.ofNullable(snapshotSelectOverridesByTable.get(tableId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
super.start();
|
||||
|
@ -508,7 +508,10 @@ protected void execute() {
|
||||
// Scan the rows in the table ...
|
||||
long start = clock.currentTimeInMillis();
|
||||
logger.info("Step {}: - scanning table '{}' ({} of {} tables)", step, tableId, ++counter, tableIds.size());
|
||||
sql.set("SELECT * FROM " + quote(tableId));
|
||||
String selectStatement = context.getSnapshotSelectOverride(tableId.toString())
|
||||
.orElse("SELECT * FROM " + quote(tableId));
|
||||
logger.info("For table '{}' using select statement: '{}'", tableId, selectStatement);
|
||||
sql.set(selectStatement);
|
||||
try {
|
||||
int stepNum = step;
|
||||
mysql.query(sql.get(), statementFactory, rs -> {
|
||||
|
@ -604,6 +604,49 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
||||
|
||||
}
|
||||
|
||||
/**
 * Integration test for the {@code snapshot.select.statement.overrides} property: configures
 * a custom SELECT statement for {@code connector_test.products} (only rows with id >= 108)
 * and verifies that the initial snapshot emits exactly the 2 matching insert records in
 * addition to the 6 DDL records.
 */
@Test
public void shouldUseSpecificSelectStatementDuringSnapshotting() throws SQLException, InterruptedException {
    String masterPort = System.getProperty("database.port");
    String replicaPort = System.getProperty("database.replica.port");
    boolean replicaIsMaster = masterPort.equals(replicaPort);
    if (!replicaIsMaster) {
        // Give time for the replica to catch up to the master ...
        Thread.sleep(5000L);
    }

    // The override is a JSON-encoded map from table name to the SELECT statement to use
    // during snapshotting; here it restricts the products snapshot to rows with id >= 108.
    config = Configuration.create()
            .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname"))
            .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port"))
            .with(MySqlConnectorConfig.USER, "snapper")
            .with(MySqlConnectorConfig.PASSWORD, "snapperpass")
            .with(MySqlConnectorConfig.SERVER_ID, 28765)
            .with(MySqlConnectorConfig.SERVER_NAME, "myServer")
            .with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED)
            .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
            .with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_test")
            .with(MySqlConnectorConfig.TABLE_WHITELIST, "connector_test.products")
            .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "{\"connector_test.products\":\"SELECT * from connector_test.products where id>=108\"}")
            .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
            .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
            .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
            .build();

    // Start the connector ...
    start(MySqlConnector.class, config);

    Testing.Print.enable();

    // ---------------------------------------------------------------------------------------------------------------
    // Consume all of the events due to startup and initialization of the database
    // ---------------------------------------------------------------------------------------------------------------
    SourceRecords records = consumeRecordsByTopic(6+2); // 6 DDL and 2 insert records
    assertThat(records.recordsForTopic("myServer").size()).isEqualTo(6);
    assertThat(records.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(2);

    // Check that all records are valid, can be serialized and deserialized ...
    records.forEach(this::validate);
}
|
||||
|
||||
protected static class BinlogPosition {
|
||||
private String binlogFilename;
|
||||
private long binlogPosition;
|
||||
|
6
pom.xml
6
pom.xml
@ -165,6 +165,12 @@
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>${version.jackson}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${version.jackson}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- Kafka Connect -->
|
||||
<dependency>
|
||||
|
Loading…
Reference in New Issue
Block a user