DBZ-1111 Allowing to use drop-slot-on-close with PG 10 or newer

This commit is contained in:
Gunnar Morling 2019-01-29 14:05:43 +01:00 committed by Jiri Pechanec
parent 2a80931d03
commit cc0df61665
2 changed files with 19 additions and 6 deletions

View File

@ -106,7 +106,7 @@ protected void initReplicationSlot() throws SQLException, InterruptedException {
// creating a temporary slot if it should be dropped an we're on 10 or newer;
// this is not supported through the API yet
// see https://github.com/pgjdbc/pgjdbc/issues/1305
if (dropSlotOnClose && pgConnection().getServerMajorVersion() >= 10) {
if (useTemporarySlot()) {
try (Statement stmt = pgConnection().createStatement()) {
stmt.execute(String.format(
"CREATE_REPLICATION_SLOT %s TEMPORARY LOGICAL %s",
@ -164,13 +164,17 @@ else if (slotInfo.active()) {
}
}
private boolean useTemporarySlot() throws SQLException {
return dropSlotOnClose && pgConnection().getServerMajorVersion() >= 10;
}
@Override
public ReplicationStream startStreaming() throws SQLException {
public ReplicationStream startStreaming() throws SQLException, InterruptedException {
return startStreaming(defaultStartingPos);
}
@Override
public ReplicationStream startStreaming(Long offset) throws SQLException {
public ReplicationStream startStreaming(Long offset) throws SQLException, InterruptedException {
connect();
if (offset == null || offset <= 0) {
offset = defaultStartingPos;
@ -184,7 +188,7 @@ protected PgConnection pgConnection() throws SQLException {
return (PgConnection) connection(false);
}
private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) throws SQLException {
private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) throws SQLException, InterruptedException {
PGReplicationStream s;
try {
@ -198,6 +202,10 @@ private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) t
catch (PSQLException e) {
LOGGER.debug("Could not register for streaming, retrying without optional options", e);
if (useTemporarySlot()) {
initReplicationSlot();
}
s = startPgReplicationStream(lsn, plugin.forceRds() ? messageDecoder::optionsWithoutMetadata : messageDecoder::optionsWithMetadata);
messageDecoder.setContainsMetadata(plugin.forceRds() ? false : true);
}
@ -206,6 +214,11 @@ private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) t
if (e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
// It is possible we are connecting to an old wal2json plug-in
LOGGER.warn("Could not register for streaming with metadata in messages, falling back to messages without metadata");
if (useTemporarySlot()) {
initReplicationSlot();
}
s = startPgReplicationStream(lsn, messageDecoder::optionsWithoutMetadata);
messageDecoder.setContainsMetadata(false);
}

View File

@ -37,7 +37,7 @@ public interface ReplicationConnection extends AutoCloseable {
* @return a {@link PGReplicationStream} from which data is read; never null
* @throws SQLException if there is a problem obtaining the replication stream
*/
ReplicationStream startStreaming() throws SQLException;
ReplicationStream startStreaming() throws SQLException, InterruptedException;
/**
* Opens a stream for reading logical replication changes from a given LSN position.
@ -52,7 +52,7 @@ public interface ReplicationConnection extends AutoCloseable {
* @see org.postgresql.replication.LogSequenceNumber
* @throws SQLException if anything fails
*/
ReplicationStream startStreaming(Long offset) throws SQLException;
ReplicationStream startStreaming(Long offset) throws SQLException, InterruptedException;
/**
* Checks whether this connection is open or not