DBZ-2288 Test for pgoutput

This commit is contained in:
Jiri Pechanec 2020-07-14 14:51:03 +02:00
parent 8b0b252eb9
commit ea942a4268

View File

@ -1121,6 +1121,7 @@ public void shouldAllowForExportedSnapshot() throws Exception {
@Test
@FixFor("DBZ-2288")
@SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "PgOutput needs publication for manually created slot")
public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception {
TestHelper.dropDefaultReplicationSlot();
TestHelper.createDefaultReplicationSlot();
@ -1156,7 +1157,57 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception {
// Consume records from the snapshot
SourceRecords actualRecords = consumeRecordsByTopic(4);
pgConnection.commit();
// Consume records from concurrent transactions
actualRecords = consumeRecordsByTopic(4);
List<SourceRecord> s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
List<SourceRecord> s2recs = actualRecords.recordsForTopic(topicName("s1.a"));
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
stopConnector();
TestHelper.dropDefaultReplicationSlot();
}
@Test
@FixFor("DBZ-2288")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication not supported")
public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exception {
TestHelper.dropDefaultReplicationSlot();
TestHelper.createDefaultReplicationSlot();
TestHelper.execute("CREATE PUBLICATION dbz_publication FOR ALL TABLES;");
// Testing.Print.enable();
TestHelper.execute(SETUP_TABLES_STMT);
TestHelper.execute(INSERT_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.EXPORTED.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2)
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1)
.build();
final PostgresConnection pgConnection = TestHelper.create();
pgConnection.setAutoCommit(false);
pgConnection.executeWithoutCommitting(INSERT_STMT);
final AtomicBoolean inserted = new AtomicBoolean();
start(PostgresConnector.class, config, loggingCompletion(), x -> false, x -> {
if (!inserted.get()) {
TestHelper.execute(INSERT_STMT);
try {
pgConnection.commit();
}
catch (Exception e) {
e.printStackTrace();
}
inserted.set(true);
}
});
assertConnectorIsRunning();
// Consume records from the snapshot
SourceRecords actualRecords = consumeRecordsByTopic(4);
// Consume records from concurrent transactions
actualRecords = consumeRecordsByTopic(4);
@ -1166,6 +1217,10 @@ public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception {
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
stopConnector();
TestHelper.dropPublication();
TestHelper.dropDefaultReplicationSlot();
}
@Test