DBZ-8156 Add an option for publication.autocreate.mode to create a publication with no tables

This commit is contained in:
Roman Kudryashov 2024-08-17 14:16:08 +03:00 committed by Jiri Pechanec
parent 8617f474c5
commit b77f0663b8
3 changed files with 47 additions and 6 deletions

View File

@ -625,7 +625,11 @@ public enum AutoCreateMode implements EnumeratedValue {
/**
* Enable publication on a specific set of tables.
*/
FILTERED("filtered");
FILTERED("filtered"),
/**
* Enable publication with no tables.
*/
NO_TABLES("no_tables");
private final String value;

View File

@ -155,7 +155,8 @@ protected void initPublication() {
String selectPublication = String.format("SELECT puballtables FROM pg_publication WHERE pubname = '%s'", publicationName);
try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(selectPublication)) {
if (!rs.next()) {
final boolean publicationExists = rs.next();
if (!publicationExists) {
// Close eagerly as the transaction might stay running
LOGGER.info("Creating new publication '{}' for plugin '{}'", publicationName, plugin);
switch (publicationAutocreateMode) {
@ -168,7 +169,12 @@ protected void initPublication() {
stmt.execute(createPublicationStmt);
break;
case FILTERED:
createOrUpdatePublicationModeFilterted(stmt, false);
createOrUpdatePublicationModeFiltered(stmt, false);
break;
case NO_TABLES:
final String createPublicationWithNoTablesStmt = String.format("CREATE PUBLICATION %s;", publicationName);
LOGGER.info("Creating publication with statement '{}'", createPublicationWithNoTablesStmt);
stmt.execute(createPublicationWithNoTablesStmt);
break;
}
}
@ -188,7 +194,7 @@ protected void initPublication() {
publicationName, plugin, database()));
}
else {
createOrUpdatePublicationModeFilterted(stmt, true);
createOrUpdatePublicationModeFiltered(stmt, true);
}
break;
default:
@ -209,7 +215,7 @@ protected void initPublication() {
}
}
private void createOrUpdatePublicationModeFilterted(Statement stmt, boolean isUpdate) {
private void createOrUpdatePublicationModeFiltered(Statement stmt, boolean isUpdate) {
String tableFilterString = null;
String createOrUpdatePublicationStmt;
try {

View File

@ -1248,7 +1248,7 @@ public void shouldLogOwnershipErrorForReplicaIdentityUpdate() throws Exception {
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NO_DATA.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:DEFAULT")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, "DISABLED")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.DISABLED.getValue())
.with("database.user", "role_2")
.with("database.password", "role_2_pass")
.build();
@ -3541,6 +3541,37 @@ public void shouldFailWhenReadOnlyIsNotSupported() {
});
}
@Test
@FixFor("DBZ-8156")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
public void shouldProduceOnlyLogicalDecodingMessages() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.dropPublication("cdc");
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.NO_TABLES.getValue());
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForSnapshotToBeCompleted();
TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '{}');");
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics()).hasSize(1);
List<SourceRecord> logicalDecodingMessages = actualRecords.recordsForTopic(topicName("message"));
assertThat(logicalDecodingMessages).hasSize(1);
logicalDecodingMessages.forEach(sourceRecord -> {
assertTrue(sourceRecord.value() instanceof Struct);
assertEquals("m", ((Struct) sourceRecord.value()).getString("op"));
Struct message = ((Struct) sourceRecord.value()).getStruct("message");
assertEquals("foo", message.getString("prefix"));
assertEquals("{}", new String(message.getBytes("content")));
});
}
private Predicate<SourceRecord> stopOnPKPredicate(int pkValue) {
return record -> {
Struct key = (Struct) record.key();