diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 902b44739..8b6515fbd 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -334,6 +334,7 @@ Plugaru Tudor Poonam Meghnani Pradeep Mamillapalli Prannoy Mittal +Praveen Burgu Preethi Sadagopan pushpavanthar Qishang Zhong diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java index 6791d12db..9d0ef5658 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java @@ -29,6 +29,7 @@ import io.debezium.function.Predicates; import io.debezium.pipeline.spi.ChangeRecordEmitter; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Partition; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; import io.debezium.relational.RelationalChangeRecordEmitter; @@ -37,6 +38,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.TableSchema; import io.debezium.schema.DataCollectionSchema; +import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; import io.debezium.util.Strings; @@ -234,8 +236,8 @@ private void refreshTableFromDatabase(TableId tableId) { } } - static Optional updateSchema(PostgresPartition partition, TableId tableId, ChangeRecordEmitter changeRecordEmitter) { - return ((PostgresChangeRecordEmitter) changeRecordEmitter).newTable(tableId); + static Optional updateSchema(Partition partition, DataCollectionId tableId, ChangeRecordEmitter changeRecordEmitter) { + return ((PostgresChangeRecordEmitter) changeRecordEmitter).newTable((TableId) tableId); } private boolean schemaChanged(List columns, Table table) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 04c859cfc..cf5f81625 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -506,16 +506,6 @@ public static SchemaRefreshMode parse(String value) { "Whether or not to drop the logical replication slot when the connector finishes orderly. " + "By default the replication is kept so that on restart progress can resume from the last recorded location"); - public static final Field SLOT_SEEK_TO_KNOWN_OFFSET = Field.create("slot.seek.to.known.offset.on.start") - .withDisplayName("Seek to last know offset on the replication slot") - .withType(Type.BOOLEAN) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 3)) - .withDefault(false) - .withImportance(Importance.HIGH) - .withDescription( - "Whether or not to seek to the last known offset on the replication slot." + - "Enabling this option results in startup failure if the slot is re-created instead of data loss."); - public static final Field PUBLICATION_NAME = Field.create("publication.name") .withDisplayName("Publication") .withType(Type.STRING) @@ -904,10 +894,6 @@ protected boolean dropSlotOnStop() { return getConfig().getBoolean(DROP_SLOT_ON_STOP); } - public boolean slotSeekToKnownOffsetOnStart() { - return getConfig().getBoolean(SLOT_SEEK_TO_KNOWN_OFFSET); - } - public String publicationName() { return getConfig().getString(PUBLICATION_NAME); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 3f216f28e..536be8e1c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -325,30 +325,7 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi LOGGER.debug("starting streaming from LSN '{}'", lsn); } - if (connectorConfig.slotSeekToKnownOffsetOnStart()) { - try { - Statement stmt = pgConnection().createStatement(); - String seekCommand = String.format( - "SELECT pg_replication_slot_advance('%s', '%s')", - slotName, - lsn.asString()); - LOGGER.info("Seeking to {} on the replication slot with command {}", lsn, seekCommand); - stmt.execute(seekCommand); - } - catch (PSQLException e) { - if (e.getMessage().matches("ERROR: function pg_replication_slot_advance.*does not exist(.|\\n)*")) { - LOGGER.warn("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset."); - } - else if (e.getMessage().matches("ERROR: cannot advance replication slot to.*")) { - throw new DebeziumException( - String.format("Cannot seek to the last known offset '%s' on replication slot '%s'. Error from server: %s", lsn.asString(), slotName, - e.getMessage())); - } - else { - throw e; - } - } - } + validateSlotIsInExpectedState(lsn); final int maxRetries = connectorConfig.maxRetries(); final Duration delay = connectorConfig.retryDelay(); @@ -374,6 +351,31 @@ else if (e.getMessage().matches("ERROR: cannot advance replication slot to.*")) } } + protected void validateSlotIsInExpectedState(Lsn lsn) throws SQLException, PSQLException { + try { + Statement stmt = pgConnection().createStatement(); + String seekCommand = String.format( + "SELECT pg_replication_slot_advance('%s', '%s')", + slotName, + lsn.asString()); + LOGGER.info("Seeking to {} on the replication slot with command {}", lsn, seekCommand); + stmt.execute(seekCommand); + } + catch (PSQLException e) { + if (e.getMessage().matches("ERROR: function pg_replication_slot_advance.*does not exist(.|\\n)*")) { + LOGGER.info("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset."); + } + else if (e.getMessage().matches("ERROR: cannot advance replication slot to.*")) { + throw new DebeziumException( + String.format("Cannot seek to the last known offset '%s' on replication slot '%s'. Error from server: %s", lsn.asString(), slotName, + e.getMessage())); + } + else { + throw e; + } + } + } + @Override public void initConnection() throws SQLException, InterruptedException { // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 6bdd38d1a..638072550 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -2941,6 +2941,8 @@ public void shouldSuppressLoggingOptionalOfExcludedColumns() throws Exception { @FixFor("DBZ-5739") @SkipWhenDatabaseVersion(check = LESS_THAN, major = 11, reason = "This needs pg_replication_slot_advance which is supported only on Postgres 11+") public void shouldStopConnectorOnSlotRecreation() throws InterruptedException { + final LogInterceptor logInterceptor = new LogInterceptor(PostgresConnectorIT.class); + TestHelper.execute(SETUP_TABLES_STMT); Configuration.Builder configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY.name()) @@ -2959,18 +2961,20 @@ public void shouldStopConnectorOnSlotRecreation() throws InterruptedException { configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name()) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE); + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE); start(PostgresConnector.class, configBuilder.build()); - Thread.sleep(1000); + Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS) + .until(() -> logInterceptor.containsStacktraceElement("Cannot seek to the last known offset ")); assertConnectorNotRunning(); } @Test @FixFor("DBZ-5739") - @SkipWhenDatabaseVersion(check = LESS_THAN, major = 11, reason = "This needs pg_replication_slot_advance which is supported only on Postgres 11+") + @SkipWhenDatabaseVersion(check = EqualityCheck.GREATER_THAN_OR_EQUAL, major = 11, reason = "This pg_replication_slot_advance is not present Postgres 10") public void shouldSeekToCorrectOffset() throws InterruptedException { + final LogInterceptor logInterceptor = new LogInterceptor(PostgresReplicationConnection.class); + TestHelper.execute(SETUP_TABLES_STMT); Configuration.Builder configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.name()) @@ -2989,12 +2993,15 @@ public void shouldSeekToCorrectOffset() throws InterruptedException { configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name()) - .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) - .with(PostgresConnectorConfig.SLOT_SEEK_TO_KNOWN_OFFSET, Boolean.TRUE); + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE); start(PostgresConnector.class, configBuilder.build()); consumeRecordsByTopic(1); assertConnectorIsRunning(); + + Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS) + .until(() -> logInterceptor + .containsMessage("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset.")); } private Predicate stopOnPKPredicate(int pkValue) {