DBZ-1813 Adding ability to configure pgoutput publication based on table filters defined

Brandon Brown 2020-05-21 08:59:41 -04:00 committed by Jiri Pechanec
parent de424d4980
commit bd64ace7fc
6 changed files with 365 additions and 22 deletions

View File

@@ -592,6 +592,85 @@ public static SchemaRefreshMode parse(String value) {
.withDescription("The name of the Postgres 10+ publication used for streaming changes from a plugin." +
"Defaults to '" + ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME + "'");
public enum AutoCreateMode implements EnumeratedValue {
/**
* No Publication will be created, it's expected the user
* has already created the publication.
*/
DISABLED("disabled"),
/**
* Enable publication for all tables.
*/
ALL_TABLES("all_tables"),
/**
* Enable publication on a specific set of tables.
*/
FILTERED("filtered");
private final String value;
AutoCreateMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static AutoCreateMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (AutoCreateMode option : AutoCreateMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static AutoCreateMode parse(String value, String defaultValue) {
AutoCreateMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
public static final Field PUBLICATION_AUTOCREATE_MODE = Field.create("publication.autocreate.mode")
.withDisplayName("Publication Auto Create Mode")
.withEnum(AutoCreateMode.class, AutoCreateMode.ALL_TABLES)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription(
"Applies only when streaming changes using pgoutput. " +
"Determines how creation of a publication should work; the default is all_tables. " +
"DISABLED - The connector will not attempt to create a publication at all. The expectation is " +
"that the user has created the publication up-front. If the publication isn't found to exist upon " +
"startup, the connector will throw an exception and stop. " +
"ALL_TABLES - If no publication exists, the connector will create a new publication for all tables. " +
"Note this requires that the configured user has access. If the publication already exists, it will be used, " +
"i.e. CREATE PUBLICATION <publication_name> FOR ALL TABLES; " +
"FILTERED - If no publication exists, the connector will create a new publication for all those tables matching " +
"the current filter configuration (see table/database whitelist/blacklist properties). If the publication already " +
"exists, it will be used, i.e. CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, etc>");
public static final Field STREAM_PARAMS = Field.create("slot.stream.params")
.withDisplayName("Optional parameters to pass to the logical decoder when the stream is started.")
.withType(Type.STRING)
@@ -880,6 +959,10 @@ protected String publicationName() {
return getConfig().getString(PUBLICATION_NAME);
}
protected AutoCreateMode publicationAutocreateMode() {
return AutoCreateMode.parse(getConfig().getString(PUBLICATION_AUTOCREATE_MODE));
}
protected String streamParams() {
return getConfig().getString(STREAM_PARAMS);
}
@@ -981,6 +1064,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
PLUGIN_NAME,
SLOT_NAME,
PUBLICATION_NAME,
PUBLICATION_AUTOCREATE_MODE,
DROP_SLOT_ON_STOP,
STREAM_PARAMS,
ON_CONNECT_STATEMENTS,
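
As an aside, a minimal sketch of a connector configuration exercising the new publication.autocreate.mode property, in the same spirit as the integration tests added further down. This assumes the standard io.debezium.config.Configuration builder; the class name, connection settings, and table names are placeholders, not part of this commit:

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;

public class PublicationAutocreateConfigExample {
    public static void main(String[] args) {
        // Hypothetical connector configuration; host, database, and table names are placeholders.
        Configuration config = Configuration.create()
                .with("database.hostname", "localhost")
                .with("database.port", "5432")
                .with("database.user", "debezium")
                .with("database.dbname", "inventory")
                .with(PostgresConnectorConfig.PLUGIN_NAME, "pgoutput")
                .with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc")
                // Limit the auto-created publication to the whitelisted tables only.
                .with(PostgresConnectorConfig.TABLE_WHITELIST, "public.orders,public.customers")
                .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE,
                        PostgresConnectorConfig.AutoCreateMode.FILTERED.getValue())
                .build();

        System.out.println(config.getString(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE));
    }
}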

View File

@@ -111,6 +111,8 @@ protected ReplicationConnection createReplicationConnection(boolean exportSnapsh
return ReplicationConnection.builder(config.jdbcConfig())
.withSlot(config.slotName())
.withPublication(config.publicationName())
.withTableFilter(config.getTableFilters())
.withPublicationAutocreateMode(config.publicationAutocreateMode())
.withPlugin(config.plugin())
.dropSlotOnClose(dropSlotOnStop)
.streamParams(config.streamParams())

View File

@@ -14,13 +14,19 @@
import java.sql.SQLWarning;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
@@ -39,6 +45,8 @@
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
@@ -54,6 +62,8 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
private final String slotName;
private final String publicationName;
private final RelationalTableFilters tableFilter;
private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
private final PostgresConnectorConfig.LogicalDecoder plugin;
private final boolean dropSlotOnClose;
private final boolean exportSnapshot;
@@ -70,22 +80,26 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
/**
* Creates a new replication connection with the given params.
*
* @param config the JDBC configuration for the connection; may not be null
* @param slotName the name of the DB slot for logical replication; may not be null
* @param publicationName the name of the DB publication for logical replication; may not be null
* @param plugin decoder matching the server side plug-in used for streaming changes; may not be null
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed
* @param statusUpdateInterval the interval at which the replication connection should periodically send status
* @param exportSnapshot whether the replication should export a snapshot when created
* @param typeRegistry registry with PostgreSQL types
* @param streamParams additional parameters to pass to the replication stream
* @param schema the schema; must not be null
*
* updates to the server
* @param config the JDBC configuration for the connection; may not be null
* @param slotName the name of the DB slot for logical replication; may not be null
* @param publicationName the name of the DB publication for logical replication; may not be null
* @param tableFilter the filter that selects the tables to include in the DB publication for logical replication; may not be null
* @param publicationAutocreateMode the mode for publication autocreation; may not be null
* @param plugin decoder matching the server side plug-in used for streaming changes; may not be null
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed
* @param statusUpdateInterval the interval at which the replication connection should periodically send status
* @param exportSnapshot whether the replication should export a snapshot when created
* @param typeRegistry registry with PostgreSQL types
* @param streamParams additional parameters to pass to the replication stream
* @param schema the schema; must not be null
* <p>
* updates to the server
*/
private PostgresReplicationConnection(Configuration config,
String slotName,
String publicationName,
RelationalTableFilters tableFilter,
PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
PostgresConnectorConfig.LogicalDecoder plugin,
boolean dropSlotOnClose,
boolean exportSnapshot,
@@ -98,6 +112,8 @@ private PostgresReplicationConnection(Configuration config,
this.originalConfig = config;
this.slotName = slotName;
this.publicationName = publicationName;
this.tableFilter = tableFilter;
this.publicationAutocreateMode = publicationAutocreateMode;
this.plugin = plugin;
this.dropSlotOnClose = dropSlotOnClose;
this.statusUpdateInterval = statusUpdateInterval;
@@ -116,6 +132,8 @@ private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, Interrupte
}
protected void initPublication() {
String createPublicationStmt;
String tableFilterString = null;
if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(plugin)) {
LOGGER.info("Initializing PgOutput logical decoder publication");
try {
@@ -125,12 +143,31 @@ protected void initPublication() {
Long count = rs.getLong(1);
if (count == 0L) {
LOGGER.info("Creating new publication '{}' for plugin '{}'", publicationName, plugin);
// Publication doesn't exist, create it.
// todo: DBZ-766 - Change this to be restricted based on configured whitelist tables?
// For situations where no publication exists, we likely cannot create it for all tables.
// This is because postgres requires certain super user permissions to use "ALL TABLES".
// We should restrict this to the configured tables here.
stmt.execute(String.format("CREATE PUBLICATION %s FOR ALL TABLES;", publicationName));
switch (publicationAutocreateMode) {
case DISABLED:
throw new ConnectException("Publication autocreation is disabled, please create one and restart the connector.");
case ALL_TABLES:
createPublicationStmt = String.format("CREATE PUBLICATION %s FOR ALL TABLES;", publicationName);
LOGGER.info("Creating Publication with statement '{}'", createPublicationStmt);
// Publication doesn't exist, create it.
stmt.execute(createPublicationStmt);
break;
case FILTERED:
try {
Set<TableId> tablesToCapture = determineCapturedTables();
tableFilterString = tablesToCapture.stream().map(TableId::toDoubleQuotedString).collect(Collectors.joining(", "));
createPublicationStmt = String.format("CREATE PUBLICATION %s FOR TABLE %s;", publicationName, tableFilterString);
LOGGER.info("Creating Publication with statement '{}'", createPublicationStmt);
// Publication doesn't exist, create it but restrict to the tableFilter.
stmt.execute(createPublicationStmt);
}
catch (Exception e) {
LOGGER.error("Unable to create publication for {}", publicationName);
throw new ConnectException(String.format("Unable to create filtered publication %s for %s", publicationName, tableFilterString),
e);
}
break;
}
}
else {
LOGGER.trace(
@@ -151,6 +188,34 @@ protected void initPublication() {
}
}
private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) {
return tableIds
.stream()
.filter(tid -> pattern.asPredicate().test(tid.toString()))
.sorted();
}
private Set<TableId> determineCapturedTables() throws Exception {
Set<TableId> allTableIds = this.connect().readTableNames(pgConnection().getCatalog(), null, null, new String[]{ "TABLE" });
Set<TableId> capturedTables = new HashSet<>();
for (TableId tableId : allTableIds) {
if (tableFilter.dataCollectionFilter().isIncluded(tableId)) {
LOGGER.trace("Adding table {} to the list of captured tables", tableId);
capturedTables.add(tableId);
}
else {
LOGGER.trace("Ignoring table {} as it's not included in the filter configuration", tableId);
}
}
return capturedTables
.stream()
.sorted()
.collect(Collectors.toCollection(LinkedHashSet::new));
}
protected void initReplicationSlot() throws SQLException, InterruptedException {
ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
@@ -211,13 +276,14 @@ private boolean useTemporarySlot() throws SQLException {
* b. the slot isn't currently being used
* 2. we query to get our potential start position in the slot (lsn)
* 3. we try and start streaming, depending on our options (such as in wal2json)
* this may fail, which can result in the connection being killed and we need to start
* the process over if we are using a temporary slot
* this may fail, which can result in the connection being killed and we need to start
* the process over if we are using a temporary slot
* 4. actually start the streamer
*
* <p>
* This method takes care of all of these and this method queries for a default starting position
* If you know where you are starting from you should call {@link #startStreaming(Long)}, this method
* delegates to that method
*
* @return
* @throws SQLException
* @throws InterruptedException
@@ -549,6 +615,8 @@ protected static class ReplicationConnectionBuilder implements Builder {
private final Configuration config;
private String slotName = DEFAULT_SLOT_NAME;
private String publicationName = DEFAULT_PUBLICATION_NAME;
private RelationalTableFilters tableFilter;
private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
private Duration statusUpdateIntervalVal;
@@ -576,6 +644,19 @@ public Builder withPublication(String publicationName) {
return this;
}
public Builder withTableFilter(RelationalTableFilters tableFilter) {
assert tableFilter != null;
this.tableFilter = tableFilter;
return this;
}
@Override
public Builder withPublicationAutocreateMode(PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode) {
assert publicationAutocreateMode != null;
this.publicationAutocreateMode = publicationAutocreateMode;
return this;
}
@Override
public ReplicationConnectionBuilder withPlugin(final PostgresConnectorConfig.LogicalDecoder plugin) {
assert plugin != null;
@@ -622,7 +703,7 @@ public Builder exportSnapshotOnCreate(boolean exportSnapshot) {
@Override
public ReplicationConnection build() {
assert plugin != null : "Decoding plugin name is not set";
return new PostgresReplicationConnection(config, slotName, publicationName, plugin, dropSlotOnClose, exportSnapshot,
return new PostgresReplicationConnection(config, slotName, publicationName, tableFilter, publicationAutocreateMode, plugin, dropSlotOnClose, exportSnapshot,
statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema);
}
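
For illustration, a standalone sketch of the statement composition performed by the FILTERED branch of initPublication() above, reusing the same TableId.toDoubleQuotedString() and Collectors.joining() calls; the class name and table names are placeholders, not part of this commit:

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.relational.TableId;

public class FilteredPublicationStatementExample {
    public static void main(String[] args) {
        // Placeholder tables standing in for the result of determineCapturedTables().
        Set<TableId> tablesToCapture = new LinkedHashSet<>(Arrays.asList(
                new TableId(null, "public", "orders"),
                new TableId(null, "public", "customers")));

        // Same composition as the FILTERED case: double-quote each table id and join them into one list.
        String tableFilterString = tablesToCapture.stream()
                .map(TableId::toDoubleQuotedString)
                .collect(Collectors.joining(", "));
        String createPublicationStmt = String.format("CREATE PUBLICATION %s FOR TABLE %s;", "cdc", tableFilterString);

        // Prints: CREATE PUBLICATION cdc FOR TABLE "public"."orders", "public"."customers";
        System.out.println(createPublicationStmt);
    }
}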

View File

@@ -19,6 +19,7 @@
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.relational.RelationalTableFilters;
/**
* A Postgres logical streaming replication connection. Replication connections are established for a slot and a given plugin
@@ -133,6 +134,24 @@ interface Builder {
*/
Builder withPublication(final String publicationName);
/**
* Sets the table filter that selects the tables to include in the PG logical publication
*
* @param tableFilter the configured table filters; may not be null
* @return this instance
* @see RelationalTableFilters
*/
Builder withTableFilter(final RelationalTableFilters tableFilter);
/**
* Sets the publication autocreate mode for the PG logical publication
*
* @param publicationAutocreateMode the publication autocreation mode; may not be null
* @return this instance
* @see PostgresConnectorConfig.AutoCreateMode#ALL_TABLES
*/
Builder withPublicationAutocreateMode(final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode);
/**
* Sets the instance for the PG logical decoding plugin
*

View File

@@ -11,6 +11,7 @@
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -40,6 +41,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ComparisonFailure;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -1628,6 +1630,148 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
stopConnector();
}
@Test
@FixFor("DBZ-1813")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
public void shouldConfigureSubscriptionsForAllTablesByDefault() throws Exception {
// This captures all logged messages, allowing us to verify log message was written.
final LogInterceptor logInterceptor = new LogInterceptor();
TestHelper.dropAllSchemas();
TestHelper.dropPublication("cdc");
TestHelper.executeDDL("postgres_create_tables.ddl");
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc");
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
stopConnector(value -> assertTrue(
logInterceptor.containsMessage("Creating Publication with statement 'CREATE PUBLICATION cdc FOR ALL TABLES;'") &&
logInterceptor.containsMessage("Creating new publication 'cdc' for plugin 'PGOUTPUT'")));
assertTrue(TestHelper.publicationExists("cdc"));
}
@Test
@FixFor("DBZ-1813")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
public void shouldConfigureSubscriptionsFromTableFilters() throws Exception {
// This captures all logged messages, allowing us to verify log message was written.
final LogInterceptor logInterceptor = new LogInterceptor();
TestHelper.dropAllSchemas();
TestHelper.dropPublication("cdc");
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc")
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.numeric_table,public.text_table,s1.a,s2.a")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.FILTERED.getValue());
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
// check the records from the snapshot
assertRecordsFromSnapshot(2, 1, 1);
// insert 2 new records
TestHelper.execute(INSERT_STMT);
assertRecordsAfterInsert(2, 2, 2);
final long recordsCount = 10;
final int batchSize = 100;
batchInsertRecords(recordsCount, batchSize);
CompletableFuture.runAsync(() -> consumeRecords(recordsCount))
.exceptionally(throwable -> {
throw new RuntimeException(throwable);
}).get();
stopConnector(value -> {
assertTrue(
logInterceptor.containsMessage(
"Creating Publication with statement 'CREATE PUBLICATION cdc FOR TABLE \"public\".\"numeric_table\", \"public\".\"text_table\", \"s1\".\"a\", \"s2\".\"a\";'")
&&
logInterceptor.containsMessage("Creating new publication 'cdc' for plugin 'PGOUTPUT'"));
});
assertTrue(TestHelper.publicationExists("cdc"));
}
@Test
@FixFor("DBZ-1813")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
public void shouldThrowWhenAutocreationIsDisabled() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.dropPublication("cdc");
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SLOT_NAME, "cdc")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.DISABLED.getValue());
EmbeddedEngine.CompletionCallback cb = new EmbeddedEngine.CompletionCallback() {
@Override
public void handle(boolean success, String message, Throwable error) {
assertEquals(ConnectException.class, error.getClass());
assertEquals("Publication autocreation is disabled, please create one and restart the connector.", error.getMessage());
}
};
start(PostgresConnector.class, configBuilder.build(), cb);
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
stopConnector();
assertFalse(TestHelper.publicationExists("cdc"));
}
@Test
@FixFor("DBZ-1813")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
public void shouldProduceMessagesOnlyForConfiguredTables() throws Exception {
final long recordsCount = 10;
final int batchSize = 100;
TestHelper.dropAllSchemas();
TestHelper.dropPublication("cdc");
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc")
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.text_table")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.FILTERED.getValue());
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
waitForAvailableRecords(100, TimeUnit.MILLISECONDS);
// insert records to a watched table
batchInsertRecords(recordsCount, batchSize);
CompletableFuture.runAsync(() -> consumeRecords(recordsCount))
.exceptionally(throwable -> {
throw new RuntimeException(throwable);
}).get();
// assert that inserts on unwatched tables are not emitted.
try {
// check the records from the snapshot
assertRecordsFromSnapshot(2, 1, 1);
// insert 2 new records
TestHelper.execute(INSERT_STMT);
assertRecordsAfterInsert(2, 2, 2);
fail("Insert events should be ignored");
}
catch (ComparisonFailure e) {
assertThat(e).hasMessage("expected:<[2]> but was:<[0]>");
}
stopConnector();
}
private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchSize) {
String insertStmt = "INSERT INTO text_table(j, jb, x, u) " +
"VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +

View File

@@ -1892,6 +1892,19 @@ See the {link-prefix}:{link-postgresql-connector}#postgresql-data-types[list of
Each item (regular expression) must match the fully-qualified `<fully-qualified table>:<a comma-separated list of columns>` representing the custom key. +
Fully-qualified tables could be defined as _schemaName_._tableName_.
|[[postgresql-publication-autocreate-mode]]<<postgresql-publication-autocreate-mode, `publication.autocreate.mode`>>
|_all_tables_
|Applies only when streaming changes using https://www.postgresql.org/docs/current/sql-createpublication.html[pgoutput].
Determines how creation of a https://www.postgresql.org/docs/current/logical-replication-publication.html[publication] should work; the default is _all_tables_.
_disabled_ - The connector will not attempt to create a publication at all. The expectation is
that the user has created the publication up-front. If the publication isn't found to exist upon
startup, the connector will throw an exception and stop.
_all_tables_ - If no publication exists, the connector will create a new publication for all tables. Note this requires
that the configured user has sufficient privileges (creating a publication `FOR ALL TABLES` requires superuser rights). If the publication already exists, it will be used, i.e. `CREATE PUBLICATION <publication_name> FOR ALL TABLES;`
_filtered_ - If no publication exists, the connector will create a new publication for all those tables matching the
current filter configuration (see the table/database whitelist/blacklist properties). If the publication already exists,
it will be used, i.e. `CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, etc>`
|===
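
When publication.autocreate.mode is set to _disabled_, the publication has to exist before the connector starts. A minimal sketch of pre-creating it over plain JDBC, assuming the PostgreSQL driver is on the classpath; the class name, connection details, and table list are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CreatePublicationManually {
    public static void main(String[] args) throws SQLException {
        // Connection details and table list are placeholders; adjust to the monitored database.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/inventory", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            // Pre-create the publication that the connector expects to find when autocreation is disabled.
            stmt.execute("CREATE PUBLICATION cdc FOR TABLE public.orders, public.customers;");
        }
    }
}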