DBZ-1436 Fix postgres connector not honoring configured publication.name setting

This commit is contained in:
Chris Cranford 2019-09-23 13:07:30 -04:00 committed by Gunnar Morling
parent 5e19641a3e
commit f446ce6ffd
5 changed files with 31 additions and 4 deletions

View File

@ -17,10 +17,12 @@ public class MessageDecoderConfig {
private final Configuration configuration;
private final PostgresSchema schema;
private final String publicationName;
public MessageDecoderConfig(Configuration configuration, PostgresSchema schema) {
public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName) {
this.configuration = configuration;
this.schema = schema;
this.publicationName = publicationName;
}
public Configuration getConfiguration() {
@ -30,4 +32,8 @@ public Configuration getConfiguration() {
public PostgresSchema getSchema() {
return schema;
}
public String getPublicationName() {
return publicationName;
}
}

View File

@ -103,7 +103,7 @@ private PostgresReplicationConnection(Configuration config,
this.dropSlotOnClose = dropSlotOnClose;
this.statusUpdateInterval = statusUpdateInterval;
this.exportSnapshot = exportSnapshot;
this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema));
this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName));
this.typeRegistry = typeRegistry;
this.streamParams = streamParams;
this.slotCreationInfo = null;
@ -134,7 +134,7 @@ protected void initPublication() {
// For situations where no publication exists, we likely cannot create it for all tables.
// This is because postgres requires certain super user permissions to use "ALL TABLES".
// We should restrict this to the configured tables here.
stmt.execute("CREATE PUBLICATION dbz_publication FOR ALL TABLES;");
stmt.execute(String.format("CREATE PUBLICATION %s FOR ALL TABLES;", publicationName));
}
else {
LOGGER.trace(

View File

@ -175,7 +175,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
@Override
public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) {
return builder.withSlotOption("proto_version", 1)
.withSlotOption("publication_names", "dbz_publication");
.withSlotOption("publication_names", config.getPublicationName());
}
@Override

View File

@ -1121,6 +1121,7 @@ public void testCustomPublicationNameUsed() throws Exception {
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
stopConnector(value -> assertThat(logInterceptor.containsMessage("Creating new publication 'cdc' for plugin 'PGOUTPUT'")).isTrue());
assertTrue(TestHelper.publicationExists("cdc"));
}
private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchSize) {

View File

@ -13,6 +13,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Set;
@ -272,6 +273,25 @@ protected static void dropPublication(String publicationName) {
}
}
protected static boolean publicationExists() {
return publicationExists(ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME);
}
protected static boolean publicationExists(String publicationName) {
if(decoderPlugin().equals(PostgresConnectorConfig.LogicalDecoder.PGOUTPUT)) {
try(PostgresConnection connection = create()) {
String query = String.format("SELECT pubname FROM pg_catalog.pg_publication WHERE pubname = '%s'", publicationName);
try {
return connection.queryAndMap(query, ResultSet::next);
}
catch (SQLException e) {
// ignored
}
}
}
return false;
}
protected static void waitForDefaultReplicationSlotBeActive() {
try (PostgresConnection connection = create()) {
Awaitility.await().atMost(org.awaitility.Duration.FIVE_SECONDS).until(() ->