DBZ-5895 Wait for REPLICA IDENTITY set completed

This commit is contained in:
Jiri Pechanec 2022-12-08 06:08:54 +01:00
parent f437788325
commit 67ff871569
2 changed files with 21 additions and 6 deletions

View File

@ -638,7 +638,7 @@ public void shouldReceiveChangesForUpdates() throws Exception {
// alter the table and set its replica identity to full the issue another update // alter the table and set its replica identity to full the issue another update
consumer.expects(1); consumer.expects(1);
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL"); TestHelper.setReplicaIdentityForTable("test_table", "FULL");
executeAndWait("UPDATE test_table set text='update2' WHERE pk=1"); executeAndWait("UPDATE test_table set text='update2' WHERE pk=1");
updatedRecord = consumer.remove(); updatedRecord = consumer.remove();
@ -666,7 +666,7 @@ public void shouldReceiveChangesForUpdates() throws Exception {
assertRecordSchemaAndValues(expectedAfter, updatedRecord, Envelope.FieldName.AFTER); assertRecordSchemaAndValues(expectedAfter, updatedRecord, Envelope.FieldName.AFTER);
// without PK and with REPLICA IDENTITY DEFAULT we will get nothing // without PK and with REPLICA IDENTITY DEFAULT we will get nothing
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;"); TestHelper.setReplicaIdentityForTable("test_table", "DEFAULT");
consumer.expects(0); consumer.expects(0);
executeAndWaitForNoRecords("UPDATE test_table SET text = 'no_pk_and_default' WHERE pk = 1;"); executeAndWaitForNoRecords("UPDATE test_table SET text = 'no_pk_and_default' WHERE pk = 1;");
assertThat(consumer.isEmpty()).isTrue(); assertThat(consumer.isEmpty()).isTrue();
@ -1025,11 +1025,11 @@ public void shouldReceiveChangesForDeletesDependingOnReplicaIdentity() throws Ex
VerifyRecord.isValidDelete(record, PK_FIELD, 1); VerifyRecord.isValidDelete(record, PK_FIELD, 1);
// Without PK we should get delete event with REPLICA IDENTITY FULL // Without PK we should get delete event with REPLICA IDENTITY FULL
statement = "ALTER TABLE test_table REPLICA IDENTITY FULL;" + statement = "ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;" +
"ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;" +
"INSERT INTO test_table (pk, text) VALUES (2, 'insert2');" + "INSERT INTO test_table (pk, text) VALUES (2, 'insert2');" +
"DELETE FROM test_table WHERE pk = 2;"; "DELETE FROM test_table WHERE pk = 2;";
consumer.expects(2); consumer.expects(2);
TestHelper.setReplicaIdentityForTable("test_table", "FULL");
executeAndWait(statement); executeAndWait(statement);
assertRecordInserted("public.test_table", PK_FIELD, 2); assertRecordInserted("public.test_table", PK_FIELD, 2);
record = consumer.remove(); record = consumer.remove();
@ -1037,10 +1037,10 @@ record = consumer.remove();
VerifyRecord.isValidDelete(record, PK_FIELD, 2); VerifyRecord.isValidDelete(record, PK_FIELD, 2);
// Without PK and without REPLICA IDENTITY FULL we will not get delete event // Without PK and without REPLICA IDENTITY FULL we will not get delete event
statement = "ALTER TABLE test_table REPLICA IDENTITY DEFAULT;" + statement = "INSERT INTO test_table (pk, text) VALUES (3, 'insert3');" +
"INSERT INTO test_table (pk, text) VALUES (3, 'insert3');" +
"DELETE FROM test_table WHERE pk = 3;"; "DELETE FROM test_table WHERE pk = 3;";
consumer.expects(1); consumer.expects(1);
TestHelper.setReplicaIdentityForTable("test_table", "DEFAULT");
executeAndWait(statement); executeAndWait(statement);
assertRecordInserted("public.test_table", PK_FIELD, 3); assertRecordInserted("public.test_table", PK_FIELD, 3);
assertThat(consumer.isEmpty()).isTrue(); assertThat(consumer.isEmpty()).isTrue();

View File

@ -395,6 +395,21 @@ protected static void waitForDefaultReplicationSlotBeActive() {
} }
} }
protected static void setReplicaIdentityForTable(String table, String identity) {
execute(String.format("ALTER TABLE %s REPLICA IDENTITY %s;", table, identity));
try (PostgresConnection connection = create()) {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> connection
.prepareQueryAndMap("SELECT relreplident FROM pg_class WHERE oid = ?::regclass;", statement -> {
statement.setString(1, table);
}, rs -> {
if (!rs.next()) {
return false;
}
return identity.toLowerCase().startsWith(rs.getString(1));
}));
}
}
protected static void assertNoOpenTransactions() throws SQLException { protected static void assertNoOpenTransactions() throws SQLException {
try (PostgresConnection connection = TestHelper.create()) { try (PostgresConnection connection = TestHelper.create()) {
connection.setAutoCommit(true); connection.setAutoCommit(true);