DBZ-5739 Make validation mandatory; more robust tests

This commit is contained in:
Jiri Pechanec 2022-10-20 13:31:01 +02:00
parent 822f87147c
commit b4c076a27c
5 changed files with 44 additions and 46 deletions

View File

@ -334,6 +334,7 @@ Plugaru Tudor
Poonam Meghnani
Pradeep Mamillapalli
Prannoy Mittal
Praveen Burgu
Preethi Sadagopan
pushpavanthar
Qishang Zhong

View File

@ -29,6 +29,7 @@
import io.debezium.function.Predicates;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.RelationalChangeRecordEmitter;
@ -37,6 +38,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
@ -234,8 +236,8 @@ private void refreshTableFromDatabase(TableId tableId) {
}
}
static Optional<DataCollectionSchema> updateSchema(PostgresPartition partition, TableId tableId, ChangeRecordEmitter<PostgresPartition> changeRecordEmitter) {
return ((PostgresChangeRecordEmitter) changeRecordEmitter).newTable(tableId);
static Optional<DataCollectionSchema> updateSchema(Partition partition, DataCollectionId tableId, ChangeRecordEmitter<PostgresPartition> changeRecordEmitter) {
return ((PostgresChangeRecordEmitter) changeRecordEmitter).newTable((TableId) tableId);
}
private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table table) {

View File

@ -506,16 +506,6 @@ public static SchemaRefreshMode parse(String value) {
"Whether or not to drop the logical replication slot when the connector finishes orderly. " +
"By default the replication is kept so that on restart progress can resume from the last recorded location");
public static final Field SLOT_SEEK_TO_KNOWN_OFFSET = Field.create("slot.seek.to.known.offset.on.start")
.withDisplayName("Seek to last know offset on the replication slot")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 3))
.withDefault(false)
.withImportance(Importance.HIGH)
.withDescription(
"Whether or not to seek to the last known offset on the replication slot." +
"Enabling this option results in startup failure if the slot is re-created instead of data loss.");
public static final Field PUBLICATION_NAME = Field.create("publication.name")
.withDisplayName("Publication")
.withType(Type.STRING)
@ -904,10 +894,6 @@ protected boolean dropSlotOnStop() {
return getConfig().getBoolean(DROP_SLOT_ON_STOP);
}
public boolean slotSeekToKnownOffsetOnStart() {
return getConfig().getBoolean(SLOT_SEEK_TO_KNOWN_OFFSET);
}
public String publicationName() {
return getConfig().getString(PUBLICATION_NAME);
}

View File

@ -325,30 +325,7 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi
LOGGER.debug("starting streaming from LSN '{}'", lsn);
}
if (connectorConfig.slotSeekToKnownOffsetOnStart()) {
try {
Statement stmt = pgConnection().createStatement();
String seekCommand = String.format(
"SELECT pg_replication_slot_advance('%s', '%s')",
slotName,
lsn.asString());
LOGGER.info("Seeking to {} on the replication slot with command {}", lsn, seekCommand);
stmt.execute(seekCommand);
}
catch (PSQLException e) {
if (e.getMessage().matches("ERROR: function pg_replication_slot_advance.*does not exist(.|\\n)*")) {
LOGGER.warn("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset.");
}
else if (e.getMessage().matches("ERROR: cannot advance replication slot to.*")) {
throw new DebeziumException(
String.format("Cannot seek to the last known offset '%s' on replication slot '%s'. Error from server: %s", lsn.asString(), slotName,
e.getMessage()));
}
else {
throw e;
}
}
}
validateSlotIsInExpectedState(lsn);
final int maxRetries = connectorConfig.maxRetries();
final Duration delay = connectorConfig.retryDelay();
@ -374,6 +351,31 @@ else if (e.getMessage().matches("ERROR: cannot advance replication slot to.*"))
}
}
protected void validateSlotIsInExpectedState(Lsn lsn) throws SQLException, PSQLException {
try {
Statement stmt = pgConnection().createStatement();
String seekCommand = String.format(
"SELECT pg_replication_slot_advance('%s', '%s')",
slotName,
lsn.asString());
LOGGER.info("Seeking to {} on the replication slot with command {}", lsn, seekCommand);
stmt.execute(seekCommand);
}
catch (PSQLException e) {
if (e.getMessage().matches("ERROR: function pg_replication_slot_advance.*does not exist(.|\\n)*")) {
LOGGER.info("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset.");
}
else if (e.getMessage().matches("ERROR: cannot advance replication slot to.*")) {
throw new DebeziumException(
String.format("Cannot seek to the last known offset '%s' on replication slot '%s'. Error from server: %s", lsn.asString(), slotName,
e.getMessage()));
}
else {
throw e;
}
}
}
@Override
public void initConnection() throws SQLException, InterruptedException {
// See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html

View File

@ -2941,6 +2941,8 @@ public void shouldSuppressLoggingOptionalOfExcludedColumns() throws Exception {
@FixFor("DBZ-5739")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 11, reason = "This needs pg_replication_slot_advance which is supported only on Postgres 11+")
public void shouldStopConnectorOnSlotRecreation() throws InterruptedException {
final LogInterceptor logInterceptor = new LogInterceptor(PostgresConnectorIT.class);
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY.name())
@ -2959,18 +2961,20 @@ public void shouldStopConnectorOnSlotRecreation() throws InterruptedException {
configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE);
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
start(PostgresConnector.class, configBuilder.build());
Thread.sleep(1000);
Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS)
.until(() -> logInterceptor.containsStacktraceElement("Cannot seek to the last known offset "));
assertConnectorNotRunning();
}
@Test
@FixFor("DBZ-5739")
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 11, reason = "This needs pg_replication_slot_advance which is supported only on Postgres 11+")
@SkipWhenDatabaseVersion(check = EqualityCheck.GREATER_THAN_OR_EQUAL, major = 11, reason = "This pg_replication_slot_advance is not present Postgres 10")
public void shouldSeekToCorrectOffset() throws InterruptedException {
final LogInterceptor logInterceptor = new LogInterceptor(PostgresReplicationConnection.class);
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.name())
@ -2989,12 +2993,15 @@ public void shouldSeekToCorrectOffset() throws InterruptedException {
configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE);
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
start(PostgresConnector.class, configBuilder.build());
consumeRecordsByTopic(1);
assertConnectorIsRunning();
Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS)
.until(() -> logInterceptor
.containsMessage("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset."));
}
private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) {