DBZ-6112 Set Replica Identity for PostgreSQL tables

Introduces a new PostgreSQL connector configuration parameter called `replica.identity.autoset.values`. With this parameter, you can specify the Replica Identity value for each table that is captured by the connector.
This commit is contained in:
miguelbirdie 2023-02-23 16:51:21 +01:00 committed by Jiri Pechanec
parent bd097117b9
commit 63cc36e9a2
16 changed files with 753 additions and 51 deletions

View File

@ -515,4 +515,6 @@ Moustapha Mahfoud
Đỗ Ngọc Sơn Đỗ Ngọc Sơn
RJ Nowling RJ Nowling
蔡灿材 蔡灿材
ddsr-ops ddsr-ops
Ben White
Miguel Angel Sotomayor

View File

@ -10,6 +10,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Importance;
@ -612,6 +613,26 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"the current filter configuration (see table/database include/exclude list properties). If the publication already" + "the current filter configuration (see table/database include/exclude list properties). If the publication already" +
" exists, it will be used. i.e CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, etc>"); " exists, it will be used. i.e CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, etc>");
/**
 * Comma-separated list of {@code <table regex>:<replica identity>} pairs; the value is checked by
 * {@link PostgresConnectorConfig#validateReplicaAutoSetField}.
 */
public static final Field REPLICA_IDENTITY_AUTOSET_VALUES = Field.create("replica.identity.autoset.values")
        .withDisplayName("Replica Identity Auto Set Values")
        .withType(Type.STRING)
        .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 10))
        .withWidth(Width.LONG)
        .withImportance(Importance.MEDIUM)
        .withValidation(PostgresConnectorConfig::validateReplicaAutoSetField)
        .withDescription(
                // Every fragment ends with a separator so the concatenated sentences do not run together.
                "Applies only when streaming changes using pgoutput. " +
                        "Determines the value for Replica Identity at table level. This option will overwrite the existing value in database. " +
                        "A comma-separated list of regular expressions that match fully-qualified tables and Replica Identity value to be used in the table. " +
                        "Each expression must match the pattern '<fully-qualified table name>:<replica identity>', " +
                        "where the table names could be defined as (SCHEMA_NAME.TABLE_NAME), " +
                        "and the replica identity values are: " +
                        "DEFAULT - Records the old values of the columns of the primary key, if any. This is the default for non-system tables. " +
                        "INDEX index_name - Records the old values of the columns covered by the named index, that must be unique, not partial, not deferrable, " +
                        "and include only columns marked NOT NULL. If this index is dropped, the behavior is the same as NOTHING. " +
                        "FULL - Records the old values of all columns in the row. " +
                        "NOTHING - Records no information about the old row. This is the default for system tables.");
public static final Field STREAM_PARAMS = Field.create("slot.stream.params") public static final Field STREAM_PARAMS = Field.create("slot.stream.params")
.withDisplayName("Optional parameters to pass to the logical decoder when the stream is started.") .withDisplayName("Optional parameters to pass to the logical decoder when the stream is started.")
.withType(Type.STRING) .withType(Type.STRING)
@ -877,6 +898,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
private final SnapshotMode snapshotMode; private final SnapshotMode snapshotMode;
private final SchemaRefreshMode schemaRefreshMode; private final SchemaRefreshMode schemaRefreshMode;
private final boolean flushLsnOnSource; private final boolean flushLsnOnSource;
private final ReplicaIdentityMapper replicaIdentityMapper;
public PostgresConnectorConfig(Configuration config) { public PostgresConnectorConfig(Configuration config) {
super( super(
@ -895,6 +917,8 @@ public PostgresConnectorConfig(Configuration config) {
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE)); this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
this.schemaRefreshMode = SchemaRefreshMode.parse(config.getString(SCHEMA_REFRESH_MODE)); this.schemaRefreshMode = SchemaRefreshMode.parse(config.getString(SCHEMA_REFRESH_MODE));
this.flushLsnOnSource = config.getBoolean(SHOULD_FLUSH_LSN_IN_SOURCE_DB); this.flushLsnOnSource = config.getBoolean(SHOULD_FLUSH_LSN_IN_SOURCE_DB);
final var replicaIdentityMapping = config.getString(REPLICA_IDENTITY_AUTOSET_VALUES);
this.replicaIdentityMapper = (replicaIdentityMapping != null) ? new ReplicaIdentityMapper(replicaIdentityMapping) : null;
} }
protected String hostname() { protected String hostname() {
@ -994,6 +1018,10 @@ public byte[] getUnavailableValuePlaceholder() {
return placeholder.getBytes(); return placeholder.getBytes();
} }
/**
 * Exposes the Replica Identity mapper held by this configuration.
 *
 * @return the mapper, or an empty {@link Optional} when this configuration holds none
 */
public Optional<ReplicaIdentityMapper> replicaIdentityMapper() {
    final ReplicaIdentityMapper mapper = replicaIdentityMapper;
    return mapper == null ? Optional.empty() : Optional.of(mapper);
}
protected int moneyFractionDigits() { protected int moneyFractionDigits() {
return getConfig().getInteger(MONEY_FRACTION_DIGITS); return getConfig().getInteger(MONEY_FRACTION_DIGITS);
} }
@ -1016,6 +1044,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
SLOT_NAME, SLOT_NAME,
PUBLICATION_NAME, PUBLICATION_NAME,
PUBLICATION_AUTOCREATE_MODE, PUBLICATION_AUTOCREATE_MODE,
REPLICA_IDENTITY_AUTOSET_VALUES,
DROP_SLOT_ON_STOP, DROP_SLOT_ON_STOP,
STREAM_PARAMS, STREAM_PARAMS,
ON_CONNECT_STATEMENTS, ON_CONNECT_STATEMENTS,
@ -1091,6 +1120,26 @@ private static int validateFlushLsnSource(Configuration config, Field field, Fie
return 0; return 0;
} }
/**
 * Validates the {@link #REPLICA_IDENTITY_AUTOSET_VALUES} property.
 * <p>
 * An unset property is valid. An empty value is reported once. Otherwise the value is split with
 * {@link ReplicaIdentityMapper#PATTERN_SPLIT} and every entry that does not fully match
 * {@link ReplicaIdentityMapper#REPLICA_AUTO_SET_PATTERN} is reported.
 *
 * @param config the configuration holding the property
 * @param field the field being validated
 * @param problems receives one message per detected problem
 * @return the number of problems reported
 */
protected static int validateReplicaAutoSetField(Configuration config, Field field, Field.ValidationOutput problems) {
    final String replicaAutoSetValues = config.getString(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES);
    if (replicaAutoSetValues == null) {
        return 0;
    }
    if (replicaAutoSetValues.isEmpty()) {
        // Report the empty value exactly once; splitting "" would only add a second, redundant format error.
        problems.accept(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "", "Must not be empty");
        return 1;
    }
    int problemCount = 0;
    for (String entry : ReplicaIdentityMapper.PATTERN_SPLIT.split(replicaAutoSetValues)) {
        // Full match, the same check ReplicaIdentityMapper applies when it parses the entries.
        if (!ReplicaIdentityMapper.REPLICA_AUTO_SET_PATTERN.matcher(entry).matches()) {
            problems.accept(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, entry,
                    entry + " has an invalid format (expecting '" + ReplicaIdentityMapper.REPLICA_AUTO_SET_PATTERN.pattern() + "')");
            problemCount++;
        }
    }
    return problemCount;
}
@Override @Override
public String getContextName() { public String getContextName() {
return Module.contextName(); return Module.contextName();

View File

@ -21,7 +21,7 @@
import io.debezium.annotation.NotThreadSafe; import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter; import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.connection.ServerInfo; import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table; import io.debezium.relational.Table;
@ -96,7 +96,7 @@ protected PostgresSchema refresh(PostgresConnection connection, boolean printRep
private void printReplicaIdentityInfo(PostgresConnection connection, TableId tableId) { private void printReplicaIdentityInfo(PostgresConnection connection, TableId tableId) {
try { try {
ServerInfo.ReplicaIdentity replicaIdentity = connection.readReplicaIdentityInfo(tableId); ReplicaIdentityInfo replicaIdentity = connection.readReplicaIdentityInfo(tableId);
LOGGER.info("REPLICA IDENTITY for '{}' is '{}'; {}", tableId, replicaIdentity, replicaIdentity.description()); LOGGER.info("REPLICA IDENTITY for '{}' is '{}'; {}", tableId, replicaIdentity, replicaIdentity.description());
} }
catch (SQLException e) { catch (SQLException e) {

View File

@ -0,0 +1,81 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.function.Predicates;
import io.debezium.relational.TableId;
/**
 * Resolves the Replica Identity configured for a table through the
 * {@code replica.identity.autoset.values} property of the {@link PostgresConnector}.
 * <p>
 * The raw property value is parsed on the first call to {@link #findReplicaIdentity(TableId)}.
 *
 * @author Ben White, Miguel Sotomayor
 */
@Immutable
public class ReplicaIdentityMapper {

    /** Shape of a single {@code <table regex>:<replica identity>} entry; matching is case-insensitive. */
    public static final Pattern REPLICA_AUTO_SET_PATTERN = Pattern
            .compile("(?i)^\\s*(?<tablepredicate>[^\\s:]+):(?<replicaidentity>DEFAULT|(INDEX) (?<indexname>.\\w*)|FULL|NOTHING)\\s*$");

    /** Separator between the entries of the property value. */
    public static final Pattern PATTERN_SPLIT = Pattern.compile(",");

    // Built lazily from replicaAutoSetValue on first lookup.
    private Map<Predicate<TableId>, ReplicaIdentityInfo> replicaIdentityPredicateMap;
    private final String replicaAutoSetValue;

    /**
     * @param replicaAutoSetValue the raw property value; may be null, in which case no table is mapped
     */
    public ReplicaIdentityMapper(String replicaAutoSetValue) {
        this.replicaAutoSetValue = replicaAutoSetValue;
    }

    /**
     * Looks up the Replica Identity configured for the given table by testing it against every
     * configured table expression.
     *
     * @param tableId the identifier of the table
     * @return the configured {@link ReplicaIdentityInfo}, or empty when no expression matches the table
     * @throws DebeziumException if more than one expression matches the table
     */
    public Optional<ReplicaIdentityInfo> findReplicaIdentity(TableId tableId) throws DebeziumException {
        if (replicaIdentityPredicateMap == null) {
            this.replicaIdentityPredicateMap = this.getReplicaIdentityPredicateMap();
        }

        ReplicaIdentityInfo match = null;
        for (Map.Entry<Predicate<TableId>, ReplicaIdentityInfo> candidate : this.replicaIdentityPredicateMap.entrySet()) {
            if (!candidate.getKey().test(tableId)) {
                continue;
            }
            if (match != null) {
                // A table must be claimed by at most one expression, otherwise the configuration is ambiguous.
                throw new DebeziumException(String.format("More than one Regular expressions matched table %s", tableId));
            }
            match = candidate.getValue();
        }
        return Optional.ofNullable(match);
    }

    /**
     * Parses the property `replica.identity.autoset.values` stored in {@link ReplicaIdentityMapper#replicaAutoSetValue}.
     * Entries that do not match {@link #REPLICA_AUTO_SET_PATTERN} are ignored here.
     *
     * @return {@link Map} from table predicate to the {@link ReplicaIdentityInfo} configured for the tables it matches
     */
    private Map<Predicate<TableId>, ReplicaIdentityInfo> getReplicaIdentityPredicateMap() {
        if (replicaAutoSetValue == null) {
            return Collections.emptyMap();
        }

        return Arrays.stream(PATTERN_SPLIT.split(replicaAutoSetValue))
                .map(REPLICA_AUTO_SET_PATTERN::matcher)
                .filter(Matcher::matches)
                .collect(Collectors.toMap(
                        entry -> Predicates.includes(entry.group("tablepredicate"), TableId::toString),
                        entry -> new ReplicaIdentityInfo(parseReplicaIdentity(entry.group("replicaidentity")), entry.group("indexname"))));
    }

    /**
     * Converts the matched {@code replicaidentity} group into the enum constant.
     * The comparison ignores case because {@link #REPLICA_AUTO_SET_PATTERN} accepts any casing,
     * whereas {@code Enum.valueOf} only accepts the exact constant name.
     *
     * @param value the matched group, e.g. {@code full} or {@code INDEX a_pkey}
     * @return the corresponding constant
     * @throws IllegalArgumentException if the keyword names no constant
     */
    private static ReplicaIdentityInfo.ReplicaIdentity parseReplicaIdentity(String value) {
        // Drop the index name that follows the INDEX keyword, if any.
        final String keyword = value.replaceAll("\\s.*", "");
        for (ReplicaIdentityInfo.ReplicaIdentity candidate : ReplicaIdentityInfo.ReplicaIdentity.values()) {
            if (candidate.name().equalsIgnoreCase(keyword)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unsupported replica identity '" + keyword + "'");
    }
}

View File

@ -163,7 +163,8 @@ public String connectionString() {
* @return the replica identity information; never null * @return the replica identity information; never null
* @throws SQLException if there is a problem obtaining the replica identity information for the given table * @throws SQLException if there is a problem obtaining the replica identity information for the given table
*/ */
public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) throws SQLException { @VisibleForTesting
public ReplicaIdentityInfo readReplicaIdentityInfo(TableId tableId) throws SQLException {
String statement = "SELECT relreplident FROM pg_catalog.pg_class c " + String statement = "SELECT relreplident FROM pg_catalog.pg_class c " +
"LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid " + "LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid " +
"WHERE n.nspname=? and c.relname=?"; "WHERE n.nspname=? and c.relname=?";
@ -180,7 +181,65 @@ public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) throw
LOGGER.warn("Cannot determine REPLICA IDENTITY information for table '{}'", tableId); LOGGER.warn("Cannot determine REPLICA IDENTITY information for table '{}'", tableId);
} }
}); });
return ServerInfo.ReplicaIdentity.parseFromDB(replIdentity.toString()); return new ReplicaIdentityInfo(ReplicaIdentityInfo.ReplicaIdentity.parseFromDB(replIdentity.toString()));
}
/**
 * Retrieves the name of the index that is marked as replica identity for the given table,
 * i.e. the index chosen when the table is configured with REPLICA IDENTITY USING INDEX.
 *
 * @param tableId the identifier of the table; when it carries no schema, {@code public} is used
 * @return name of the index linked to replica identity, or an empty string when the table has none; never null
 * @throws SQLException if there is a problem obtaining the replica identity and index information for the given table
 */
@VisibleForTesting
public String readIndexOfReplicaIdentity(TableId tableId) throws SQLException {
    // Resolve the index through pg_index.indrelid so that only indexes of this very table are considered.
    // Joining on the schema name alone would also pick up replica identity indexes of sibling tables.
    String statement = "SELECT ic.relname " +
            "FROM pg_catalog.pg_index i " +
            "    JOIN pg_catalog.pg_class c ON c.oid = i.indrelid " +
            "    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace " +
            "    JOIN pg_catalog.pg_class ic ON ic.oid = i.indexrelid " +
            "WHERE i.indisreplident AND n.nspname=? AND c.relname=?";
    String schema = tableId.schema() != null && tableId.schema().length() > 0 ? tableId.schema() : "public";
    StringBuilder indexName = new StringBuilder();
    prepareQuery(statement, stmt -> {
        stmt.setString(1, schema);
        stmt.setString(2, tableId.table());
    }, rs -> {
        String name = rs.next() ? rs.getString(1) : null;
        if (name != null) {
            indexName.append(name);
        }
        else {
            LOGGER.warn("Cannot determine index linked to REPLICA IDENTITY for table '{}'", tableId);
        }
    });
    return indexName.toString();
}
/**
* Update REPLICA IDENTITY status of a table.
* This in turn determines how much information is available for UPDATE and DELETE operations for logical replication.
*
* @param tableId the identifier of the table
* @param replicaIdentityValue Replica Identity value
*/
public void setReplicaIdentityForTable(TableId tableId, ReplicaIdentityInfo replicaIdentityValue) {
try {
LOGGER.debug("Updating Replica Identity '{}'", tableId.table());
execute(String.format("ALTER TABLE %s REPLICA IDENTITY %s;", tableId, replicaIdentityValue));
}
catch (SQLException e) {
if (e.getSQLState().equals("42501")) {
LOGGER.error("Replica identity could not be updated because of lack of privileges", e);
}
else {
LOGGER.error("Unexpected error while attempting to alter Replica Identity", e);
}
}
} }
/** /**

View File

@ -40,6 +40,7 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema; import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.ReplicaIdentityMapper;
import io.debezium.connector.postgresql.TypeRegistry; import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.spi.SlotCreationResult; import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
@ -77,6 +78,8 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
private SlotCreationResult slotCreationInfo; private SlotCreationResult slotCreationInfo;
private boolean hasInitedSlot; private boolean hasInitedSlot;
private Optional<ReplicaIdentityMapper> replicaIdentityMapper;
/** /**
* Creates a new replication connection with the given params. * Creates a new replication connection with the given params.
* *
@ -92,8 +95,6 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
* @param typeRegistry registry with PostgreSQL types * @param typeRegistry registry with PostgreSQL types
* @param streamParams additional parameters to pass to the replication stream * @param streamParams additional parameters to pass to the replication stream
* @param schema the schema; must not be null * @param schema the schema; must not be null
* <p>
* updates to the server
*/ */
private PostgresReplicationConnection(PostgresConnectorConfig config, private PostgresReplicationConnection(PostgresConnectorConfig config,
String slotName, String slotName,
@ -123,6 +124,7 @@ private PostgresReplicationConnection(PostgresConnectorConfig config,
this.streamParams = streamParams; this.streamParams = streamParams;
this.slotCreationInfo = null; this.slotCreationInfo = null;
this.hasInitedSlot = false; this.hasInitedSlot = false;
this.replicaIdentityMapper = config.replicaIdentityMapper();
} }
private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) { private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
@ -215,6 +217,65 @@ private void createOrUpdatePublicationModeFilterted(String tableFilterString, St
} }
} }
/**
 * Checks all tables captured by the connector, as returned by {@link PostgresReplicationConnection#determineCapturedTables()},
 * and updates the Replica Identity in the PostgreSQL database based on the
 * {@link PostgresConnectorConfig#REPLICA_IDENTITY_AUTOSET_VALUES} configuration parameter for each {@link TableId}.
 * <p>
 * Does nothing when no {@link ReplicaIdentityMapper} is present. A failure on one table is logged
 * and does not prevent the remaining tables from being processed.
 *
 * @throws DebeziumException if the captured tables cannot be determined
 */
private void initReplicaIdentity() {
    if (!this.replicaIdentityMapper.isPresent()) {
        return;
    }
    final ReplicaIdentityMapper mapper = this.replicaIdentityMapper.get();

    LOGGER.info("Updating Replica Identity");
    final Set<TableId> tablesCaptured;
    try {
        tablesCaptured = determineCapturedTables();
    }
    catch (Exception e) {
        throw new DebeziumException("Unable to get Captured tables", e);
    }

    for (TableId tableId : tablesCaptured) {
        try {
            applyConfiguredReplicaIdentity(mapper, tableId);
        }
        catch (Exception e) {
            LOGGER.error("Unable to update Replica Identity for table {}", tableId, e);
        }
    }
}

/**
 * Aligns the Replica Identity of a single table with the value the mapper yields for it, if any.
 *
 * @param mapper the mapper holding the configured values
 * @param tableId the identifier of the table
 */
private void applyConfiguredReplicaIdentity(ReplicaIdentityMapper mapper, TableId tableId) {
    final Optional<ReplicaIdentityInfo> newReplicaIdentity = mapper.findReplicaIdentity(tableId);
    if (!newReplicaIdentity.isPresent()) {
        LOGGER.debug(
                "Replica identity for table '{}' will not be updated because Replica Identity is not defined on REPLICA_IDENTITY_AUTOSET_VALUES property",
                tableId);
        return;
    }

    ReplicaIdentityInfo currentReplicaIdentity = null;
    try {
        currentReplicaIdentity = jdbcConnection.readReplicaIdentityInfo(tableId);
        if (currentReplicaIdentity.getReplicaIdentity() == ReplicaIdentityInfo.ReplicaIdentity.INDEX) {
            currentReplicaIdentity.setIndexName(jdbcConnection.readIndexOfReplicaIdentity(tableId));
        }
    }
    catch (SQLException e) {
        LOGGER.error("Cannot determine REPLICA IDENTITY information for table {}", tableId, e);
    }
    if (currentReplicaIdentity == null) {
        // The current value is unknown and the failure is already logged; do not claim the table is "already" configured.
        return;
    }

    final ReplicaIdentityInfo desiredReplicaIdentity = newReplicaIdentity.get();
    // ReplicaIdentityInfo is compared through its textual form, which includes the index name.
    if (!currentReplicaIdentity.toString().equals(desiredReplicaIdentity.toString())) {
        jdbcConnection.setReplicaIdentityForTable(tableId, desiredReplicaIdentity);
        LOGGER.info("Replica identity set to {} for table '{}'",
                desiredReplicaIdentity, tableId);
    }
    else {
        LOGGER.info("Replica identity for table '{}' is already {}",
                tableId, currentReplicaIdentity);
    }
}
private Set<TableId> determineCapturedTables() throws Exception { private Set<TableId> determineCapturedTables() throws Exception {
Set<TableId> allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName()); Set<TableId> allTableIds = jdbcConnection.getAllTableIds(connectorConfig.databaseName());
@ -407,6 +468,9 @@ public void initConnection() throws SQLException, InterruptedException {
// See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html // See https://www.postgresql.org/docs/current/logical-replication-quick-setup.html
// For pgoutput specifically, the publication must be created before the slot. // For pgoutput specifically, the publication must be created before the slot.
initPublication(); initPublication();
initReplicaIdentity();
if (!hasInitedSlot) { if (!hasInitedSlot) {
initReplicationSlot(); initReplicationSlot();
} }

View File

@ -0,0 +1,85 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection;
/**
 * Pairs a {@link ReplicaIdentityInfo.ReplicaIdentity} value with the name of the index
 * that backs it when the Replica Identity is set to `INDEX`.
 * @author Ben White, Miguel Sotomayor
 */
public class ReplicaIdentityInfo {

    private final ReplicaIdentity replicaIdentity;
    private String indexName;

    /**
     * Creates an instance without an index name.
     *
     * @param replicaIdentity the replica identity value
     */
    public ReplicaIdentityInfo(ReplicaIdentity replicaIdentity) {
        this(replicaIdentity, null);
    }

    /**
     * @param replicaIdentity the replica identity value
     * @param indexName the name of the index; may be null
     */
    public ReplicaIdentityInfo(ReplicaIdentity replicaIdentity, String indexName) {
        this.replicaIdentity = replicaIdentity;
        this.indexName = indexName;
    }

    public ReplicaIdentity getReplicaIdentity() {
        return replicaIdentity;
    }

    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    /**
     * @return the textual description of the wrapped replica identity
     */
    public String description() {
        return replicaIdentity.description();
    }

    /**
     * @return {@code USING INDEX <name>} when an index name is set, otherwise the name of the enum constant
     */
    @Override
    public String toString() {
        if (indexName == null) {
            return replicaIdentity.name();
        }
        return "USING INDEX " + indexName;
    }

    /**
     * Table REPLICA IDENTITY information.
     */
    public enum ReplicaIdentity {
        NOTHING("UPDATE and DELETE events will not contain any old values"),
        FULL("UPDATE AND DELETE events will contain the previous values of all the columns"),
        DEFAULT("UPDATE and DELETE events will contain previous values only for PK columns"),
        INDEX("UPDATE and DELETE events will contain previous values only for columns present in the REPLICA IDENTITY index"),
        UNKNOWN("Unknown REPLICA IDENTITY");

        private final String description;

        ReplicaIdentity(String description) {
            this.description = description;
        }

        /**
         * Returns a textual description of the replica identity
         *
         * @return a description, never null
         */
        public String description() {
            return description;
        }

        /**
         * Maps the single-letter code to its constant.
         *
         * @param s the code: {@code n}, {@code d}, {@code i} or {@code f}
         * @return the matching constant, or {@link #UNKNOWN} for any other value
         */
        static ReplicaIdentityInfo.ReplicaIdentity parseFromDB(String s) {
            final ReplicaIdentity parsed;
            switch (s) {
                case "n":
                    parsed = NOTHING;
                    break;
                case "d":
                    parsed = DEFAULT;
                    break;
                case "i":
                    parsed = INDEX;
                    break;
                case "f":
                    parsed = FULL;
                    break;
                default:
                    parsed = UNKNOWN;
                    break;
            }
            return parsed;
        }
    }
}

View File

@ -95,48 +95,6 @@ public String toString() {
return "user '" + username + "' connected to database '" + database + "' on " + server + " with roles:" + lineSeparator + roles; return "user '" + username + "' connected to database '" + database + "' on " + server + " with roles:" + lineSeparator + roles;
} }
/**
* Table REPLICA IDENTITY information.
*/
public enum ReplicaIdentity {
NOTHING("UPDATE and DELETE events will not contain any old values"),
FULL("UPDATE AND DELETE events will contain the previous values of all the columns"),
DEFAULT("UPDATE and DELETE events will contain previous values only for PK columns"),
INDEX("UPDATE and DELETE events will contain previous values only for columns present in the REPLICA IDENTITY index"),
UNKNOWN("Unknown REPLICA IDENTITY");
private String description;
/**
* Returns a textual description of the replica identity
*
* @return a description, never null
*/
public String description() {
return this.description;
}
ReplicaIdentity(String description) {
this.description = description;
}
protected static ReplicaIdentity parseFromDB(String s) {
switch (s) {
case "n":
return NOTHING;
case "d":
return DEFAULT;
case "i":
return INDEX;
case "f":
return FULL;
default:
return UNKNOWN;
}
}
}
/** /**
* Information about a server replication slot * Information about a server replication slot
*/ */

View File

@ -5,11 +5,52 @@
*/ */
package io.debezium.connector.postgresql; package io.debezium.connector.postgresql;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
import io.debezium.config.ConfigDefinitionMetadataTest; import io.debezium.config.ConfigDefinitionMetadataTest;
import io.debezium.config.Configuration;
public class PostgresConnectorConfigDefTest extends ConfigDefinitionMetadataTest { public class PostgresConnectorConfigDefTest extends ConfigDefinitionMetadataTest {
public PostgresConnectorConfigDefTest() { public PostgresConnectorConfigDefTest() {
super(new PostgresConnector()); super(new PostgresConnector());
} }
/**
 * Two well-formed {@code <schema>.<table>:<replica identity>} entries must produce no validation problem.
 */
@Test
public void shouldSetReplicaAutoSetValidValue() {
    Configuration.Builder configBuilder = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "testSchema_1.testTable_1:FULL,testSchema_2.testTable_2:DEFAULT");

    int problemCount = PostgresConnectorConfig.validateReplicaAutoSetField(
            configBuilder.build(), PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, (field, value, problemMessage) -> System.out.println(problemMessage));

    // Assert on the count itself so that a failure reports the actual number of problems.
    assertThat(problemCount).isEqualTo(0);
}
/**
 * Entries that use {@code ;} instead of {@code :} as separator must each be reported as a problem.
 */
@Test
public void shouldSetReplicaAutoSetInvalidValue() {
    Configuration.Builder configBuilder = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "testSchema_1.testTable_1;FULL,testSchema_2.testTable_2;;DEFAULT");

    int problemCount = PostgresConnectorConfig.validateReplicaAutoSetField(
            configBuilder.build(), PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, (field, value, problemMessage) -> System.out.println(problemMessage));

    // Assert on the count itself so that a failure reports the actual number of problems.
    assertThat(problemCount).isEqualTo(2);
}
/**
 * Table names given as regular expressions must be accepted by the validation.
 */
@Test
public void shouldSetReplicaAutoSetRegExValue() {
    Configuration.Builder configBuilder = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, ".*.test.*:FULL,testSchema_2.*:DEFAULT");

    int problemCount = PostgresConnectorConfig.validateReplicaAutoSetField(
            configBuilder.build(), PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, (field, value, problemMessage) -> System.out.println(problemMessage));

    // Assert on the count itself so that a failure reports the actual number of problems.
    assertThat(problemCount).isEqualTo(0);
}
} }

View File

@ -73,6 +73,7 @@
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder; import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder; import io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule; import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
@ -98,6 +99,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema; import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource; import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema; import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Strings; import io.debezium.util.Strings;
import io.debezium.util.Testing; import io.debezium.util.Testing;
@ -1078,6 +1080,225 @@ public void shouldRecoverFromRetriableException() throws Exception {
assertRecordsAfterInsert(2, 3, 3); assertRecordsAfterInsert(2, 3, 3);
} }
@Test
public void shouldUpdateReplicaIdentity() throws Exception {
    // Intercept the replication connection's log output so the expected messages can be verified below.
    final LogInterceptor logInterceptor = new LogInterceptor(PostgresReplicationConnection.class);

    TestHelper.execute(SETUP_TABLES_STMT);

    final Configuration config = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:DEFAULT")
            .build();

    start(PostgresConnector.class, config);
    assertConnectorIsRunning();
    waitForStreamingRunning();

    // Give the connector time to apply the Replica Identity changes
    waitForAvailableRecords(5, TimeUnit.SECONDS);

    try (PostgresConnection connection = TestHelper.create()) {
        final TableId s1Table = new TableId("", "s1", "a");
        final TableId s2Table = new TableId("", "s2", "a");

        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL, connection.readReplicaIdentityInfo(s1Table).getReplicaIdentity());
        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.DEFAULT, connection.readReplicaIdentityInfo(s2Table).getReplicaIdentity());

        final String updatedMessage = String.format("Replica identity set to FULL for table '%s'", s1Table);
        final String unchangedMessage = String.format("Replica identity for table '%s' is already DEFAULT", s2Table);
        assertThat(logInterceptor.containsMessage(updatedMessage)).isTrue();
        assertThat(logInterceptor.containsMessage(unchangedMessage)).isTrue();
    }
}
@Test
public void shouldUpdateReplicaIdentityWithRegExp() throws Exception {
    TestHelper.executeDDL("postgres_create_multiple_tables.ddl");

    final Configuration config = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "(.*).a:FULL,s2.*:NOTHING")
            .build();

    start(PostgresConnector.class, config);
    assertConnectorIsRunning();
    waitForStreamingRunning();

    // Give the connector time to apply the Replica Identity changes
    waitForAvailableRecords(5, TimeUnit.SECONDS);

    try (PostgresConnection connection = TestHelper.create()) {
        final TableId s1a = new TableId("", "s1", "a");
        final TableId s2b = new TableId("", "s2", "b");
        final TableId s2c = new TableId("", "s2", "c");
        final TableId s3a = new TableId("", "s3", "a");

        // Tables named "a" are covered by the first expression ...
        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL, connection.readReplicaIdentityInfo(s1a).getReplicaIdentity());
        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL, connection.readReplicaIdentityInfo(s3a).getReplicaIdentity());
        // ... and the remaining tables of schema s2 by the second one.
        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.NOTHING, connection.readReplicaIdentityInfo(s2b).getReplicaIdentity());
        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.NOTHING, connection.readReplicaIdentityInfo(s2c).getReplicaIdentity());
    }
}
@Test
public void shouldNotUpdateReplicaIdentityWithRegExpDuplicated() throws Exception {
    // Intercept the replication connection's log output so the expected errors can be verified below.
    final LogInterceptor logInterceptor = new LogInterceptor(PostgresReplicationConnection.class);

    TestHelper.executeDDL("postgres_create_multiple_tables.ddl");

    final Configuration config = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s.*:FULL,s2.*:NOTHING")
            .build();

    start(PostgresConnector.class, config);
    assertConnectorIsRunning();
    waitForStreamingRunning();

    // Give the connector time to attempt the Replica Identity changes
    waitForAvailableRecords(10, TimeUnit.SECONDS);

    // Both expressions match the tables of schema s2, so neither table may be updated.
    final boolean errorLoggedForS2c = logInterceptor.containsErrorMessage("Unable to update Replica Identity for table s2.c");
    assertThat(errorLoggedForS2c).isTrue();
    final boolean causeLoggedForS2c = logInterceptor.containsStacktraceElement("More than one Regular expressions matched table s2.c");
    assertThat(causeLoggedForS2c).isTrue();
    final boolean errorLoggedForS2b = logInterceptor.containsErrorMessage("Unable to update Replica Identity for table s2.b");
    assertThat(errorLoggedForS2b).isTrue();
    final boolean causeLoggedForS2b = logInterceptor.containsStacktraceElement("More than one Regular expressions matched table s2.b");
    assertThat(causeLoggedForS2b).isTrue();
}
/**
 * Configures {@code s1.a:FULL} and asserts that table {@code s1.a} reports replica identity
 * FULL afterwards and that the corresponding log message was written.
 */
@Test
public void shouldUpdateReplicaIdentityWithOneTable() throws Exception {
    // Intercept everything logged by PostgresReplicationConnection so the log assertion can inspect it.
    final LogInterceptor replicationLogs = new LogInterceptor(PostgresReplicationConnection.class);

    TestHelper.execute(SETUP_TABLES_STMT);

    Configuration connectorConfig = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL")
            .build();

    start(PostgresConnector.class, connectorConfig);
    assertConnectorIsRunning();
    waitForStreamingRunning();

    // Pause until the replica identity update has had a chance to run.
    waitForAvailableRecords(5, TimeUnit.SECONDS);

    try (PostgresConnection pgConnection = TestHelper.create()) {
        TableId fullTable = new TableId("", "s1", "a");

        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL.toString(), pgConnection.readReplicaIdentityInfo(fullTable).toString());

        String expectedLog = String.format("Replica identity set to FULL for table '%s'", fullTable);
        assertThat(replicationLogs.containsMessage(expectedLog)).isTrue();
    }
}
/**
 * Configures {@code s1.a:FULL} and {@code s2.a:INDEX a_pkey}, then asserts the replica
 * identity read back for both tables and the log messages written for each update.
 */
@Test
public void shouldUpdateReplicaIdentityUsingIndex() throws Exception {
    // Intercept everything logged by PostgresReplicationConnection so the log assertions can inspect it.
    final LogInterceptor replicationLogs = new LogInterceptor(PostgresReplicationConnection.class);

    TestHelper.execute(SETUP_TABLES_STMT);

    Configuration connectorConfig = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:INDEX a_pkey")
            .build();

    start(PostgresConnector.class, connectorConfig);
    assertConnectorIsRunning();
    waitForStreamingRunning();

    // Pause until the replica identity update has had a chance to run.
    waitForAvailableRecords(5, TimeUnit.SECONDS);

    try (PostgresConnection pgConnection = TestHelper.create()) {
        TableId fullTable = new TableId("", "s1", "a");
        TableId indexTable = new TableId("", "s2", "a");

        // The index name is read separately and attached to the info object before comparing.
        String indexName = pgConnection.readIndexOfReplicaIdentity(indexTable);
        ReplicaIdentityInfo actualIndexIdentity = pgConnection.readReplicaIdentityInfo(indexTable);
        actualIndexIdentity.setIndexName(indexName);

        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL.toString(), pgConnection.readReplicaIdentityInfo(fullTable).toString());

        ReplicaIdentityInfo expectedIndexIdentity = new ReplicaIdentityInfo(ReplicaIdentityInfo.ReplicaIdentity.INDEX, "a_pkey");
        assertEquals(expectedIndexIdentity.toString(), actualIndexIdentity.toString());

        assertThat(replicationLogs.containsMessage(String.format("Replica identity set to FULL for table '%s'", fullTable))).isTrue();
        assertThat(replicationLogs.containsMessage(String.format("Replica identity set to USING INDEX %s for table '%s'", indexName, indexTable))).isTrue();
    }
}
/**
 * Runs the connector as {@code role_2} against the role-specific fixture and asserts that
 * {@link PostgresConnection} logs that the replica identity could not be updated because
 * of lack of privileges.
 */
@Test
public void shouldLogOwnershipErrorForReplicaIdentityUpdate() throws Exception {
    // This captures all logged messages, allowing us to verify log message was written.
    final LogInterceptor logInterceptor = new LogInterceptor(PostgresConnection.class);

    TestHelper.executeDDL("postgres_create_role_specific_tables.ddl");

    Configuration config = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.a:DEFAULT")
            .with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, "DISABLED")
            .with("database.user", "role_2")
            .with("database.password", "role_2_pass")
            .build();

    start(PostgresConnector.class, config);
    assertConnectorIsRunning();
    waitForStreamingRunning();

    // Waiting for Replica Identity is updated
    waitForAvailableRecords(5, TimeUnit.SECONDS);

    // The message is a constant, so it is passed directly rather than through String.format.
    assertThat(logInterceptor.containsMessage("Replica identity could not be updated because of lack of privileges")).isTrue();
}
/**
 * Configures {@code s1.a:FULL,s2.b:DEFAULT} and asserts that {@code s1.a} is set to FULL
 * while a message is logged stating that {@code s2.a} will not be updated because no
 * replica identity is defined for it.
 */
@Test
public void shouldCheckTablesToUpdateReplicaIdentityAreCaptured() throws Exception {
    // Intercept everything logged by PostgresReplicationConnection so the log assertions can inspect it.
    final LogInterceptor replicationLogs = new LogInterceptor(PostgresReplicationConnection.class);

    TestHelper.execute(SETUP_TABLES_STMT);

    Configuration connectorConfig = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
            .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
            .with(PostgresConnectorConfig.REPLICA_IDENTITY_AUTOSET_VALUES, "s1.a:FULL,s2.b:DEFAULT")
            .build();

    start(PostgresConnector.class, connectorConfig);
    assertConnectorIsRunning();
    waitForStreamingRunning();

    // Pause until the replica identity update has had a chance to run.
    waitForAvailableRecords(5, TimeUnit.SECONDS);

    try (PostgresConnection pgConnection = TestHelper.create()) {
        TableId fullTable = new TableId("", "s1", "a");

        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL.toString(), pgConnection.readReplicaIdentityInfo(fullTable).toString());

        String updatedLog = String.format("Replica identity set to FULL for table '%s'", fullTable);
        assertThat(replicationLogs.containsMessage(updatedLog)).isTrue();

        String skippedLog = "Replica identity for table 's2.a' will not be updated because Replica Identity is not defined on REPLICA_IDENTITY_AUTOSET_VALUES property";
        assertThat(replicationLogs.containsMessage(skippedLog)).isTrue();
    }
}
@Test @Test
public void shouldTakeExcludeListFiltersIntoAccount() throws Exception { public void shouldTakeExcludeListFiltersIntoAccount() throws Exception {
String setupStmt = SETUP_TABLES_STMT + String setupStmt = SETUP_TABLES_STMT +

View File

@ -0,0 +1,47 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.relational.TableId;
/**
 * Unit tests for {@link ReplicaIdentityMapper}: builds a mapper from a
 * {@code <table>:<replica identity>} list and looks up the replica identity by {@link TableId}.
 */
public class ReplicaIdentityTestMapperTest {

    /**
     * Tables configured with {@code FULL} and {@code DEFAULT} resolve to those replica identities.
     */
    @Test
    public void shouldSetReplicaAutoSetValidValue() {
        TableId tableId1 = new TableId("", "testSchema_1", "testTable_1");
        TableId tableId2 = new TableId("", "testSchema_2", "testTable_2");

        String replicaAutosetTypeField = "testSchema_1.testTable_1:FULL,testSchema_2.testTable_2:DEFAULT";
        ReplicaIdentityMapper replicaIdentityMapper = new ReplicaIdentityMapper(replicaAutosetTypeField);

        // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages accurate.
        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL.toString(), replicaIdentityMapper.findReplicaIdentity(tableId1).get().toString());
        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.DEFAULT.toString(), replicaIdentityMapper.findReplicaIdentity(tableId2).get().toString());
    }

    /**
     * A table configured with {@code INDEX idx_pk} resolves to an INDEX replica identity carrying that index name.
     */
    @Test
    public void shouldSetReplicaAutoSetIndexValue() {
        TableId tableId1 = new TableId("", "testSchema_1", "testTable_1");
        TableId tableId2 = new TableId("", "testSchema_2", "testTable_2");

        String replicaAutosetTypeField = "testSchema_1.testTable_1:FULL,testSchema_2.testTable_2:INDEX idx_pk";
        ReplicaIdentityMapper replicaIdentityMapper = new ReplicaIdentityMapper(replicaAutosetTypeField);

        assertEquals(ReplicaIdentityInfo.ReplicaIdentity.FULL.toString(), replicaIdentityMapper.findReplicaIdentity(tableId1).get().toString());

        ReplicaIdentityInfo replicaIdentityIndex = new ReplicaIdentityInfo(ReplicaIdentityInfo.ReplicaIdentity.INDEX, "idx_pk");
        assertEquals(replicaIdentityIndex.toString(), replicaIdentityMapper.findReplicaIdentity(tableId2).get().toString());
    }
}

View File

@ -97,7 +97,7 @@ public void shouldPrintReplicateIdentityInfo() throws Exception {
"CREATE TABLE test(pk serial, PRIMARY KEY (pk));"; "CREATE TABLE test(pk serial, PRIMARY KEY (pk));";
TestHelper.execute(statement); TestHelper.execute(statement);
try (PostgresConnection connection = TestHelper.create()) { try (PostgresConnection connection = TestHelper.create()) {
assertEquals(ServerInfo.ReplicaIdentity.DEFAULT, connection.readReplicaIdentityInfo(TableId.parse("public.test"))); assertEquals(ReplicaIdentityInfo.ReplicaIdentity.DEFAULT.toString(), connection.readReplicaIdentityInfo(TableId.parse("public.test")).toString());
} }
} }

View File

@ -0,0 +1,14 @@
-- Test fixture: recreates schemas s1, s2 and s3 containing tables s1.a, s3.a, s2.b and s2.c.
-- Remove any schemas left over from a previous run; CASCADE also drops the objects they contain.
DROP SCHEMA IF EXISTS s1 CASCADE;
DROP SCHEMA IF EXISTS s2 CASCADE;
DROP SCHEMA IF EXISTS s3 CASCADE;
-- Create the empty schemas.
CREATE SCHEMA s1;
CREATE SCHEMA s2;
CREATE SCHEMA s3;
-- Create the tables; each one has a serial primary key column "pk".
CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));
CREATE TABLE s3.a (pk SERIAL, aa integer, PRIMARY KEY(pk));
CREATE TABLE s2.b (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));
CREATE TABLE s2.c (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));
-- Explicitly set the replica identity of every table to DEFAULT
-- (presumably so that tests start from a known baseline).
ALTER TABLE s1.a REPLICA IDENTITY DEFAULT;
ALTER TABLE s3.a REPLICA IDENTITY DEFAULT;
ALTER TABLE s2.b REPLICA IDENTITY DEFAULT;
ALTER TABLE s2.c REPLICA IDENTITY DEFAULT;

View File

@ -0,0 +1,58 @@
-- Test fixture: schemas s1 and s2 whose tables (and the publication) are created by role_1,
-- plus a login role role_2 that is granted access to the schemas but does not own the tables.
-- Reset test environment: drop the publication and both schemas if they exist.
DROP PUBLICATION IF EXISTS dbz_publication;
DROP SCHEMA IF EXISTS s1 CASCADE;
DROP SCHEMA IF EXISTS s2 CASCADE;
-- Drop role_1 if it exists. Objects it owns are first reassigned to postgres and whatever
-- it still owns is dropped, because PostgreSQL refuses to drop a role that is still referenced.
DO
$do$
BEGIN
IF EXISTS (
SELECT FROM pg_catalog.pg_roles
WHERE rolname = 'role_1') THEN
REASSIGN OWNED BY role_1 TO postgres;
DROP OWNED BY role_1;
DROP ROLE IF EXISTS role_1;
END IF;
END
$do$;
-- Drop role_2 if it exists, using the same reassign / drop-owned / drop-role sequence.
DO
$do$
BEGIN
IF EXISTS (
SELECT FROM pg_catalog.pg_roles
WHERE rolname = 'role_2') THEN
REASSIGN OWNED BY role_2 TO postgres;
DROP OWNED BY role_2;
DROP ROLE IF EXISTS role_2;
END IF;
END
$do$;
-- Create schemas
CREATE SCHEMA s1;
CREATE SCHEMA s2;
-- Create roles
-- role_1: full access to both schemas plus CREATE on the database
-- (CREATE PUBLICATION below is run as role_1 and requires the database-level CREATE privilege).
CREATE ROLE role_1;
GRANT ALL ON SCHEMA s1 TO role_1;
GRANT ALL ON SCHEMA s2 TO role_1;
GRANT CREATE ON DATABASE postgres TO role_1;
-- role_2: can log in with a password and has the REPLICATION attribute; it is granted
-- access to both schemas and CONNECT on the database, but creates no tables itself.
CREATE ROLE role_2 WITH REPLICATION LOGIN PASSWORD 'role_2_pass';
GRANT ALL ON SCHEMA s1 TO role_2;
GRANT ALL ON SCHEMA s2 TO role_2;
GRANT CONNECT ON DATABASE postgres TO role_2;
-- Create the tables and the publication while acting as role_1, so role_1 is their owner.
SET ROLE role_1;
CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));
CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));
CREATE PUBLICATION dbz_publication FOR TABLE s1.a, s2.a;
-- Return to the original session role.
RESET ROLE;

View File

@ -2997,6 +2997,28 @@ For example: `CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>
If the publication exists, the connector updates the publication for tables that match the current filter configuration. If the publication exists, the connector updates the publication for tables that match the current filter configuration.
For example: `ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>`. For example: `ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>`.
|[[postgresql-replica-autoset-type]]<<postgresql-replica-autoset-type, `+replica.identity.autoset.values+`>>
|_empty string_
|The setting determines the value for link:https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY[replica identity] at table level. +
+
This option overwrites the existing value in the database.
Specify a comma-separated list of regular expressions that match fully-qualified table names, each paired with the replica identity value to apply to the matching tables. +
+
Each expression must match the pattern `<fully-qualified table name>:<replica identity>`, where the table name is defined as `SCHEMA_NAME.TABLE_NAME`, and the replica identity value is one of the following: +
+
`DEFAULT` - Records the old values of the columns of the primary key, if any. This is the default for non-system tables. +
+
`INDEX index_name` - Records the old values of the columns covered by the named index, that must be unique, not partial, not deferrable, and include only columns marked NOT NULL. If this index is dropped, the behavior is the same as NOTHING. +
+
`FULL` - Records the old values of all columns in the row. +
+
`NOTHING` - Records no information about the old row. This is the default for system tables. +
+
For example: +
`+schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name+`
|[[postgresql-property-binary-handling-mode]]<<postgresql-property-binary-handling-mode, `+binary.handling.mode+`>> |[[postgresql-property-binary-handling-mode]]<<postgresql-property-binary-handling-mode, `+binary.handling.mode+`>>
|bytes |bytes
|Specifies how binary (`bytea`) columns should be represented in change events: + |Specifies how binary (`bytea`) columns should be represented in change events: +

View File

@ -203,5 +203,6 @@ hiscat,My Lang Pangzi
bpaquet,Bertrand Paquet bpaquet,Bertrand Paquet
brenoavm,Breno Moreira brenoavm,Breno Moreira
ddsr-ops,ddsr-ops ddsr-ops,ddsr-ops
eizners,Sergey Eizner
subkanthi,Kanthi Subramanian subkanthi,Kanthi Subramanian
benw-at-birdie,Ben White
miguelbirdie,Miguel Angel Sotomayor