DBZ-1052 Test for limited Postgres/protobuf
This commit is contained in:
parent
fcafd2f193
commit
c466143f35
@ -95,7 +95,7 @@ public static ReplicationConnection createForReplication(String slotName, boolea
|
||||
*/
|
||||
public static PostgresConnectorConfig.LogicalDecoder decoderPlugin() {
|
||||
final String s = System.getProperty(PostgresConnectorConfig.PLUGIN_NAME.name());
|
||||
return (s == null || s.length() == 0) ? PostgresConnectorConfig.LogicalDecoder.WAL2JSON_STREAMING : PostgresConnectorConfig.LogicalDecoder.parse(s);
|
||||
return (s == null || s.length() == 0) ? PostgresConnectorConfig.LogicalDecoder.DECODERBUFS : PostgresConnectorConfig.LogicalDecoder.parse(s);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -23,7 +23,7 @@
|
||||
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
|
||||
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
|
||||
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
|
||||
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName;
|
||||
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
@ -61,7 +61,7 @@ public void after() {
|
||||
}
|
||||
|
||||
@Test
|
||||
@SkipWhenDecoderPluginNameIs(value = DecoderPluginName.DECODERBUFS, reason = "Only pgoutput plguin has enabled BEGIN/COMMIT messages")
|
||||
@SkipWhenDecoderPluginNameIs(value = io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.DECODERBUFS, reason = "Only pgoutput plguin has enabled BEGIN/COMMIT messages")
|
||||
public void transactionMetadata() throws InterruptedException {
|
||||
Testing.Print.enable();
|
||||
|
||||
@ -92,4 +92,36 @@ public void transactionMetadata() throws InterruptedException {
|
||||
assertRecordTransactionMetadata(records.get(2), txId, 2, 1);
|
||||
assertEndTransaction(records.get(3), txId, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SkipWhenDecoderPluginNameIsNot(value = io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.DECODERBUFS, reason = "Only pgoutput plguin has enabled BEGIN/COMMIT messages")
|
||||
public void transactionMetadataForProtobuf() throws InterruptedException {
|
||||
Testing.Print.enable();
|
||||
|
||||
TestHelper.dropDefaultReplicationSlot();
|
||||
TestHelper.execute(SETUP_TABLES_STMT);
|
||||
Configuration config = TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
|
||||
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
|
||||
.with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
|
||||
.build();
|
||||
start(PostgresConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
TestHelper.waitForDefaultReplicationSlotBeActive();
|
||||
|
||||
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
|
||||
// there shouldn't be any snapshot records
|
||||
assertNoRecordsToConsume();
|
||||
|
||||
// insert and verify 2 new records
|
||||
TestHelper.execute(INSERT_STMT);
|
||||
|
||||
// BEGIN, 2 * data, END
|
||||
final List<SourceRecord> records = consumeRecordsByTopic(3).allRecordsInOrder();
|
||||
|
||||
Assertions.assertThat(records).hasSize(3);
|
||||
final String txId = assertBeginTransaction(records.get(0));
|
||||
assertRecordTransactionMetadata(records.get(1), txId, 1, 1);
|
||||
assertRecordTransactionMetadata(records.get(2), txId, 2, 1);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user