diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java index b3b7e05d9..236923cce 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java @@ -17,10 +17,12 @@ public class MessageDecoderConfig { private final Configuration configuration; private final PostgresSchema schema; + private final String publicationName; - public MessageDecoderConfig(Configuration configuration, PostgresSchema schema) { + public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName) { this.configuration = configuration; this.schema = schema; + this.publicationName = publicationName; } public Configuration getConfiguration() { @@ -30,4 +32,8 @@ public Configuration getConfiguration() { public PostgresSchema getSchema() { return schema; } + + public String getPublicationName() { + return publicationName; + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index a8b174cfb..d19953b25 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -103,7 +103,7 @@ private PostgresReplicationConnection(Configuration config, this.dropSlotOnClose = dropSlotOnClose; this.statusUpdateInterval = statusUpdateInterval; this.exportSnapshot = exportSnapshot; - this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema)); + this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName)); this.typeRegistry = typeRegistry; this.streamParams = streamParams; this.slotCreationInfo = null; @@ -134,7 +134,7 @@ protected void initPublication() { // For situations where no publication exists, we likely cannot create it for all tables. // This is because postgres requires certain super user permissions to use "ALL TABLES". // We should restrict this to the configured tables here. - stmt.execute("CREATE PUBLICATION dbz_publication FOR ALL TABLES;"); + stmt.execute(String.format("CREATE PUBLICATION %s FOR ALL TABLES;", publicationName)); } else { LOGGER.trace( diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 619bdc0aa..189334407 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -175,7 +175,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces @Override public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) { return builder.withSlotOption("proto_version", 1) - .withSlotOption("publication_names", "dbz_publication"); + .withSlotOption("publication_names", config.getPublicationName()); } @Override diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 5cd94865c..8a0c0c270 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1121,6 +1121,7 @@ public void testCustomPublicationNameUsed() throws Exception { waitForAvailableRecords(100, TimeUnit.MILLISECONDS); stopConnector(value -> assertThat(logInterceptor.containsMessage("Creating new publication 'cdc' for plugin 'PGOUTPUT'")).isTrue()); + assertTrue(TestHelper.publicationExists("cdc")); } private CompletableFuture batchInsertRecords(long recordsCount, int batchSize) { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index ffd847f14..dfb868b9d 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -13,6 +13,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; import java.util.Set; @@ -272,6 +273,25 @@ protected static void dropPublication(String publicationName) { } } + protected static boolean publicationExists() { + return publicationExists(ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME); + } + + protected static boolean publicationExists(String publicationName) { + if(decoderPlugin().equals(PostgresConnectorConfig.LogicalDecoder.PGOUTPUT)) { + try(PostgresConnection connection = create()) { + String query = String.format("SELECT pubname FROM pg_catalog.pg_publication WHERE pubname = '%s'", publicationName); + try { + return connection.queryAndMap(query, ResultSet::next); + } + catch (SQLException e) { + // ignored + } + } + } + return false; + } + protected static void waitForDefaultReplicationSlotBeActive() { try (PostgresConnection connection = create()) { Awaitility.await().atMost(org.awaitility.Duration.FIVE_SECONDS).until(() ->