DBZ-6828 Process drop table events during blocking snapshot

This commit is contained in:
mfvitale 2023-09-18 16:03:37 +02:00 committed by Jiri Pechanec
parent ae199d2053
commit 68eb4b2df2
7 changed files with 94 additions and 20 deletions

View File

@ -309,6 +309,7 @@ private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset
((TableAlteredEvent) event).previousTableId());
}
else {
Table table = getTable(tableId, type);
schemaChangeEvent = SchemaChangeEvent.of(
type,
partition,
@ -316,12 +317,26 @@ private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset
sanitizedDbName,
null,
event.statement(),
tableId != null ? tables().forTable(tableId) : null,
table,
snapshot);
}
schemaChangeEvents.add(schemaChangeEvent);
}
private Table getTable(TableId tableId, SchemaChangeEventType type) {
if (tableId == null) {
return null;
}
if (SchemaChangeEventType.DROP == type) {
// DROP events don't have information about tableChanges, so we are creating a Table object with just the tableId to be use
// during blocking snapshot to filter out drop events not related to table to be snapshotted.
return Table.editor().tableId(tableId).create();
}
return tables().forTable(tableId);
}
private boolean acceptableDatabase(final String databaseName) {
return !storeOnlyCapturedTables()
|| filters.databaseFilter().test(databaseName)

View File

@ -5,6 +5,8 @@
*/
package io.debezium.connector.mysql;
import static java.util.function.Predicate.not;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -319,8 +321,13 @@ else if (!connectorConfig.getSnapshotMode().shouldStream()) {
private void addSchemaEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
String database, String ddl) {
schemaEvents.addAll(databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database,
snapshotContext.offset, clock.currentTimeAsInstant()));
List<SchemaChangeEvent> schemaChangeEvents = databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database,
snapshotContext.offset, clock.currentTimeAsInstant());
List<SchemaChangeEvent> missingSchemaChangeEvents = schemaChangeEvents.stream()
.filter(not(schemaEvents::contains))
.collect(Collectors.toList());
schemaEvents.addAll(missingSchemaChangeEvents);
}
@Override

View File

@ -11,11 +11,13 @@
import java.sql.SQLException;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.junit.MySqlDatabaseVersionResolver;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.AbstractBlockingSnapshotTest;
import io.debezium.relational.TableId;
@ -25,7 +27,9 @@
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
protected static final String SERVER_NAME = "is_test";
public static final int MYSQL8 = 8;
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot_test", "1", null).withDbHistoryPath(SCHEMA_HISTORY_PATH);
private final MySqlDatabaseVersionResolver databaseVersionResolver = new MySqlDatabaseVersionResolver();
@Before
public void before() throws SQLException {
@ -71,7 +75,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(MySqlConnectorConfig.USER, "mysqluser")
.with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue())
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl)
@ -79,6 +83,13 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
}
@Override
protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
return mutableConfig(signalTableOnly, storeOnlyCapturedDdl)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
}
@Override
protected String connector() {
return "mysql";
@ -158,11 +169,24 @@ protected int expectedDdlsCount() {
protected void assertDdl(List<String> schemaChangesDdls) {
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 2)).isEqualTo("DROP TABLE IF EXISTS `blocking_snapshot_test_1`.`b`");
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo("\"CREATE TABLE `b` (\n" +
" `pk` int NOT NULL AUTO_INCREMENT,\n" +
" `aa` int DEFAULT NULL,\n" +
" PRIMARY KEY (`pk`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci\"");
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo(getDdlString(databaseVersionResolver));
}
@NotNull
private static String getDdlString(MySqlDatabaseVersionResolver databaseVersionResolver) {
return databaseVersionResolver.getVersion().getMajor() < MYSQL8 ? "CREATE TABLE `b` (\n" +
" `pk` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `aa` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`pk`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=latin1"
: "CREATE TABLE `b` (\n" +
" `pk` int NOT NULL AUTO_INCREMENT,\n" +
" `aa` int DEFAULT NULL,\n" +
" PRIMARY KEY (`pk`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci";
}
}

View File

@ -127,8 +127,7 @@ protected Configuration.Builder config() {
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL")
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
.with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, TestHelper.getDatabaseName() + ".DEBEZIUM.A")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
}
@Override
@ -136,6 +135,13 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
return config();
}
@Override
protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
return config()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
}
@Override
protected String valueFieldName() {
return "AA";

View File

@ -151,7 +151,7 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
if (snapshottingTask.snapshotSchema()) {
LOGGER.info("Snapshot step 6 - Persisting schema history");
createSchemaChangeEventsForTables(context, ctx, snapshottingTask); // TODO check
createSchemaChangeEventsForTables(context, ctx, snapshottingTask);
// if we've been interrupted before, the TX rollback will cause any locks to be released
releaseSchemaSnapshotLocks(ctx);

View File

@ -125,6 +125,24 @@ public TableChanges getTableChanges() {
return tableChanges;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SchemaChangeEvent that = (SchemaChangeEvent) o;
return Objects.equals(database, that.database) && Objects.equals(schema, that.schema) && Objects.equals(ddl,
that.ddl) && type == that.type;
}
@Override
public int hashCode() {
return Objects.hash(database, schema, ddl, type);
}
@Override
public String toString() {
return "SchemaChangeEvent [database=" + database + ", schema=" + schema + ", ddl=" + ddl + ", tables=" + tables

View File

@ -40,7 +40,6 @@
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
import io.debezium.util.Testing;
public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest {
private int signalingRecords;
@ -63,6 +62,10 @@ public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest
@Override
protected abstract String server();
protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
return mutableConfig(signalTableOnly, storeOnlyCapturedDdl);
}
@Test
public void executeBlockingSnapshot() throws Exception {
// Testing.Print.enable();
@ -75,7 +78,8 @@ public void executeBlockingSnapshot() throws Exception {
insertRecords(ROW_COUNT, ROW_COUNT);
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumeRecordsByTopic(ROW_COUNT * 2, 10));
SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(ROW_COUNT * 2, 10);
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumedRecordsByTopic);
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId());
@ -119,8 +123,9 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
signalingRecords = 1; // from streaming
SourceRecords consumeRecordsByTopic = consumeRecordsByTopic((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), 10);
assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords),
getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), 10));
getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic);
}
@Test
@ -141,8 +146,9 @@ public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
signalingRecords = 1; // from streaming
SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(500 + signalingRecords, 10);
assertRecordsWithValuesPresent(500, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString(),
consumeRecordsByTopic(500 + signalingRecords, 10));
consumedRecordsByTopic);
}
@ -151,11 +157,11 @@ public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
@SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.SQL_SERVER)
@SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.DB2)
public void readsSchemaOnlyForSignaledTables() throws Exception {
Testing.Print.enable();
// Testing.Print.enable();
populateTable(tableNames().get(1).toString());
startConnectorWithSnapshot(x -> mutableConfig(false, false));
startConnectorWithSnapshot(x -> historizedMutableConfig(false, false));
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
@ -176,8 +182,6 @@ public void readsSchemaOnlyForSignaledTables() throws Exception {
.map(sourceRecord -> ((Struct) sourceRecord.value()).getString("ddl"))
.collect(Collectors.toList());
Testing.print(ddls);
assertDdl(ddls);
}