From b77f0663b86c152710d3fd3a3505f9992a11736e Mon Sep 17 00:00:00 2001 From: Roman Kudryashov Date: Sat, 17 Aug 2024 14:16:08 +0300 Subject: [PATCH] DBZ-8156 Add an option for `publication.autocreate.mode` to create a publication with no tables --- .../postgresql/PostgresConnectorConfig.java | 6 +++- .../PostgresReplicationConnection.java | 14 +++++--- .../postgresql/PostgresConnectorIT.java | 33 ++++++++++++++++++- 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 8f9b654f4..cfdcf6d8e 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -625,7 +625,11 @@ public enum AutoCreateMode implements EnumeratedValue { /** * Enable publication on a specific set of tables. */ - FILTERED("filtered"); + FILTERED("filtered"), + /** + * Enable publication with no tables. + */ + NO_TABLES("no_tables"); private final String value; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 507fef1d0..67fd11f66 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -155,7 +155,8 @@ protected void initPublication() { String selectPublication = String.format("SELECT puballtables FROM pg_publication WHERE pubname = '%s'", publicationName); try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(selectPublication)) { - if (!rs.next()) { + final boolean publicationExists = rs.next(); + if (!publicationExists) { // Close eagerly as the transaction might stay running LOGGER.info("Creating new publication '{}' for plugin '{}'", publicationName, plugin); switch (publicationAutocreateMode) { @@ -168,7 +169,12 @@ protected void initPublication() { stmt.execute(createPublicationStmt); break; case FILTERED: - createOrUpdatePublicationModeFilterted(stmt, false); + createOrUpdatePublicationModeFiltered(stmt, false); + break; + case NO_TABLES: + final String createPublicationWithNoTablesStmt = String.format("CREATE PUBLICATION %s;", publicationName); + LOGGER.info("Creating publication with statement '{}'", createPublicationWithNoTablesStmt); + stmt.execute(createPublicationWithNoTablesStmt); break; } } @@ -188,7 +194,7 @@ protected void initPublication() { publicationName, plugin, database())); } else { - createOrUpdatePublicationModeFilterted(stmt, true); + createOrUpdatePublicationModeFiltered(stmt, true); } break; default: @@ -209,7 +215,7 @@ protected void initPublication() { } } - private void createOrUpdatePublicationModeFilterted(Statement stmt, boolean isUpdate) { + private void createOrUpdatePublicationModeFiltered(Statement stmt, boolean isUpdate) { String tableFilterString = null; String createOrUpdatePublicationStmt; try { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 5486fa4dc..ad4065b07 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1248,7 +1248,7 @@ public void shouldLogOwnershipErrorForReplicaIdentityUpdate() throws Exception { .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:DEFAULT") - .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, "DISABLED") + .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.DISABLED.getValue()) .with("database.user", "role_2") .with("database.password", "role_2_pass") .build(); @@ -3541,6 +3541,37 @@ public void shouldFailWhenReadOnlyIsNotSupported() { }); } + @Test + @FixFor("DBZ-8156") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder") + public void shouldProduceOnlyLogicalDecodingMessages() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.dropPublication("cdc"); + + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc") + .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.NO_TABLES.getValue()); + + start(PostgresConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + waitForSnapshotToBeCompleted(); + + TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '{}');"); + SourceRecords actualRecords = consumeRecordsByTopic(1); + assertThat(actualRecords.topics()).hasSize(1); + + List logicalDecodingMessages = actualRecords.recordsForTopic(topicName("message")); + assertThat(logicalDecodingMessages).hasSize(1); + logicalDecodingMessages.forEach(sourceRecord -> { + assertTrue(sourceRecord.value() instanceof Struct); + assertEquals("m", ((Struct) sourceRecord.value()).getString("op")); + + Struct message = ((Struct) sourceRecord.value()).getStruct("message"); + assertEquals("foo", message.getString("prefix")); + assertEquals("{}", new String(message.getBytes("content"))); + }); + } + private Predicate stopOnPKPredicate(int pkValue) { return record -> { Struct key = (Struct) record.key();