DBZ-1238 Avoiding explicit check for slot state;
This causes issues with PG 10 temporary slots which are always active. The check isn't really needed anyways, because the server will raise an exception itself when trying to connect to an already active slot.
This commit is contained in:
parent
a2e0c7cd2a
commit
bb8d7683e3
@ -98,15 +98,6 @@ private PostgresReplicationConnection(Configuration config,
|
||||
this.streamParams = streamParams;
|
||||
this.slotCreationInfo = null;
|
||||
this.hasInitedSlot = false;
|
||||
|
||||
// to keep the same
|
||||
try {
|
||||
ensureSlotNotActive(getSlotInfo());
|
||||
}
|
||||
catch (Throwable t) {
|
||||
close();
|
||||
throw new ConnectException(t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -116,15 +107,6 @@ private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, Interrupte
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureSlotNotActive(ServerInfo.ReplicationSlot slotInfo) throws IllegalStateException {
|
||||
if (slotInfo.active()) {
|
||||
throw new IllegalStateException(
|
||||
"A logical replication slot named '" + slotName + "' for plugin '" + plugin.getPostgresPluginName() + "' and database '" + database() + "' is already active on the server." +
|
||||
"You cannot have multiple slots with the same name active for the same database."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
protected void initReplicationSlot() throws SQLException, InterruptedException {
|
||||
final String postgresPluginName = plugin.getPostgresPluginName();
|
||||
ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
|
||||
@ -135,7 +117,6 @@ protected void initReplicationSlot() throws SQLException, InterruptedException {
|
||||
if (shouldCreateSlot) {
|
||||
this.createReplicationSlot();
|
||||
}
|
||||
ensureSlotNotActive(slotInfo);
|
||||
|
||||
AtomicLong xlogStart = new AtomicLong();
|
||||
// replication connection does not support parsing of SQL statements so we need to create
|
||||
@ -227,7 +208,13 @@ public ReplicationStream startStreaming(Long offset) throws SQLException, Interr
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("starting streaming from LSN '{}'", lsn.asString());
|
||||
}
|
||||
return createReplicationStream(lsn);
|
||||
|
||||
try {
|
||||
return createReplicationStream(lsn);
|
||||
}
|
||||
catch(Exception e) {
|
||||
throw new ConnectException("Failed to start replication stream at " + lsn, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user