diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 87ba85db6..65c8db0f5 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -253,10 +253,12 @@ public ReplicationStream startStreaming(Long offset) throws SQLException, Interr @Override public void initConnection() throws SQLException, InterruptedException { + // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html + // For pgoutput specifically, the publication must be created before the slot. + initPublication(); if (!hasInitedSlot) { initReplicationSlot(); } - initPublication(); } @Override @@ -278,6 +280,10 @@ public Optional createReplicationSlot() throws SQLException tempPart = "TEMPORARY"; } + // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html + // For pgoutput specifically, the publication must be created prior to the slot. + initPublication(); + try (Statement stmt = pgConnection().createStatement()) { String createCommand = String.format( "CREATE_REPLICATION_SLOT %s %s LOGICAL %s",