From 75882f35ac427beb4560a09a190c627f86d1b66d Mon Sep 17 00:00:00 2001 From: Naveen Kumar KR Date: Wed, 13 Jan 2021 20:33:59 +0530 Subject: [PATCH] DBZ-2382 Support emitting TRUNCATE events in PG11+ PGOUTPUT plugin default - TruncateHandlingMode.SKIP supported configs - TruncateHandlingMode.SKIP / TruncateHandlingMode.INCLUDE --- .../PostgresChangeRecordEmitter.java | 9 ++ .../postgresql/PostgresConnectorConfig.java | 115 ++++++++++++++++++ .../postgresql/PostgresTaskContext.java | 1 + .../connection/MessageDecoderConfig.java | 11 +- .../PostgresReplicationConnection.java | 16 ++- .../connection/ReplicationConnection.java | 9 ++ .../connection/ReplicationMessage.java | 1 + .../pgoutput/PgOutputMessageDecoder.java | 100 ++++++++++++--- .../PgOutputTruncateReplicationMessage.java | 26 ++++ .../postgresql/RecordsStreamProducerIT.java | 65 +++++++++- .../main/java/io/debezium/data/Envelope.java | 21 +++- .../RelationalChangeRecordEmitter.java | 7 ++ .../java/io/debezium/data/VerifyRecord.java | 16 +++ 13 files changed, 369 insertions(+), 28 deletions(-) create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputTruncateReplicationMessage.java diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java index ef38fecd0..d786c5388 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeRecordEmitter.java @@ -17,6 +17,7 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.postgresql.core.BaseConnection; @@ -77,6 +78,8 @@ protected Operation getOperation() { return Operation.UPDATE; case DELETE: return Operation.DELETE; + case TRUNCATE: + return Operation.TRUNCATE; default: throw new IllegalArgumentException("Received event of unexpected command type: " + message.getOperation()); } @@ -88,6 +91,12 @@ public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) th super.emitChangeRecords(schema, receiver); } + @Override + protected void emitTruncateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { + Struct envelope = tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); + receiver.changeRecord(tableSchema, Operation.TRUNCATE, null, envelope, getOffset(), null); + } + @Override protected Object[] getOldColumnValues() { try { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index f22eaccc2..9c6ff761f 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -10,6 +10,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -359,6 +360,11 @@ public MessageDecoder messageDecoder(MessageDecoderConfig config) { public String getPostgresPluginName() { return getValue(); } + + @Override + public boolean supportsTruncate() { + return true; + } }, DECODERBUFS("decoderbufs") { @Override @@ -370,6 +376,11 @@ public MessageDecoder messageDecoder(MessageDecoderConfig config) { public String getPostgresPluginName() { return getValue(); } + + @Override + public boolean supportsTruncate() { + return false; + } }, WAL2JSON_STREAMING("wal2json_streaming") { @Override @@ -382,6 +393,11 @@ public String getPostgresPluginName() { return "wal2json"; } + @Override + public boolean supportsTruncate() { + return false; + } + @Override public boolean hasUnchangedToastColumnMarker() { return false; @@ -408,6 +424,11 @@ public String getPostgresPluginName() { return "wal2json"; } + @Override + public boolean supportsTruncate() { + return false; + } + @Override public boolean hasUnchangedToastColumnMarker() { return false; @@ -429,6 +450,11 @@ public String getPostgresPluginName() { return "wal2json"; } + @Override + public boolean supportsTruncate() { + return false; + } + @Override public boolean hasUnchangedToastColumnMarker() { return false; @@ -455,6 +481,11 @@ public String getPostgresPluginName() { return "wal2json"; } + @Override + public boolean supportsTruncate() { + return false; + } + @Override public boolean hasUnchangedToastColumnMarker() { return false; @@ -496,6 +527,48 @@ public String getValue() { } public abstract String getPostgresPluginName(); + + public abstract boolean supportsTruncate(); + } + + /** + * The set of predefined TruncateHandlingMode options or aliases + */ + public enum TruncateHandlingMode implements EnumeratedValue { + + /** + * Skip TRUNCATE messages + */ + SKIP("skip"), + + /** + * Handle & Include TRUNCATE messages + */ + INCLUDE("include"); + + private final String value; + + TruncateHandlingMode(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + public static TruncateHandlingMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + for (TruncateHandlingMode truncateHandlingMode : TruncateHandlingMode.values()) { + if (truncateHandlingMode.getValue().equalsIgnoreCase(value)) { + return truncateHandlingMode; + } + } + return null; + } } /** @@ -792,6 +865,16 @@ public static AutoCreateMode parse(String value, String defaultValue) { "When 'snapshot.mode' is set as custom, this setting must be set to specify a fully qualified class name to load (via the default class loader)." + "This class must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot and how to build queries."); + public static final Field TRUNCATE_HANDLING_MODE = Field.create("truncate.handling.mode") + .withDisplayName("Truncate handling mode") + .withEnum(TruncateHandlingMode.class, TruncateHandlingMode.SKIP) + .withWidth(Width.MEDIUM) + .withImportance(Importance.MEDIUM) + .withValidation(PostgresConnectorConfig::validateTruncateHandlingMode) + .withDescription("Specify how TRUNCATE operations are handled for change events (supported only on pg11+ pgoutput plugin), including: " + + "'skip' to skip / ignore TRUNCATE events (default), " + + "'include' to handle and include TRUNCATE events"); + public static final Field HSTORE_HANDLING_MODE = Field.create("hstore.handling.mode") .withDisplayName("HStore Handling") .withEnum(HStoreHandlingMode.class, HStoreHandlingMode.JSON) @@ -875,6 +958,7 @@ public static AutoCreateMode parse(String value, String defaultValue) { "the original value is a toasted value not provided by the database. " + "If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets."); + private final TruncateHandlingMode truncateHandlingMode; private final HStoreHandlingMode hStoreHandlingMode; private final IntervalHandlingMode intervalHandlingMode; private final SnapshotMode snapshotMode; @@ -888,6 +972,7 @@ public PostgresConnectorConfig(Configuration config) { x -> x.schema() + "." + x.table(), DEFAULT_SNAPSHOT_FETCH_SIZE); + this.truncateHandlingMode = TruncateHandlingMode.parse(config.getString(PostgresConnectorConfig.TRUNCATE_HANDLING_MODE)); String hstoreHandlingModeStr = config.getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE); this.hStoreHandlingMode = HStoreHandlingMode.parse(hstoreHandlingModeStr); this.intervalHandlingMode = IntervalHandlingMode.parse(config.getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE)); @@ -947,6 +1032,10 @@ protected Duration statusUpdateInterval() { return Duration.ofMillis(getConfig().getLong(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS)); } + protected TruncateHandlingMode truncateHandlingMode() { + return truncateHandlingMode; + } + protected HStoreHandlingMode hStoreHandlingMode() { return hStoreHandlingMode; } @@ -1059,6 +1148,32 @@ private static int validateReplicationSlotName(Configuration config, Field field return errors; } + private static int validateTruncateHandlingMode(Configuration config, Field field, Field.ValidationOutput problems) { + final String value = config.getString(field); + int errors = 0; + if (value != null) { + TruncateHandlingMode truncateHandlingMode = TruncateHandlingMode.parse(value); + if (truncateHandlingMode == null) { + List validModes = Arrays.stream(TruncateHandlingMode.values()).map(TruncateHandlingMode::getValue).collect(Collectors.toList()); + String message = String.format("Valid values for %s are %s, but got '%s'", field.name(), validModes, value); + problems.accept(field, value, message); + errors++; + return errors; + } + if (truncateHandlingMode == TruncateHandlingMode.INCLUDE) { + LogicalDecoder logicalDecoder = config.getInstance(PLUGIN_NAME, LogicalDecoder.class); + if (!logicalDecoder.supportsTruncate()) { + String message = String.format( + "%s '%s' is not supported with configuration %s '%s'", + field.name(), truncateHandlingMode.getValue(), PLUGIN_NAME.name(), logicalDecoder.getValue()); + problems.accept(field, value, message); + errors++; + } + } + } + return errors; + } + @Override public String getContextName() { return Module.contextName(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index 83b5847ae..8277b33ce 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -110,6 +110,7 @@ protected ReplicationConnection createReplicationConnection(boolean exportSnapsh .withTableFilter(config.getTableFilters()) .withPublicationAutocreateMode(config.publicationAutocreateMode()) .withPlugin(config.plugin()) + .withTruncateHandlingMode(config.truncateHandlingMode()) .dropSlotOnClose(dropSlotOnStop) .streamParams(config.streamParams()) .statusUpdateInterval(config.statusUpdateInterval()) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java index 1da286cc2..897a710a1 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoderConfig.java @@ -6,6 +6,7 @@ package io.debezium.connector.postgresql.connection; import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresSchema; /** @@ -20,13 +21,16 @@ public class MessageDecoderConfig { private final String publicationName; private final boolean exportedSnapshot; private final boolean doSnapshot; + private final PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode; - public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName, boolean exportedSnapshot, boolean doSnapshot) { + public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName, boolean exportedSnapshot, boolean doSnapshot, + PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode) { this.configuration = configuration; this.schema = schema; this.publicationName = publicationName; this.exportedSnapshot = exportedSnapshot; this.doSnapshot = doSnapshot; + this.truncateHandlingMode = truncateHandlingMode; } public Configuration getConfiguration() { @@ -48,4 +52,9 @@ public boolean exportedSnapshot() { public boolean doSnapshot() { return doSnapshot; } + + public PostgresConnectorConfig.TruncateHandlingMode getTruncateHandlingMode() { + return truncateHandlingMode; + } + } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index 1d331be28..20bfb0d37 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -83,6 +83,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep * @param tableFilter the tables to watch of the DB publication for logical replication; may not be null * @param publicationAutocreateMode the mode for publication autocreation; may not be null * @param plugin decoder matching the server side plug-in used for streaming changes; may not be null + * @param truncateHandlingMode the mode for truncate handling; may not be null * @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed * @param statusUpdateInterval the interval at which the replication connection should periodically send status * @param exportSnapshot whether the replication should export a snapshot when created @@ -99,6 +100,7 @@ private PostgresReplicationConnection(Configuration config, RelationalTableFilters tableFilter, PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode, PostgresConnectorConfig.LogicalDecoder plugin, + PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode, boolean dropSlotOnClose, boolean exportSnapshot, boolean doSnapshot, @@ -117,7 +119,7 @@ private PostgresReplicationConnection(Configuration config, this.dropSlotOnClose = dropSlotOnClose; this.statusUpdateInterval = statusUpdateInterval; this.exportSnapshot = exportSnapshot; - this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName, exportSnapshot, doSnapshot)); + this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName, exportSnapshot, doSnapshot, truncateHandlingMode)); this.typeRegistry = typeRegistry; this.streamParams = streamParams; this.slotCreationInfo = null; @@ -626,6 +628,7 @@ protected static class ReplicationConnectionBuilder implements Builder { private RelationalTableFilters tableFilter; private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = PostgresConnectorConfig.AutoCreateMode.ALL_TABLES; private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS; + private PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode; private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE; private Duration statusUpdateIntervalVal; private boolean exportSnapshot = DEFAULT_EXPORT_SNAPSHOT; @@ -674,6 +677,13 @@ public ReplicationConnectionBuilder withPlugin(final PostgresConnectorConfig.Log return this; } + @Override + public Builder withTruncateHandlingMode(PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode) { + assert truncateHandlingMode != null; + this.truncateHandlingMode = truncateHandlingMode; + return this; + } + @Override public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) { this.dropSlotOnClose = dropSlotOnClose; @@ -719,8 +729,8 @@ public Builder doSnapshot(boolean doSnapshot) { @Override public ReplicationConnection build() { assert plugin != null : "Decoding plugin name is not set"; - return new PostgresReplicationConnection(config, slotName, publicationName, tableFilter, publicationAutocreateMode, plugin, dropSlotOnClose, exportSnapshot, - doSnapshot, statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema); + return new PostgresReplicationConnection(config, slotName, publicationName, tableFilter, publicationAutocreateMode, plugin, truncateHandlingMode, + dropSlotOnClose, exportSnapshot, doSnapshot, statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java index 58d87fd7c..122968699 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java @@ -152,6 +152,15 @@ interface Builder { */ Builder withPlugin(final PostgresConnectorConfig.LogicalDecoder plugin); + /** + * Sets the instance for the Truncate handling mode + * + * @param truncateHandlingMode Truncate handling mode, may not be null. + * @return this instance + * @see io.debezium.connector.postgresql.PostgresConnectorConfig.TruncateHandlingMode + */ + Builder withTruncateHandlingMode(final PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode); + /** * Whether or not to drop the replication slot once the replication connection closes * diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java index 85eabd305..fc6cb6a47 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java @@ -43,6 +43,7 @@ public enum Operation { INSERT, UPDATE, DELETE, + TRUNCATE, BEGIN, COMMIT, NOOP diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index 7d019bfba..2563aa694 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -16,6 +16,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -27,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier; import io.debezium.connector.postgresql.PostgresType; import io.debezium.connector.postgresql.TypeRegistry; @@ -118,23 +120,6 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Ls LOGGER.trace("Message Type: {}", type); final boolean candidateForSkipping = super.shouldMessageBeSkipped(buffer, lastReceivedLsn, startLsn, walPosition); switch (type) { - case TRUNCATE: - // @formatter:off - // For now we plan to gracefully skip TRUNCATE messages. - // We may decide in the future that these may be emitted differently, see DBZ-1052. - // - // As of PG11, the Truncate message format is as described: - // Byte Message Type (Always 'T') - // Int32 number of relations described by the truncate message - // Int8 flags for truncate; 1=CASCADE, 2=RESTART IDENTITY - // Int32[] Array of number of relation ids - // - // In short this message tells us how many relations are impacted by the truncate - // call, whether its cascaded or not and then all table relation ids involved. - // It seems the protocol guarantees to send the most up-to-date `R` relation - // messages for the tables prior to the `T` truncation message, even if in the - // same session a `R` message was followed by an insert/update/delete message. - // @formatter:on case COMMIT: case BEGIN: case RELATION: @@ -150,7 +135,7 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Ls LOGGER.trace("{} messages are always reprocessed", type); return false; default: - // INSERT/UPDATE/DELETE/TYPE/ORIGIN + // INSERT/UPDATE/DELETE/TRUNCATE/TYPE/ORIGIN // These should be excluded based on the normal behavior, delegating to default method return candidateForSkipping; } @@ -196,6 +181,14 @@ public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcesso case DELETE: decodeDelete(buffer, typeRegistry, processor); break; + case TRUNCATE: + if (config.getTruncateHandlingMode() == PostgresConnectorConfig.TruncateHandlingMode.INCLUDE) { + decodeTruncate(buffer, typeRegistry, processor); + } + else { + LOGGER.trace("Message Type {} skipped, not processed.", messageType); + } + break; default: LOGGER.trace("Message Type {} skipped, not processed.", messageType); break; @@ -467,6 +460,77 @@ private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat } } + /** + * Callback handler for the 'T' truncate replication stream message. + * + * @param buffer The replication stream buffer + * @param typeRegistry The postgres type registry + * @param processor The replication message processor + */ + private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { + // As of PG11, the Truncate message format is as described: + // Byte Message Type (Always 'T') + // Int32 number of relations described by the truncate message + // Int8 flags for truncate; 1=CASCADE, 2=RESTART IDENTITY + // Int32[] Array of number of relation ids + // + // In short this message tells us how many relations are impacted by the truncate + // call, whether its cascaded or not and then all table relation ids involved. + // It seems the protocol guarantees to send the most up-to-date `R` relation + // messages for the tables prior to the `T` truncation message, even if in the + // same session a `R` message was followed by an insert/update/delete message. + + int numberOfRelations = buffer.getInt(); + int optionBits = buffer.get(); + // ignored / unused + List truncateOptions = getTruncateOptions(optionBits); + int[] relationIds = new int[numberOfRelations]; + for (int i = 0; i < numberOfRelations; i++) { + relationIds[i] = buffer.getInt(); + } + + List tables = new ArrayList<>(); + for (int relationId : relationIds) { + Optional
resolvedTable = resolveRelation(relationId); + resolvedTable.ifPresent(tables::add); + } + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Event: {}, RelationIds: {}, OptionBits: {}", MessageType.TRUNCATE, Arrays.toString(relationIds), optionBits); + } + + int noOfResolvedTables = tables.size(); + for (int i = 0; i < noOfResolvedTables; i++) { + Table table = tables.get(i); + boolean lastTableInTruncate = (i + 1) == noOfResolvedTables; + processor.process(new PgOutputTruncateReplicationMessage( + Operation.TRUNCATE, + table.id().toDoubleQuotedString(), + commitTimestamp, + transactionId, + lastTableInTruncate)); + } + } + + /** + * Convert truncate option bits to postgres syntax truncate options + * + * @param flag truncate option bits + * @return truncate flags + */ + private List getTruncateOptions(int flag) { + switch (flag) { + case 1: + return Collections.singletonList("CASCADE"); + case 2: + return Collections.singletonList("RESTART IDENTITY"); + case 3: + return Arrays.asList("RESTART IDENTITY", "CASCADE"); + default: + return null; + } + } + /** * Resolves a given replication message relation identifier to a {@link Table}. * diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputTruncateReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputTruncateReplicationMessage.java new file mode 100644 index 000000000..25980afe4 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputTruncateReplicationMessage.java @@ -0,0 +1,26 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection.pgoutput; + +import java.time.Instant; + +public class PgOutputTruncateReplicationMessage extends PgOutputReplicationMessage { + + private final boolean lastTableInTruncate; + + public PgOutputTruncateReplicationMessage(Operation op, String table, Instant commitTimestamp, long transactionId, + boolean lastTableInTruncate) { + super(op, table, commitTimestamp, transactionId, null, null); + this.lastTableInTruncate = lastTableInTruncate; + } + + @Override + public boolean isLastEventForLsn() { + return lastTableInTruncate; + } + +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index f1e0fce87..50a31a2b9 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -19,6 +19,7 @@ import static org.fest.assertions.Fail.fail; import static org.fest.assertions.MapAssert.entry; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import java.math.BigDecimal; import java.time.Instant; @@ -83,7 +84,9 @@ import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.junit.ConditionalFail; +import io.debezium.junit.EqualityCheck; import io.debezium.junit.ShouldFailWhen; +import io.debezium.junit.SkipWhenDatabaseVersion; import io.debezium.junit.logging.LogInterceptor; import io.debezium.relational.RelationalChangeRecordEmitter; import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; @@ -1881,9 +1884,11 @@ public void stopInTheMiddleOfTxAndRestart() throws Exception { } @Test - @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput gracefully skips these messages") - public void shouldGracefullySkipTruncateMessages() throws Exception { - startConnector(); + @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 11, reason = "TRUNCATE events only supported in PG11+ PGOUTPUT Plugin") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput handles TRUNCATE messages") + public void shouldProcessTruncateMessages() throws Exception { + startConnector(builder -> builder + .with(PostgresConnectorConfig.TRUNCATE_HANDLING_MODE, PostgresConnectorConfig.TruncateHandlingMode.INCLUDE)); waitForStreamingToStart(); consumer = testConsumer(1); @@ -1893,12 +1898,62 @@ public void shouldGracefullySkipTruncateMessages() throws Exception { assertEquals(TestHelper.topicName("public.test_table"), record.topic()); VerifyRecord.isValidInsert(record, PK_FIELD, 2); - consumer.expects(0); - TestHelper.execute("TRUNCATE TABLE public.test_table;"); + consumer.expects(1); + TestHelper.execute("TRUNCATE TABLE public.test_table RESTART IDENTITY CASCADE;"); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + + assertFalse(consumer.isEmpty()); + SourceRecord truncateRecord = consumer.remove(); + assertNotNull(truncateRecord); + VerifyRecord.isValidTruncate(truncateRecord); assertTrue(consumer.isEmpty()); } + @Test + @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 11, reason = "TRUNCATE events only supported in PG11+ PGOUTPUT Plugin") + @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput handled TRUNCATE these messages") + public void shouldProcessTruncateMessagesForMultipleTableTruncateStatement() throws Exception { + TestHelper.execute("CREATE TABLE test_table_2 (pk SERIAL, text TEXT, PRIMARY KEY(pk));"); + + startConnector(builder -> builder.with(PostgresConnectorConfig.TRUNCATE_HANDLING_MODE, PostgresConnectorConfig.TruncateHandlingMode.INCLUDE)); + waitForStreamingToStart(); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO test_table (text) values ('TRUNCATE TEST');"); + + SourceRecord record = consumer.remove(); + assertEquals(TestHelper.topicName("public.test_table"), record.topic()); + VerifyRecord.isValidInsert(record, PK_FIELD, 2); + + executeAndWait("INSERT INTO test_table_2 (text) values ('TRUNCATE TEST 2');"); + + SourceRecord record_2 = consumer.remove(); + assertEquals(TestHelper.topicName("public.test_table_2"), record_2.topic()); + VerifyRecord.isValidInsert(record_2, PK_FIELD, 1); + + consumer.expects(2); + TestHelper.execute("TRUNCATE TABLE public.test_table, public.test_table_2;"); + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + + assertFalse(consumer.isEmpty()); + + SourceRecord truncateRecord = consumer.remove(); + assertNotNull(truncateRecord); + VerifyRecord.isValidTruncate(truncateRecord); + + SourceRecord truncateRecord_2 = consumer.remove(); + assertNotNull(truncateRecord_2); + VerifyRecord.isValidTruncate(truncateRecord_2); + assertTrue(consumer.isEmpty()); + + assertEquals(truncateRecord.sourceOffset().get("lsn_commit"), truncateRecord_2.sourceOffset().get("lsn_commit")); + assertEquals(truncateRecord.sourceOffset().get("lsn"), truncateRecord_2.sourceOffset().get("lsn")); + assertEquals(truncateRecord.sourceOffset().get("txId"), truncateRecord_2.sourceOffset().get("txId")); + + consumer = testConsumer(1); + executeAndWait("INSERT INTO test_table (text) values ('TRUNCATE TEST');"); + } + @Test @FixFor("DBZ-1413") public void shouldStreamChangesForDataTypeAlias() throws Exception { diff --git a/debezium-core/src/main/java/io/debezium/data/Envelope.java b/debezium-core/src/main/java/io/debezium/data/Envelope.java index 97a334fba..d940c8db2 100644 --- a/debezium-core/src/main/java/io/debezium/data/Envelope.java +++ b/debezium-core/src/main/java/io/debezium/data/Envelope.java @@ -46,7 +46,11 @@ public static enum Operation { /** * An operation that resulted in an existing record being removed from or deleted in the source. */ - DELETE("d"); + DELETE("d"), + /** + * An operation that resulted in an existing table being truncated in the source. + */ + TRUNCATE("t"); private final String code; @@ -345,6 +349,21 @@ public Struct delete(Object before, Struct source, Instant timestamp) { return struct; } + /** + * Generate an {@link Operation#TRUNCATE truncate} message with the given information. + * + * @param source the information about the source where the truncate occurred; never null + * @param timestamp the timestamp for this message; never null + * @return the truncate message; never null + */ + public Struct truncate(Struct source, Instant timestamp) { + Struct struct = new Struct(schema); + struct.put(FieldName.OPERATION, Operation.TRUNCATE.code()); + struct.put(FieldName.SOURCE, source); + struct.put(FieldName.TIMESTAMP, timestamp.toEpochMilli()); + return struct; + } + /** * Obtain the operation for the given source record. * diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java b/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java index 11eb1b790..4ad92a795 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java @@ -53,6 +53,9 @@ public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) th case DELETE: emitDeleteRecord(receiver, tableSchema); break; + case TRUNCATE: + emitTruncateRecord(receiver, tableSchema); + break; default: throw new IllegalArgumentException("Unsupported operation: " + operation); } @@ -138,6 +141,10 @@ protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) thro receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), null); } + protected void emitTruncateRecord(Receiver receiver, TableSchema schema) throws InterruptedException { + throw new UnsupportedOperationException("TRUNCATE not supported"); + } + /** * Returns the operation done by the represented change. */ diff --git a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java index 7797f27b7..3a279a3e9 100644 --- a/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java +++ b/debezium-core/src/test/java/io/debezium/data/VerifyRecord.java @@ -291,6 +291,22 @@ public static void isValidDelete(SourceRecord record, String pkField, int pk) { isValidDelete(record, true); } + /** + * Verify that the given {@link SourceRecord} is a {@link Operation#TRUNCATE TRUNCATE} record. + * + * @param record the source record; may not be null + */ + public static void isValidTruncate(SourceRecord record) { + assertThat(record.key()).isNull(); + + assertThat(record.valueSchema()).isNotNull(); + Struct value = (Struct) record.value(); + assertThat(value).isNotNull(); + assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.TRUNCATE.code()); + assertThat(value.get(FieldName.BEFORE)).isNull(); + assertThat(value.get(FieldName.AFTER)).isNull(); + } + /** * Verify that the given {@link SourceRecord} is a valid tombstone, meaning it has a valid non-null key with key schema * but null value and value schema.