DBZ-2788 Replace MySQL connector option with SMT for mitigating wrong op flag

This commit is contained in:
ani-sha 2021-02-16 19:09:51 +05:30 committed by Gunnar Morling
parent 34de76b3b2
commit b6c61ac810
7 changed files with 134 additions and 172 deletions

View File

@ -861,15 +861,6 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
+ "point, both old and new binlog readers will be momentarily halted and new binlog reader will start that will read the binlog for all " + "point, both old and new binlog readers will be momentarily halted and new binlog reader will start that will read the binlog for all "
+ "configured tables. The parallel binlog reader will have a configured server id of 10000 + the primary binlog reader's server id."); + "configured tables. The parallel binlog reader will have a configured server id of 10000 + the primary binlog reader's server id.");
public static final Field SNAPSHOT_EVENTS_AS_INSERTS = Field.create("snapshot.events.as.inserts")
.withDisplayName("Mark initial table snapshot events as insert events (op 'c')")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Whether or not to mark snapshot events as normal inserts (op 'c'). If disabled, the standard functionality of emitting these records as"
+ " reads (op 'r') will be used.")
.withDefault(true);
public static final Field TIME_PRECISION_MODE = RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE public static final Field TIME_PRECISION_MODE = RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS) .withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.withValidation(MySqlConnectorConfig::validateTimePrecisionMode) .withValidation(MySqlConnectorConfig::validateTimePrecisionMode)

View File

@ -73,7 +73,6 @@ public ChangeEventSourceCoordinator start(Configuration config) {
try { try {
// Get the offsets for our partition ... // Get the offsets for our partition ...
boolean startWithSnapshot = false; boolean startWithSnapshot = false;
boolean snapshotEventsAsInserts = config.getBoolean(MySqlConnectorConfig.SNAPSHOT_EVENTS_AS_INSERTS);
Map<String, String> partition = Collect.hashMapOf(SourceInfo.SERVER_PARTITION_KEY, serverName); Map<String, String> partition = Collect.hashMapOf(SourceInfo.SERVER_PARTITION_KEY, serverName);
Map<String, ?> offsets = getRestartOffset(context.offsetStorageReader().offset(partition)); Map<String, ?> offsets = getRestartOffset(context.offsetStorageReader().offset(partition));
final SourceInfo source; final SourceInfo source;
@ -192,9 +191,8 @@ else if (!earliestBinlogFilename.endsWith("00001")) {
if (startWithSnapshot) { if (startWithSnapshot) {
// We're supposed to start with a snapshot, so set that up ... // We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext); SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
if (snapshotEventsAsInserts) {
snapshotReader.generateInsertEvents(); snapshotReader.generateReadEvents();
}
if (!taskContext.getConnectorConfig().getSnapshotDelay().isZero()) { if (!taskContext.getConnectorConfig().getSnapshotDelay().isZero()) {
// Adding a timed blocking reader to delay the snapshot, can help to avoid initial rebalancing interruptions // Adding a timed blocking reader to delay the snapshot, can help to avoid initial rebalancing interruptions

View File

@ -110,17 +110,6 @@ public SnapshotReader generateReadEvents() {
return this; return this;
} }
/**
* Set this reader's {@link #execute() execution} to produce a {@link io.debezium.data.Envelope.Operation#CREATE} event for
* each row.
*
* @return this object for method chaining; never null
*/
public SnapshotReader generateInsertEvents() {
recorder = this::recordRowAsInsert;
return this;
}
@Override @Override
protected void doInitialize() { protected void doInitialize() {
metrics.register(logger); metrics.register(logger);

View File

@ -0,0 +1,75 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.transforms;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.transforms.SmtManager;
/**
* This SMT allows the MySql connector to emit snapshot events as "c" operation type (CREATE) by changing the 'op' field of the records
* from "r" (default) to "c".
*
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @author Anisha Mohanty
*/
public class ReadToInsertEvent<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadToInsertEvent.class);
private SmtManager<R> smtManager;
@Override
public R apply(R record) {
if (record.value() == null || !smtManager.isValidEnvelope(record)) {
return record;
}
Struct originalValueStruct = (Struct) record.value();
Struct updatedValueStruct;
String operation = originalValueStruct.getString(Envelope.FieldName.OPERATION);
if (operation.equals(Envelope.Operation.READ.code())) {
updatedValueStruct = originalValueStruct.put("op", Envelope.Operation.CREATE.code());
}
else {
return record;
}
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
updatedValueStruct,
record.timestamp());
}
@Override
public ConfigDef config() {
return null;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> props) {
final Configuration config = Configuration.from(props);
smtManager = new SmtManager<>(config);
}
}

View File

@ -306,10 +306,11 @@ public void shouldCreateSnapshotSelectively() throws Exception {
} }
@Test @Test
public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Exception { public void shouldCreateSnapshotOfSingleDatabaseUsingInsertEvents() throws Exception {
config = simpleConfig() config = simpleConfig()
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + DATABASE.getIdentifier()) .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + DATABASE.getIdentifier())
.with(MySqlConnectorConfig.SNAPSHOT_EVENTS_AS_INSERTS, false) .with("transforms", "snapshotasinsert")
.with("transforms.snapshotasinsert.type", "io.debezium.connector.mysql.transforms.ReadToInsertEvent")
.build(); .build();
// Start the connector ... // Start the connector ...
@ -336,46 +337,46 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
assertThat(store.collectionCount()).isEqualTo(9); // 2 databases assertThat(store.collectionCount()).isEqualTo(9); // 2 databases
Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName()); Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName());
assertThat(products.numberOfCreates()).isEqualTo(0); assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0); assertThat(products.numberOfUpdates()).isEqualTo(0);
assertThat(products.numberOfDeletes()).isEqualTo(0); assertThat(products.numberOfDeletes()).isEqualTo(0);
assertThat(products.numberOfReads()).isEqualTo(9); assertThat(products.numberOfReads()).isEqualTo(0);
assertThat(products.numberOfTombstones()).isEqualTo(0); assertThat(products.numberOfTombstones()).isEqualTo(0);
assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1);
Collection products_on_hand = store.collection(DATABASE.getDatabaseName(), "products_on_hand"); Collection products_on_hand = store.collection(DATABASE.getDatabaseName(), "products_on_hand");
assertThat(products_on_hand.numberOfCreates()).isEqualTo(0); assertThat(products_on_hand.numberOfCreates()).isEqualTo(9);
assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0); assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0);
assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0); assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0);
assertThat(products_on_hand.numberOfReads()).isEqualTo(9); assertThat(products_on_hand.numberOfReads()).isEqualTo(0);
assertThat(products_on_hand.numberOfTombstones()).isEqualTo(0); assertThat(products_on_hand.numberOfTombstones()).isEqualTo(0);
assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1);
Collection customers = store.collection(DATABASE.getDatabaseName(), "customers"); Collection customers = store.collection(DATABASE.getDatabaseName(), "customers");
assertThat(customers.numberOfCreates()).isEqualTo(0); assertThat(customers.numberOfCreates()).isEqualTo(4);
assertThat(customers.numberOfUpdates()).isEqualTo(0); assertThat(customers.numberOfUpdates()).isEqualTo(0);
assertThat(customers.numberOfDeletes()).isEqualTo(0); assertThat(customers.numberOfDeletes()).isEqualTo(0);
assertThat(customers.numberOfReads()).isEqualTo(4); assertThat(customers.numberOfReads()).isEqualTo(0);
assertThat(customers.numberOfTombstones()).isEqualTo(0); assertThat(customers.numberOfTombstones()).isEqualTo(0);
assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1);
Collection orders = store.collection(DATABASE.getDatabaseName(), "orders"); Collection orders = store.collection(DATABASE.getDatabaseName(), "orders");
assertThat(orders.numberOfCreates()).isEqualTo(0); assertThat(orders.numberOfCreates()).isEqualTo(5);
assertThat(orders.numberOfUpdates()).isEqualTo(0); assertThat(orders.numberOfUpdates()).isEqualTo(0);
assertThat(orders.numberOfDeletes()).isEqualTo(0); assertThat(orders.numberOfDeletes()).isEqualTo(0);
assertThat(orders.numberOfReads()).isEqualTo(5); assertThat(orders.numberOfReads()).isEqualTo(0);
assertThat(orders.numberOfTombstones()).isEqualTo(0); assertThat(orders.numberOfTombstones()).isEqualTo(0);
assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest"); Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest");
assertThat(timetest.numberOfCreates()).isEqualTo(0); assertThat(timetest.numberOfCreates()).isEqualTo(1);
assertThat(timetest.numberOfUpdates()).isEqualTo(0); assertThat(timetest.numberOfUpdates()).isEqualTo(0);
assertThat(timetest.numberOfDeletes()).isEqualTo(0); assertThat(timetest.numberOfDeletes()).isEqualTo(0);
assertThat(timetest.numberOfReads()).isEqualTo(1); assertThat(timetest.numberOfReads()).isEqualTo(0);
assertThat(timetest.numberOfTombstones()).isEqualTo(0); assertThat(timetest.numberOfTombstones()).isEqualTo(0);
assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1);

View File

@ -136,7 +136,7 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
context.start(); context.start();
reader = new SnapshotReader("snapshot", context, useGlobalLock); reader = new SnapshotReader("snapshot", context, useGlobalLock);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
@ -169,28 +169,28 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
// Check the records via the store ... // Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(5); assertThat(store.collectionCount()).isEqualTo(5);
Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName()); Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName());
assertThat(products.numberOfCreates()).isEqualTo(9); assertThat(products.numberOfCreates()).isEqualTo(0);
assertThat(products.numberOfUpdates()).isEqualTo(0); assertThat(products.numberOfUpdates()).isEqualTo(0);
assertThat(products.numberOfDeletes()).isEqualTo(0); assertThat(products.numberOfDeletes()).isEqualTo(0);
assertThat(products.numberOfReads()).isEqualTo(0); assertThat(products.numberOfReads()).isEqualTo(9);
assertThat(products.numberOfTombstones()).isEqualTo(0); assertThat(products.numberOfTombstones()).isEqualTo(0);
assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1);
Collection products_on_hand = store.collection(DATABASE.getDatabaseName(), "products_on_hand"); Collection products_on_hand = store.collection(DATABASE.getDatabaseName(), "products_on_hand");
assertThat(products_on_hand.numberOfCreates()).isEqualTo(9); assertThat(products_on_hand.numberOfCreates()).isEqualTo(0);
assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0); assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0);
assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0); assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0);
assertThat(products_on_hand.numberOfReads()).isEqualTo(0); assertThat(products_on_hand.numberOfReads()).isEqualTo(9);
assertThat(products_on_hand.numberOfTombstones()).isEqualTo(0); assertThat(products_on_hand.numberOfTombstones()).isEqualTo(0);
assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1);
Collection customers = store.collection(DATABASE.getDatabaseName(), "customers"); Collection customers = store.collection(DATABASE.getDatabaseName(), "customers");
assertThat(customers.numberOfCreates()).isEqualTo(4); assertThat(customers.numberOfCreates()).isEqualTo(0);
assertThat(customers.numberOfUpdates()).isEqualTo(0); assertThat(customers.numberOfUpdates()).isEqualTo(0);
assertThat(customers.numberOfDeletes()).isEqualTo(0); assertThat(customers.numberOfDeletes()).isEqualTo(0);
assertThat(customers.numberOfReads()).isEqualTo(0); assertThat(customers.numberOfReads()).isEqualTo(4);
assertThat(customers.numberOfTombstones()).isEqualTo(0); assertThat(customers.numberOfTombstones()).isEqualTo(0);
assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1);
@ -210,19 +210,19 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMo
assertThat(customer.get("email")).isEqualTo("sally.thomas@acme.com"); assertThat(customer.get("email")).isEqualTo("sally.thomas@acme.com");
Collection orders = store.collection(DATABASE.getDatabaseName(), "orders"); Collection orders = store.collection(DATABASE.getDatabaseName(), "orders");
assertThat(orders.numberOfCreates()).isEqualTo(5); assertThat(orders.numberOfCreates()).isEqualTo(0);
assertThat(orders.numberOfUpdates()).isEqualTo(0); assertThat(orders.numberOfUpdates()).isEqualTo(0);
assertThat(orders.numberOfDeletes()).isEqualTo(0); assertThat(orders.numberOfDeletes()).isEqualTo(0);
assertThat(orders.numberOfReads()).isEqualTo(0); assertThat(orders.numberOfReads()).isEqualTo(5);
assertThat(orders.numberOfTombstones()).isEqualTo(0); assertThat(orders.numberOfTombstones()).isEqualTo(0);
assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest"); Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest");
assertThat(timetest.numberOfCreates()).isEqualTo(1); assertThat(timetest.numberOfCreates()).isEqualTo(0);
assertThat(timetest.numberOfUpdates()).isEqualTo(0); assertThat(timetest.numberOfUpdates()).isEqualTo(0);
assertThat(timetest.numberOfDeletes()).isEqualTo(0); assertThat(timetest.numberOfDeletes()).isEqualTo(0);
assertThat(timetest.numberOfReads()).isEqualTo(0); assertThat(timetest.numberOfReads()).isEqualTo(1);
assertThat(timetest.numberOfTombstones()).isEqualTo(0); assertThat(timetest.numberOfTombstones()).isEqualTo(0);
assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1); assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1); assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1);
@ -260,7 +260,7 @@ public void snapshotWithBackupLocksShouldNotWaitForReads() throws Exception {
context.start(); context.start();
reader = new SnapshotReader("snapshot", context, true); reader = new SnapshotReader("snapshot", context, true);
reader.generateInsertEvents(); reader.generateReadEvents();
if (!MySQLConnection.isPerconaServer()) { if (!MySQLConnection.isPerconaServer()) {
reader.start(); // Start the reader to avoid failure in the afterEach method. reader.start(); // Start the reader to avoid failure in the afterEach method.
@ -350,16 +350,18 @@ public void shouldCreateSnapshotSelectively() throws Exception {
assertThat(orders).isNull(); assertThat(orders).isNull();
} }
private String productsTableName() {
return context.isTableIdCaseInsensitive() ? "products" : "Products";
}
@Test @Test
public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Exception { public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
config = simpleConfig() config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + DATABASE.getIdentifier())
.with(MySqlConnectorConfig.SNAPSHOT_EVENTS_AS_INSERTS, false)
.build();
context = new MySqlTaskContext(config, new Filters.Builder(config).build()); context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start(); context.start();
reader = new SnapshotReader("snapshot", context); reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
@ -380,13 +382,13 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
// The last poll should always return null ... // The last poll should always return null ...
assertThat(records).isNull(); assertThat(records).isNull();
// There should be no schema changes ... // There should be 11 schema changes plus 1 SET statement ...
assertThat(schemaChanges.recordCount()).isEqualTo(0); assertThat(schemaChanges.recordCount()).isEqualTo(14);
assertThat(schemaChanges.databaseCount()).isEqualTo(2);
assertThat(schemaChanges.databases()).containsOnly(DATABASE.getDatabaseName(), "");
// Check the records via the store ... // Check the records via the store ...
assertThat(store.databases()).containsOnly(DATABASE.getDatabaseName(), OTHER_DATABASE.getDatabaseName()); // 2 databases assertThat(store.collectionCount()).isEqualTo(5);
assertThat(store.collectionCount()).isEqualTo(9); // 2 databases
Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName()); Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName());
assertThat(products.numberOfCreates()).isEqualTo(0); assertThat(products.numberOfCreates()).isEqualTo(0);
assertThat(products.numberOfUpdates()).isEqualTo(0); assertThat(products.numberOfUpdates()).isEqualTo(0);
@ -452,110 +454,6 @@ public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Excepti
} }
} }
private String productsTableName() {
return context.isTableIdCaseInsensitive() ? "products" : "Products";
}
@Test
public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
context = new MySqlTaskContext(config, new Filters.Builder(config).build());
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
// Start the snapshot ...
reader.start();
// Poll for records ...
// Testing.Print.enable();
List<SourceRecord> records = null;
KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(DATABASE.getServerName() + ".");
SchemaChangeHistory schemaChanges = new SchemaChangeHistory(DATABASE.getServerName());
while ((records = reader.poll()) != null) {
records.forEach(record -> {
VerifyRecord.isValid(record);
VerifyRecord.hasNoSourceQuery(record);
store.add(record);
schemaChanges.add(record);
});
}
// The last poll should always return null ...
assertThat(records).isNull();
// There should be 11 schema changes plus 1 SET statement ...
assertThat(schemaChanges.recordCount()).isEqualTo(14);
assertThat(schemaChanges.databaseCount()).isEqualTo(2);
assertThat(schemaChanges.databases()).containsOnly(DATABASE.getDatabaseName(), "");
// Check the records via the store ...
assertThat(store.collectionCount()).isEqualTo(5);
Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName());
assertThat(products.numberOfCreates()).isEqualTo(9);
assertThat(products.numberOfUpdates()).isEqualTo(0);
assertThat(products.numberOfDeletes()).isEqualTo(0);
assertThat(products.numberOfReads()).isEqualTo(0);
assertThat(products.numberOfTombstones()).isEqualTo(0);
assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1);
Collection products_on_hand = store.collection(DATABASE.getDatabaseName(), "products_on_hand");
assertThat(products_on_hand.numberOfCreates()).isEqualTo(9);
assertThat(products_on_hand.numberOfUpdates()).isEqualTo(0);
assertThat(products_on_hand.numberOfDeletes()).isEqualTo(0);
assertThat(products_on_hand.numberOfReads()).isEqualTo(0);
assertThat(products_on_hand.numberOfTombstones()).isEqualTo(0);
assertThat(products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1);
Collection customers = store.collection(DATABASE.getDatabaseName(), "customers");
assertThat(customers.numberOfCreates()).isEqualTo(4);
assertThat(customers.numberOfUpdates()).isEqualTo(0);
assertThat(customers.numberOfDeletes()).isEqualTo(0);
assertThat(customers.numberOfReads()).isEqualTo(0);
assertThat(customers.numberOfTombstones()).isEqualTo(0);
assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1);
Collection orders = store.collection(DATABASE.getDatabaseName(), "orders");
assertThat(orders.numberOfCreates()).isEqualTo(5);
assertThat(orders.numberOfUpdates()).isEqualTo(0);
assertThat(orders.numberOfDeletes()).isEqualTo(0);
assertThat(orders.numberOfReads()).isEqualTo(0);
assertThat(orders.numberOfTombstones()).isEqualTo(0);
assertThat(orders.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(orders.numberOfValueSchemaChanges()).isEqualTo(1);
Collection timetest = store.collection(DATABASE.getDatabaseName(), "dbz_342_timetest");
assertThat(timetest.numberOfCreates()).isEqualTo(1);
assertThat(timetest.numberOfUpdates()).isEqualTo(0);
assertThat(timetest.numberOfDeletes()).isEqualTo(0);
assertThat(timetest.numberOfReads()).isEqualTo(0);
assertThat(timetest.numberOfTombstones()).isEqualTo(0);
assertThat(timetest.numberOfKeySchemaChanges()).isEqualTo(1);
assertThat(timetest.numberOfValueSchemaChanges()).isEqualTo(1);
final List<Struct> timerecords = new ArrayList<>();
timetest.forEach(val -> {
timerecords.add(((Struct) val.value()).getStruct("after"));
});
Struct after = timerecords.get(0);
assertThat(after.get("c1")).isEqualTo(toMicroSeconds("PT517H51M04.78S"));
assertThat(after.get("c2")).isEqualTo(toMicroSeconds("-PT13H14M50S"));
assertThat(after.get("c3")).isEqualTo(toMicroSeconds("-PT733H0M0.001S"));
assertThat(after.get("c4")).isEqualTo(toMicroSeconds("-PT1H59M59.001S"));
assertThat(after.get("c5")).isEqualTo(toMicroSeconds("-PT838H59M58.999999S"));
// Make sure the snapshot completed ...
if (completed.await(10, TimeUnit.SECONDS)) {
// completed the snapshot ...
Testing.print("completed the snapshot");
}
else {
fail("failed to complete the snapshot within 10 seconds");
}
}
@Test(expected = ConnectException.class) @Test(expected = ConnectException.class)
public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception { public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY).build(); config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY).build();
@ -563,7 +461,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception
context.start(); context.start();
reader = new SnapshotReader("snapshot", context); reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
@ -593,7 +491,7 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
context.source().setBinlogStartPoint("binlog1", 555); // manually set for happy path testing context.source().setBinlogStartPoint("binlog1", 555); // manually set for happy path testing
reader = new SnapshotReader("snapshot", context); reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
@ -640,7 +538,7 @@ public void shouldSnapshotTablesInOrderSpecifiedInTableIncludeList() throws Exce
context.start(); context.start();
reader = new SnapshotReader("snapshot", context); reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
// Poll for records ... // Poll for records ...
@ -668,7 +566,7 @@ public void shouldSnapshotTablesInOrderSpecifiedInTablesWhitelist() throws Excep
context.start(); context.start();
reader = new SnapshotReader("snapshot", context); reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
// Poll for records ... // Poll for records ...
@ -694,7 +592,7 @@ public void shouldSnapshotTablesInLexicographicalOrder() throws Exception {
context.start(); context.start();
reader = new SnapshotReader("snapshot", context); reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();
// Poll for records ... // Poll for records ...
@ -731,7 +629,7 @@ public void shouldCreateSnapshotSchemaOnly() throws Exception {
context.start(); context.start();
reader = new SnapshotReader("snapshot", context); reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown); reader.uponCompletion(completed::countDown);
reader.generateInsertEvents(); reader.generateReadEvents();
// Start the snapshot ... // Start the snapshot ...
reader.start(); reader.start();

View File

@ -563,6 +563,7 @@ The value portion of a change event for a change to this table is described for:
* <<mysql-primary-key-updates,Primary key updates>> * <<mysql-primary-key-updates,Primary key updates>>
* <<mysql-delete-events,_delete_ events>> * <<mysql-delete-events,_delete_ events>>
* <<mysql-tombstone-events,Tombstone events>> * <<mysql-tombstone-events,Tombstone events>>
* <<mysql-snapshot-events, Snapshot events>>
// Type: continue // Type: continue
[id="mysql-create-events"] [id="mysql-create-events"]
@ -1024,6 +1025,19 @@ MySQL connector events are designed to work with link:{link-kafka-docs}/#compact
=== Tombstone events === Tombstone events
When a row is deleted, the _delete_ event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be `null`. To make this possible, after {prodname}s MySQL connector emits a _delete_ event, the connector emits a special tombstone event that has the same key but a `null` value. When a row is deleted, the _delete_ event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be `null`. To make this possible, after {prodname}s MySQL connector emits a _delete_ event, the connector emits a special tombstone event that has the same key but a `null` value.
// Type: continue
[id="mysql-snapshot-events"]
=== Snapshot events
The MySql connector emits snapshot events using the "r" operation type (`READ`). In case you want the connector to emit snapshot events as "c" events (`CREATE`, as done incorrectly in earlier versions), this can be achieved using a Simple Message Transforms (SMT).
Configure the Debezium ReadToInsertEvent SMT by adding the SMT configuration details to your connectors configuration.
An example of the configuration is
----
transforms=snapshotasinsert,...
transforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent
----
// Type: reference // Type: reference
// ModuleID: how-debezium-mysql-connectors-map-data-types // ModuleID: how-debezium-mysql-connectors-map-data-types
// Title: How {prodname} MySQL connectors map data types // Title: How {prodname} MySQL connectors map data types
@ -2337,10 +2351,6 @@ For each table that you specify, also specify another configuration property: `s
+ +
A possible use case for setting these properties is large, append-only tables. You can specify a `SELECT` statement that sets a specific point for where to start a snapshot, or where to resume a snapshot if a previous snapshot was interrupted. A possible use case for setting these properties is large, append-only tables. You can specify a `SELECT` statement that sets a specific point for where to start a snapshot, or where to resume a snapshot if a previous snapshot was interrupted.
|[[mysql-property-snapshot-events-as-inserts]]<<mysql-property-snapshot-events-as-inserts, `+snapshot.events.as.inserts+`>>
|`true`
|Controls whether or not to mark snapshot events as insert events (`"op": "c"`). If disabled, the Debezium default of marking snapshot events as reads (`"op": "r"`) will be used.
|[[mysql-property-min-row-count-to-stream-results]]<<mysql-property-min-row-count-to-stream-results, `+min.row.count.to.stream.results+`>> |[[mysql-property-min-row-count-to-stream-results]]<<mysql-property-min-row-count-to-stream-results, `+min.row.count.to.stream.results+`>>
|`1000` |`1000`
|During a snapshot, the connector queries each table for which the connector is configured to capture changes. The connector uses each query result to produce a read event that contains data for all rows in that table. This property determines whether the MySQL connector puts results for a table into memory, which is fast but requires large amounts of memory, or streams the results, which can be slower but work for very large tables. The setting of this property specifies the minimum number of rows a table must contain before the connector streams results. + |During a snapshot, the connector queries each table for which the connector is configured to capture changes. The connector uses each query result to produce a read event that contains data for all rows in that table. This property determines whether the MySQL connector puts results for a table into memory, which is fast but requires large amounts of memory, or streams the results, which can be slower but work for very large tables. The setting of this property specifies the minimum number of rows a table must contain before the connector streams results. +