DBZ-1081 Fallback to restart_lsn in pg 9.5

In pg 9.5, confirmed_flush_lsn is not availiable. However, there is
restart_lsn, which should be safe to use with a downside of producing
some extra duplicate records.

This page: https://paquier.xyz/postgresql-2/postgres-9-6-feature-highlight-replication-slot-improvements/
indicates as such and this allows for DBZ to support PG 9.5
This commit is contained in:
Addison Higham 2018-12-20 22:55:02 -07:00 committed by Gunnar Morling
parent a11e3040c3
commit 889b307b9d
3 changed files with 76 additions and 26 deletions

View File

@ -132,12 +132,7 @@ protected ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, St
final Metronome metronome = Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM); final Metronome metronome = Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM);
for (int attempt = 1; attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) { for (int attempt = 1; attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) {
final ServerInfo.ReplicationSlot slot = prepareQueryAndMap( final ServerInfo.ReplicationSlot slot = queryForSlot(slotName, database, pluginName,
"select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> {
statement.setString(1, slotName);
statement.setString(2, database);
statement.setString(3, pluginName);
},
rs -> { rs -> {
if (rs.next()) { if (rs.next()) {
boolean active = rs.getBoolean("active"); boolean active = rs.getBoolean("active");
@ -166,34 +161,56 @@ protected ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, St
+ "Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector"); + "Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector");
} }
protected ServerInfo.ReplicationSlot queryForSlot(String slotName, String database, String pluginName,
ResultSetMapper<ServerInfo.ReplicationSlot> map) throws SQLException {
return prepareQueryAndMap("select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> {
statement.setString(1, slotName);
statement.setString(2, database);
statement.setString(3, pluginName);
}, map);
}
private Long parseConfirmedFlushLsn(String slotName, String pluginName, String database, ResultSet rs) { private Long parseConfirmedFlushLsn(String slotName, String pluginName, String database, ResultSet rs) {
Long confirmedFlushedLsn = null; Long confirmedFlushedLsn = null;
try { try {
String confirmedFlushLSNString = rs.getString("confirmed_flush_lsn"); confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn");
if (confirmedFlushLSNString == null) { }
return null;
}
try {
confirmedFlushedLsn = LogSequenceNumber.valueOf(confirmedFlushLSNString).asLong();
}
catch (Exception e) {
throw new ConnectException("Value confirmed_flush_lsn in the pg_replication_slots table for slot = '"
+ slotName + "', plugin = '"
+ pluginName + "', database = '"
+ database + "' is not valid. This is an abnormal situation and the database status should be checked.");
}
if (confirmedFlushedLsn == LogSequenceNumber.INVALID_LSN.asLong()) {
throw new ConnectException("Invalid LSN returned from database");
}
}
catch (SQLException e) { catch (SQLException e) {
// info not available, so we must be prior to PG 9.6 LOGGER.info("unable to find confirmed_flushed_lsn, falling back to restart_lsn");
try {
confirmedFlushedLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
}
catch (SQLException e2) {
throw new ConnectException("Neither confirmed_flush_lsn or restart_lsn could be found");
}
} }
return confirmedFlushedLsn; return confirmedFlushedLsn;
} }
private Long tryParseLsn(String slotName, String pluginName, String database, ResultSet rs, String column) throws ConnectException, SQLException {
Long lsn = null;
String lsnStr = rs.getString(column);
if (lsnStr == null) {
return null;
}
try {
lsn = LogSequenceNumber.valueOf(lsnStr).asLong();
}
catch (Exception e) {
throw new ConnectException("Value " + column + " in the pg_replication_slots table for slot = '"
+ slotName + "', plugin = '"
+ pluginName + "', database = '"
+ database + "' is not valid. This is an abnormal situation and the database status should be checked.");
}
if (lsn == LogSequenceNumber.INVALID_LSN.asLong()) {
throw new ConnectException("Invalid LSN returned from database");
}
return lsn;
}
/** /**
* Drops a replication slot that was created on the DB * Drops a replication slot that was created on the DB
* *

View File

@ -80,7 +80,7 @@ public static ReplicationConnection createForReplication(String slotName, boolea
/** /**
* @return the decoder plugin used for testing and configured by system property * @return the decoder plugin used for testing and configured by system property
*/ */
static PostgresConnectorConfig.LogicalDecoder decoderPlugin() { public static PostgresConnectorConfig.LogicalDecoder decoderPlugin() {
final String s = System.getProperty(PostgresConnectorConfig.PLUGIN_NAME.name()); final String s = System.getProperty(PostgresConnectorConfig.PLUGIN_NAME.name());
return (s == null || s.length() == 0) ? PostgresConnectorConfig.LogicalDecoder.DECODERBUFS : PostgresConnectorConfig.LogicalDecoder.parse(s); return (s == null || s.length() == 0) ? PostgresConnectorConfig.LogicalDecoder.DECODERBUFS : PostgresConnectorConfig.LogicalDecoder.parse(s);
} }
@ -181,7 +181,7 @@ protected static Set<String> schemaNames() throws SQLException {
} }
} }
private static JdbcConfiguration defaultJdbcConfig() { public static JdbcConfiguration defaultJdbcConfig() {
return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) return JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDefault(JdbcConfiguration.DATABASE, "postgres") .withDefault(JdbcConfiguration.DATABASE, "postgres")
.withDefault(JdbcConfiguration.HOSTNAME, "localhost") .withDefault(JdbcConfiguration.HOSTNAME, "localhost")

View File

@ -9,6 +9,7 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -207,4 +208,36 @@ public void shouldDetectRunningConncurrentTxOnInit() throws Exception {
} }
} }
} }
@Test
public void shouldSupportPG95RestartLsn() throws Exception {
String slotName = "pg95";
try (ReplicationConnection replConnection = TestHelper.createForReplication(slotName, false)) {
assertTrue(replConnection.isConnected());
}
try (PostgresConnection conn = buildPG95PGConn("pg95")) {
ServerInfo.ReplicationSlot slotInfo = conn.readReplicationSlotInfo(slotName, TestHelper.decoderPlugin().getPostgresPluginName());
assertNotNull(slotInfo);
assertNotEquals(ServerInfo.ReplicationSlot.INVALID, slotInfo);
conn.dropReplicationSlot(slotName);
}
}
// "fake" a pg95 response by not returning confirmed_flushed_lsn
private PostgresConnection buildPG95PGConn(String name) {
return new PostgresConnection(TestHelper.defaultJdbcConfig().edit().with("ApplicationName", name).build()) {
@Override
protected ServerInfo.ReplicationSlot queryForSlot(String slotName, String database, String pluginName,
ResultSetMapper<ServerInfo.ReplicationSlot> map) throws SQLException {
String fields = "slot_name, plugin, slot_type, datoid, database, active, active_pid, xmin, catalog_xmin, restart_lsn";
return prepareQueryAndMap("select " + fields + " from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> {
statement.setString(1, slotName);
statement.setString(2, database);
statement.setString(3, pluginName);
}, map);
}
};
}
} }