From 889b307b9df5445a441bac892d8385c724424624 Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Thu, 20 Dec 2018 22:55:02 -0700 Subject: [PATCH] DBZ-1081 Fallback to restart_lsn in pg 9.5 In pg 9.5, confirmed_flush_lsn is not availiable. However, there is restart_lsn, which should be safe to use with a downside of producing some extra duplicate records. This page: https://paquier.xyz/postgresql-2/postgres-9-6-feature-highlight-replication-slot-improvements/ indicates as such and this allows for DBZ to support PG 9.5 --- .../connection/PostgresConnection.java | 65 ++++++++++++------- .../connector/postgresql/TestHelper.java | 4 +- .../connection/PostgresConnectionIT.java | 33 ++++++++++ 3 files changed, 76 insertions(+), 26 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 22c31c590..f6293599a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -132,12 +132,7 @@ protected ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, St final Metronome metronome = Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM); for (int attempt = 1; attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) { - final ServerInfo.ReplicationSlot slot = prepareQueryAndMap( - "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> { - statement.setString(1, slotName); - statement.setString(2, database); - statement.setString(3, pluginName); - }, + final ServerInfo.ReplicationSlot slot = queryForSlot(slotName, database, pluginName, rs -> { if (rs.next()) { boolean active = rs.getBoolean("active"); @@ -166,34 +161,56 @@ protected ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, St + "Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector"); } + protected ServerInfo.ReplicationSlot queryForSlot(String slotName, String database, String pluginName, + ResultSetMapper map) throws SQLException { + return prepareQueryAndMap("select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> { + statement.setString(1, slotName); + statement.setString(2, database); + statement.setString(3, pluginName); + }, map); + } + private Long parseConfirmedFlushLsn(String slotName, String pluginName, String database, ResultSet rs) { Long confirmedFlushedLsn = null; try { - String confirmedFlushLSNString = rs.getString("confirmed_flush_lsn"); - if (confirmedFlushLSNString == null) { - return null; - } - try { - confirmedFlushedLsn = LogSequenceNumber.valueOf(confirmedFlushLSNString).asLong(); - } - catch (Exception e) { - throw new ConnectException("Value confirmed_flush_lsn in the pg_replication_slots table for slot = '" - + slotName + "', plugin = '" - + pluginName + "', database = '" - + database + "' is not valid. This is an abnormal situation and the database status should be checked."); - } - if (confirmedFlushedLsn == LogSequenceNumber.INVALID_LSN.asLong()) { - throw new ConnectException("Invalid LSN returned from database"); - } - } + confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn"); + } catch (SQLException e) { - // info not available, so we must be prior to PG 9.6 + LOGGER.info("unable to find confirmed_flushed_lsn, falling back to restart_lsn"); + try { + confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn"); + } + catch (SQLException e2) { + throw new ConnectException("Neither confirmed_flush_lsn or restart_lsn could be found"); + } } return confirmedFlushedLsn; } + private Long tryParseLsn(String slotName, String pluginName, String database, ResultSet rs, String column) throws ConnectException, SQLException { + Long lsn = null; + + String lsnStr = rs.getString(column); + if (lsnStr == null) { + return null; + } + try { + lsn = LogSequenceNumber.valueOf(lsnStr).asLong(); + } + catch (Exception e) { + throw new ConnectException("Value " + column + " in the pg_replication_slots table for slot = '" + + slotName + "', plugin = '" + + pluginName + "', database = '" + + database + "' is not valid. This is an abnormal situation and the database status should be checked."); + } + if (lsn == LogSequenceNumber.INVALID_LSN.asLong()) { + throw new ConnectException("Invalid LSN returned from database"); + } + return lsn; + } + /** * Drops a replication slot that was created on the DB * diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index a93a9c755..5f73fca8b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -80,7 +80,7 @@ public static ReplicationConnection createForReplication(String slotName, boolea /** * @return the decoder plugin used for testing and configured by system property */ - static PostgresConnectorConfig.LogicalDecoder decoderPlugin() { + public static PostgresConnectorConfig.LogicalDecoder decoderPlugin() { final String s = System.getProperty(PostgresConnectorConfig.PLUGIN_NAME.name()); return (s == null || s.length() == 0) ? PostgresConnectorConfig.LogicalDecoder.DECODERBUFS : PostgresConnectorConfig.LogicalDecoder.parse(s); } @@ -181,7 +181,7 @@ protected static Set schemaNames() throws SQLException { } } - private static JdbcConfiguration defaultJdbcConfig() { + public static JdbcConfiguration defaultJdbcConfig() { return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) .withDefault(JdbcConfiguration.DATABASE, "postgres") .withDefault(JdbcConfiguration.HOSTNAME, "localhost") diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/PostgresConnectionIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/PostgresConnectionIT.java index 8e34dd16e..1f8c58f46 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/PostgresConnectionIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/PostgresConnectionIT.java @@ -9,6 +9,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import java.lang.reflect.Field; @@ -207,4 +208,36 @@ public void shouldDetectRunningConncurrentTxOnInit() throws Exception { } } } + + @Test + public void shouldSupportPG95RestartLsn() throws Exception { + String slotName = "pg95"; + try (ReplicationConnection replConnection = TestHelper.createForReplication(slotName, false)) { + assertTrue(replConnection.isConnected()); + } + try (PostgresConnection conn = buildPG95PGConn("pg95")) { + ServerInfo.ReplicationSlot slotInfo = conn.readReplicationSlotInfo(slotName, TestHelper.decoderPlugin().getPostgresPluginName()); + assertNotNull(slotInfo); + assertNotEquals(ServerInfo.ReplicationSlot.INVALID, slotInfo); + conn.dropReplicationSlot(slotName); + } + + } + + // "fake" a pg95 response by not returning confirmed_flushed_lsn + private PostgresConnection buildPG95PGConn(String name) { + return new PostgresConnection(TestHelper.defaultJdbcConfig().edit().with("ApplicationName", name).build()) { + @Override + protected ServerInfo.ReplicationSlot queryForSlot(String slotName, String database, String pluginName, + ResultSetMapper map) throws SQLException { + + String fields = "slot_name, plugin, slot_type, datoid, database, active, active_pid, xmin, catalog_xmin, restart_lsn"; + return prepareQueryAndMap("select " + fields + " from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> { + statement.setString(1, slotName); + statement.setString(2, database); + statement.setString(3, pluginName); + }, map); + } + }; + } }