DBZ-2397 Test for restart in tx

This commit is contained in:
Jiri Pechanec 2020-08-10 08:25:00 +02:00
parent aead9a6612
commit 4d0ec1f848

View File

@ -1738,6 +1738,50 @@ public void stopInTheMiddleOfTxAndResume() throws Exception {
}
}
@Test
@FixFor("DBZ-2397")
@SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.WAL2JSON, reason = "wal2json cannot resume transaction in the middle of processing")
public void restartConnectorInTheMiddleOfUncommittedTx() throws Exception {
Testing.Print.enable();
final PostgresConnection tx1Connection = TestHelper.create();
tx1Connection.setAutoCommit(false);
final PostgresConnection tx2Connection = TestHelper.create();
tx2Connection.setAutoCommit(true);
startConnector(config -> config.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false), true);
waitForStreamingToStart();
tx1Connection.executeWithoutCommitting("INSERT INTO test_table (text) VALUES ('tx-1-1')");
tx2Connection.execute("INSERT INTO test_table (text) VALUES ('tx-2-1')");
consumer = testConsumer(1);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("tx-2-1");
stopConnector();
startConnector(Function.identity(), false);
waitForStreamingToStart();
tx1Connection.executeWithoutCommitting("INSERT INTO test_table (text) VALUES ('tx-1-2')");
tx2Connection.execute("INSERT INTO test_table (text) VALUES ('tx-2-2')");
tx1Connection.executeWithoutCommitting("INSERT INTO test_table (text) VALUES ('tx-1-3')");
tx2Connection.execute("INSERT INTO test_table (text) VALUES ('tx-2-3')");
tx1Connection.commit();
consumer = testConsumer(5);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("tx-2-2");
assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("tx-2-3");
assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("tx-1-1");
assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("tx-1-2");
assertThat(((Struct) consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("tx-1-3");
}
@Test
@FixFor("DBZ-1730")
public void shouldStartConsumingFromSlotLocation() throws Exception {