diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 3b8577662..53ac0e434 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -203,17 +203,16 @@ else if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) { } private boolean useTemporarySlot() throws SQLException { - return dropSlotOnClose && isPg10OrGreater(); + return dropSlotOnClose && isPg94OrGreater(); } private boolean exportSnapshotOnCreate() throws SQLException { - return exportSnapshot && isPg10OrGreater(); + return exportSnapshot && isPg94OrGreater(); } - private boolean isPg10OrGreater() throws SQLException { - return pgConnection().getServerMajorVersion() >= 10; + private boolean isPg94OrGreater() throws SQLException { + return pgConnection().haveMinimumServerVersion(90400); } - /** * creating a replication connection and starting to stream involves a few steps: * 1. we create the connection and ensure that @@ -270,37 +269,34 @@ public void initConnection() throws SQLException, InterruptedException { @Override public Optional createReplicationSlot() throws SQLException { - // note that some of these options are only supported in pg10 or newer, additionally + // note that some of these options are only supported in pg94+, additionally // the options are not yet exported by the jdbc api wrapper, therefore, we just do this ourselves // but eventually this should be moved back to the jdbc API // see https://github.com/pgjdbc/pgjdbc/issues/1305 LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin); String tempPart = ""; - String exportPart = ""; - Boolean isPg10 = isPg10OrGreater(); - if ((dropSlotOnClose || exportSnapshot) && !isPg10OrGreater()) { - LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, but not on a supported version of Postgres, ignoring!"); + Boolean canExportSnapshot = isPg94OrGreater(); + if ((dropSlotOnClose || exportSnapshot) && !canExportSnapshot) { + LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, " + + "but not on a supported version of Postgres, ignoring!"); } if (useTemporarySlot()) { tempPart = "TEMPORARY"; } - if (exportSnapshotOnCreate()) { - exportPart = "EXPORT_SNAPSHOT"; - } + try (Statement stmt = pgConnection().createStatement()) { String createCommand = String.format( - "CREATE_REPLICATION_SLOT %s %s LOGICAL %s %s", + "CREATE_REPLICATION_SLOT %s %s LOGICAL %s", slotName, tempPart, - plugin.getPostgresPluginName(), - exportPart + plugin.getPostgresPluginName() ); LOGGER.info("Creating replication slot with command {}", createCommand); stmt.execute(createCommand); - // when we are in pg10, we can parse the slot creation info, otherwise, it returns + // when we are in pg94+, we can parse the slot creation info, otherwise, it returns // nothing - if (isPg10) { + if (canExportSnapshot) { this.slotCreationInfo = parseSlotCreation(stmt.getResultSet()); }