From 67ff871569cf071b57f56336b9637f2a5dad39ac Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 8 Dec 2022 06:08:54 +0100 Subject: [PATCH] DBZ-5895 Wait for REPLICA IDENTITY set completed --- .../postgresql/RecordsStreamProducerIT.java | 12 ++++++------ .../debezium/connector/postgresql/TestHelper.java | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 264137f6a..05956b5e8 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -638,7 +638,7 @@ public void shouldReceiveChangesForUpdates() throws Exception { // alter the table and set its replica identity to full the issue another update consumer.expects(1); - TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL"); + TestHelper.setReplicaIdentityForTable("test_table", "FULL"); executeAndWait("UPDATE test_table set text='update2' WHERE pk=1"); updatedRecord = consumer.remove(); @@ -666,7 +666,7 @@ public void shouldReceiveChangesForUpdates() throws Exception { assertRecordSchemaAndValues(expectedAfter, updatedRecord, Envelope.FieldName.AFTER); // without PK and with REPLICA IDENTITY DEFAULT we will get nothing - TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;"); + TestHelper.setReplicaIdentityForTable("test_table", "DEFAULT"); consumer.expects(0); executeAndWaitForNoRecords("UPDATE test_table SET text = 'no_pk_and_default' WHERE pk = 1;"); assertThat(consumer.isEmpty()).isTrue(); @@ -1025,11 +1025,11 @@ public void shouldReceiveChangesForDeletesDependingOnReplicaIdentity() throws Ex VerifyRecord.isValidDelete(record, PK_FIELD, 1); // Without PK we should get delete event with REPLICA IDENTITY FULL - statement = "ALTER TABLE test_table REPLICA IDENTITY FULL;" + - "ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;" + + statement = "ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;" + "INSERT INTO test_table (pk, text) VALUES (2, 'insert2');" + "DELETE FROM test_table WHERE pk = 2;"; consumer.expects(2); + TestHelper.setReplicaIdentityForTable("test_table", "FULL"); executeAndWait(statement); assertRecordInserted("public.test_table", PK_FIELD, 2); record = consumer.remove(); @@ -1037,10 +1037,10 @@ record = consumer.remove(); VerifyRecord.isValidDelete(record, PK_FIELD, 2); // Without PK and without REPLICA IDENTITY FULL we will not get delete event - statement = "ALTER TABLE test_table REPLICA IDENTITY DEFAULT;" + - "INSERT INTO test_table (pk, text) VALUES (3, 'insert3');" + + statement = "INSERT INTO test_table (pk, text) VALUES (3, 'insert3');" + "DELETE FROM test_table WHERE pk = 3;"; consumer.expects(1); + TestHelper.setReplicaIdentityForTable("test_table", "DEFAULT"); executeAndWait(statement); assertRecordInserted("public.test_table", PK_FIELD, 3); assertThat(consumer.isEmpty()).isTrue(); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index 51395b22b..b9da1e862 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -395,6 +395,21 @@ protected static void waitForDefaultReplicationSlotBeActive() { } } + protected static void setReplicaIdentityForTable(String table, String identity) { + execute(String.format("ALTER TABLE %s REPLICA IDENTITY %s;", table, identity)); + try (PostgresConnection connection = create()) { + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> connection + .prepareQueryAndMap("SELECT relreplident FROM pg_class WHERE oid = ?::regclass;", statement -> { + statement.setString(1, table); + }, rs -> { + if (!rs.next()) { + return false; + } + return identity.toLowerCase().startsWith(rs.getString(1)); + })); + } + } + protected static void assertNoOpenTransactions() throws SQLException { try (PostgresConnection connection = TestHelper.create()) { connection.setAutoCommit(true);