From 652f41f7fb9d300b3afe281e738d94ccae867b56 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Fri, 12 Nov 2021 15:26:45 -0500 Subject: [PATCH] DBZ-4272 Add drop column schema change to test --- .../oracle/IncrementalSnapshotIT.java | 2 +- .../postgresql/IncrementalSnapshotIT.java | 4 ++- .../AbstractIncrementalSnapshotTest.java | 26 ++++++++++++++++--- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java index a798065ea..ece49591c 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/IncrementalSnapshotIT.java @@ -127,7 +127,7 @@ protected String pkFieldName() { } @Override - protected String alterTableStatement(String tableName) { + protected String alterTableAddColumnStatement(String tableName) { return "ALTER TABLE " + tableName + " ADD col3 INTEGER DEFAULT 0"; } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java index fe120b6a3..96282969b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java @@ -57,7 +57,9 @@ protected Configuration.Builder config() { .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1") .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false) - .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4"); + .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4") + // DBZ-4272 required to allow dropping columns during incremental snapshots + .with("database.autosave", "conservative"); } @Override diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index 728a75f2b..18f4cf898 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -54,10 +54,14 @@ public abstract class AbstractIncrementalSnapshotTest protected abstract Configuration.Builder config(); - protected String alterTableStatement(String tableName) { + protected String alterTableAddColumnStatement(String tableName) { return "ALTER TABLE " + tableName + " add col3 int default 0"; } + protected String alterTableDropColumnStatement(String tableName) { + return "ALTER TABLE " + tableName + " drop column col3"; + } + protected String tableDataCollectionId() { return tableName(); } @@ -419,8 +423,9 @@ public void snapshotProceededBySchemaChange() throws Exception { waitForConnectorToStart(); // Initiate a schema change to the table immediately before the adhoc-snapshot + // Adds a new column to the table; this column will be dropped later in this test. try (JdbcConnection connection = databaseConnection()) { - connection.execute(alterTableStatement(tableName())); + connection.execute(alterTableAddColumnStatement(tableName())); } // Some connectors, such as PostgreSQL won't be notified of the previous schema change @@ -428,7 +433,22 @@ public void snapshotProceededBySchemaChange() throws Exception { sendAdHocSnapshotSignal(); final int expectedRecordCount = ROW_COUNT; - final Map dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount); + Map dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount); + for (int i = 0; i < expectedRecordCount; i++) { + Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i)); + } + + // Initiate a schema change to the table immediately before the adhoc-snapshot + // This schema change will drop the previously added column from above. + try (JdbcConnection connection = databaseConnection()) { + connection.execute(alterTableDropColumnStatement(tableName())); + } + + // Some connectors, such as PostgreSQL won't be notified of the previous schema change + // until a DML event occurs, but regardless the incremental snapshot should succeed. + sendAdHocSnapshotSignal(); + + dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount); for (int i = 0; i < expectedRecordCount; i++) { Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i)); }