DBZ-4272 Add drop column schema change to test

This commit is contained in:
Chris Cranford 2021-11-12 15:26:45 -05:00 committed by Gunnar Morling
parent 3a8d1ff838
commit 652f41f7fb
3 changed files with 27 additions and 5 deletions

View File

@ -127,7 +127,7 @@ protected String pkFieldName() {
}
@Override
protected String alterTableStatement(String tableName) {
protected String alterTableAddColumnStatement(String tableName) {
return "ALTER TABLE " + tableName + " ADD col3 INTEGER DEFAULT 0";
}
}

View File

@ -57,7 +57,9 @@ protected Configuration.Builder config() {
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4");
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4")
// DBZ-4272 required to allow dropping columns during incremental snapshots
.with("database.autosave", "conservative");
}
@Override

View File

@ -54,10 +54,14 @@ public abstract class AbstractIncrementalSnapshotTest<T extends SourceConnector>
protected abstract Configuration.Builder config();
protected String alterTableStatement(String tableName) {
protected String alterTableAddColumnStatement(String tableName) {
return "ALTER TABLE " + tableName + " add col3 int default 0";
}
protected String alterTableDropColumnStatement(String tableName) {
return "ALTER TABLE " + tableName + " drop column col3";
}
protected String tableDataCollectionId() {
return tableName();
}
@ -419,8 +423,9 @@ public void snapshotProceededBySchemaChange() throws Exception {
waitForConnectorToStart();
// Initiate a schema change to the table immediately before the adhoc-snapshot
// Adds a new column to the table; this column will be dropped later in this test.
try (JdbcConnection connection = databaseConnection()) {
connection.execute(alterTableStatement(tableName()));
connection.execute(alterTableAddColumnStatement(tableName()));
}
// Some connectors, such as PostgreSQL won't be notified of the previous schema change
@ -428,7 +433,22 @@ public void snapshotProceededBySchemaChange() throws Exception {
sendAdHocSnapshotSignal();
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
// Initiate a schema change to the table immediately before the adhoc-snapshot
// This schema change will drop the previously added column from above.
try (JdbcConnection connection = databaseConnection()) {
connection.execute(alterTableDropColumnStatement(tableName()));
}
// Some connectors, such as PostgreSQL won't be notified of the previous schema change
// until a DML event occurs, but regardless the incremental snapshot should succeed.
sendAdHocSnapshotSignal();
dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}