DBZ-2382 Support emitting TRUNCATE events in PG11+ PGOUTPUT plugin

Default: TruncateHandlingMode.SKIP
Supported values: TruncateHandlingMode.SKIP, TruncateHandlingMode.INCLUDE
Naveen Kumar KR 2021-01-13 20:33:59 +05:30 committed by Gunnar Morling
parent ffd5e1ee2b
commit 75882f35ac
13 changed files with 369 additions and 28 deletions
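For context, enabling the new behavior is a single connector property. A minimal sketch using the io.debezium.config.Configuration builder; the property names plugin.name and truncate.handling.mode come from this commit, while the connection and table values are placeholders:

import io.debezium.config.Configuration;

Configuration config = Configuration.create()
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("plugin.name", "pgoutput")              // TRUNCATE capture is only supported by pgoutput on PG11+
        .with("truncate.handling.mode", "include")    // default is "skip"
        .with("database.hostname", "localhost")       // placeholder
        .with("database.dbname", "postgres")          // placeholder
        .with("table.include.list", "public.test_table") // placeholder
        .build();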

View File

@ -17,6 +17,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
@ -77,6 +78,8 @@ protected Operation getOperation() {
return Operation.UPDATE;
case DELETE:
return Operation.DELETE;
case TRUNCATE:
return Operation.TRUNCATE;
default:
throw new IllegalArgumentException("Received event of unexpected command type: " + message.getOperation());
}
@ -88,6 +91,12 @@ public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) th
super.emitChangeRecords(schema, receiver);
}
@Override
protected void emitTruncateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException {
Struct envelope = tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(tableSchema, Operation.TRUNCATE, null, envelope, getOffset(), null);
}
@Override
protected Object[] getOldColumnValues() {
try {

View File

@ -10,6 +10,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -359,6 +360,11 @@ public MessageDecoder messageDecoder(MessageDecoderConfig config) {
public String getPostgresPluginName() {
return getValue();
}
@Override
public boolean supportsTruncate() {
return true;
}
},
DECODERBUFS("decoderbufs") {
@Override
@ -370,6 +376,11 @@ public MessageDecoder messageDecoder(MessageDecoderConfig config) {
public String getPostgresPluginName() {
return getValue();
}
@Override
public boolean supportsTruncate() {
return false;
}
},
WAL2JSON_STREAMING("wal2json_streaming") {
@Override
@ -382,6 +393,11 @@ public String getPostgresPluginName() {
return "wal2json";
}
@Override
public boolean supportsTruncate() {
return false;
}
@Override
public boolean hasUnchangedToastColumnMarker() {
return false;
@ -408,6 +424,11 @@ public String getPostgresPluginName() {
return "wal2json";
}
@Override
public boolean supportsTruncate() {
return false;
}
@Override
public boolean hasUnchangedToastColumnMarker() {
return false;
@ -429,6 +450,11 @@ public String getPostgresPluginName() {
return "wal2json";
}
@Override
public boolean supportsTruncate() {
return false;
}
@Override
public boolean hasUnchangedToastColumnMarker() {
return false;
@ -455,6 +481,11 @@ public String getPostgresPluginName() {
return "wal2json";
}
@Override
public boolean supportsTruncate() {
return false;
}
@Override
public boolean hasUnchangedToastColumnMarker() {
return false;
@ -496,6 +527,48 @@ public String getValue() {
}
public abstract String getPostgresPluginName();
public abstract boolean supportsTruncate();
}
/**
* The set of predefined TruncateHandlingMode options or aliases
*/
public enum TruncateHandlingMode implements EnumeratedValue {
/**
* Skip TRUNCATE messages
*/
SKIP("skip"),
/**
* Handle and include TRUNCATE messages
*/
INCLUDE("include");
private final String value;
TruncateHandlingMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
public static TruncateHandlingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (TruncateHandlingMode truncateHandlingMode : TruncateHandlingMode.values()) {
if (truncateHandlingMode.getValue().equalsIgnoreCase(value)) {
return truncateHandlingMode;
}
}
return null;
}
}
/**
@ -792,6 +865,16 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"When 'snapshot.mode' is set as custom, this setting must be set to specify a fully qualified class name to load (via the default class loader)."
+ "This class must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot and how to build queries.");
public static final Field TRUNCATE_HANDLING_MODE = Field.create("truncate.handling.mode")
.withDisplayName("Truncate handling mode")
.withEnum(TruncateHandlingMode.class, TruncateHandlingMode.SKIP)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation(PostgresConnectorConfig::validateTruncateHandlingMode)
.withDescription("Specify how TRUNCATE operations are handled for change events (supported only on pg11+ pgoutput plugin), including: " +
"'skip' to skip / ignore TRUNCATE events (default), " +
"'include' to handle and include TRUNCATE events");
public static final Field HSTORE_HANDLING_MODE = Field.create("hstore.handling.mode")
.withDisplayName("HStore Handling")
.withEnum(HStoreHandlingMode.class, HStoreHandlingMode.JSON)
@ -875,6 +958,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"the original value is a toasted value not provided by the database. " +
"If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets.");
private final TruncateHandlingMode truncateHandlingMode;
private final HStoreHandlingMode hStoreHandlingMode;
private final IntervalHandlingMode intervalHandlingMode;
private final SnapshotMode snapshotMode;
@ -888,6 +972,7 @@ public PostgresConnectorConfig(Configuration config) {
x -> x.schema() + "." + x.table(),
DEFAULT_SNAPSHOT_FETCH_SIZE);
this.truncateHandlingMode = TruncateHandlingMode.parse(config.getString(PostgresConnectorConfig.TRUNCATE_HANDLING_MODE));
String hstoreHandlingModeStr = config.getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE);
this.hStoreHandlingMode = HStoreHandlingMode.parse(hstoreHandlingModeStr);
this.intervalHandlingMode = IntervalHandlingMode.parse(config.getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE));
@ -947,6 +1032,10 @@ protected Duration statusUpdateInterval() {
return Duration.ofMillis(getConfig().getLong(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS));
}
protected TruncateHandlingMode truncateHandlingMode() {
return truncateHandlingMode;
}
protected HStoreHandlingMode hStoreHandlingMode() {
return hStoreHandlingMode;
}
@ -1059,6 +1148,32 @@ private static int validateReplicationSlotName(Configuration config, Field field
return errors;
}
private static int validateTruncateHandlingMode(Configuration config, Field field, Field.ValidationOutput problems) {
final String value = config.getString(field);
int errors = 0;
if (value != null) {
TruncateHandlingMode truncateHandlingMode = TruncateHandlingMode.parse(value);
if (truncateHandlingMode == null) {
List<String> validModes = Arrays.stream(TruncateHandlingMode.values()).map(TruncateHandlingMode::getValue).collect(Collectors.toList());
String message = String.format("Valid values for %s are %s, but got '%s'", field.name(), validModes, value);
problems.accept(field, value, message);
errors++;
return errors;
}
if (truncateHandlingMode == TruncateHandlingMode.INCLUDE) {
LogicalDecoder logicalDecoder = config.getInstance(PLUGIN_NAME, LogicalDecoder.class);
if (!logicalDecoder.supportsTruncate()) {
String message = String.format(
"%s '%s' is not supported with configuration %s '%s'",
field.name(), truncateHandlingMode.getValue(), PLUGIN_NAME.name(), logicalDecoder.getValue());
problems.accept(field, value, message);
errors++;
}
}
}
return errors;
}
@Override
public String getContextName() {
return Module.contextName();

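As a quick illustration of the parse and validation semantics added above (a hedged sketch; the expected outcomes follow directly from the code in this hunk, they are not captured test output):

// parse() trims the input and matches case-insensitively.
PostgresConnectorConfig.TruncateHandlingMode mode = PostgresConnectorConfig.TruncateHandlingMode.parse(" Include ");
assert mode == PostgresConnectorConfig.TruncateHandlingMode.INCLUDE;

// Unknown values yield null, which validateTruncateHandlingMode() reports as a configuration error.
assert PostgresConnectorConfig.TruncateHandlingMode.parse("drop") == null;

// 'include' combined with a decoder whose supportsTruncate() returns false (every plugin
// except pgoutput in this commit) is likewise rejected by validateTruncateHandlingMode().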
View File

@ -110,6 +110,7 @@ protected ReplicationConnection createReplicationConnection(boolean exportSnapsh
.withTableFilter(config.getTableFilters())
.withPublicationAutocreateMode(config.publicationAutocreateMode())
.withPlugin(config.plugin())
.withTruncateHandlingMode(config.truncateHandlingMode())
.dropSlotOnClose(dropSlotOnStop)
.streamParams(config.streamParams())
.statusUpdateInterval(config.statusUpdateInterval())

View File

@ -6,6 +6,7 @@
package io.debezium.connector.postgresql.connection;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
/**
@ -20,13 +21,16 @@ public class MessageDecoderConfig {
private final String publicationName;
private final boolean exportedSnapshot;
private final boolean doSnapshot;
private final PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode;
public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName, boolean exportedSnapshot, boolean doSnapshot) {
public MessageDecoderConfig(Configuration configuration, PostgresSchema schema, String publicationName, boolean exportedSnapshot, boolean doSnapshot,
PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode) {
this.configuration = configuration;
this.schema = schema;
this.publicationName = publicationName;
this.exportedSnapshot = exportedSnapshot;
this.doSnapshot = doSnapshot;
this.truncateHandlingMode = truncateHandlingMode;
}
public Configuration getConfiguration() {
@ -48,4 +52,9 @@ public boolean exportedSnapshot() {
public boolean doSnapshot() {
return doSnapshot;
}
public PostgresConnectorConfig.TruncateHandlingMode getTruncateHandlingMode() {
return truncateHandlingMode;
}
}

View File

@ -83,6 +83,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
* @param tableFilter the tables to watch of the DB publication for logical replication; may not be null
* @param publicationAutocreateMode the mode for publication autocreation; may not be null
* @param plugin decoder matching the server side plug-in used for streaming changes; may not be null
* @param truncateHandlingMode the mode for truncate handling; may not be null
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed
* @param statusUpdateInterval the interval at which the replication connection should periodically send status
* @param exportSnapshot whether the replication should export a snapshot when created
@ -99,6 +100,7 @@ private PostgresReplicationConnection(Configuration config,
RelationalTableFilters tableFilter,
PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode,
PostgresConnectorConfig.LogicalDecoder plugin,
PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode,
boolean dropSlotOnClose,
boolean exportSnapshot,
boolean doSnapshot,
@ -117,7 +119,7 @@ private PostgresReplicationConnection(Configuration config,
this.dropSlotOnClose = dropSlotOnClose;
this.statusUpdateInterval = statusUpdateInterval;
this.exportSnapshot = exportSnapshot;
this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName, exportSnapshot, doSnapshot));
this.messageDecoder = plugin.messageDecoder(new MessageDecoderConfig(config, schema, publicationName, exportSnapshot, doSnapshot, truncateHandlingMode));
this.typeRegistry = typeRegistry;
this.streamParams = streamParams;
this.slotCreationInfo = null;
@ -626,6 +628,7 @@ protected static class ReplicationConnectionBuilder implements Builder {
private RelationalTableFilters tableFilter;
private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
private PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode;
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
private Duration statusUpdateIntervalVal;
private boolean exportSnapshot = DEFAULT_EXPORT_SNAPSHOT;
@ -674,6 +677,13 @@ public ReplicationConnectionBuilder withPlugin(final PostgresConnectorConfig.Log
return this;
}
@Override
public Builder withTruncateHandlingMode(PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode) {
assert truncateHandlingMode != null;
this.truncateHandlingMode = truncateHandlingMode;
return this;
}
@Override
public ReplicationConnectionBuilder dropSlotOnClose(final boolean dropSlotOnClose) {
this.dropSlotOnClose = dropSlotOnClose;
@ -719,8 +729,8 @@ public Builder doSnapshot(boolean doSnapshot) {
@Override
public ReplicationConnection build() {
assert plugin != null : "Decoding plugin name is not set";
return new PostgresReplicationConnection(config, slotName, publicationName, tableFilter, publicationAutocreateMode, plugin, dropSlotOnClose, exportSnapshot,
doSnapshot, statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema);
return new PostgresReplicationConnection(config, slotName, publicationName, tableFilter, publicationAutocreateMode, plugin, truncateHandlingMode,
dropSlotOnClose, exportSnapshot, doSnapshot, statusUpdateIntervalVal, typeRegistry, slotStreamParams, schema);
}
@Override

View File

@ -152,6 +152,15 @@ interface Builder {
*/
Builder withPlugin(final PostgresConnectorConfig.LogicalDecoder plugin);
/**
* Sets the truncate handling mode.
*
* @param truncateHandlingMode the truncate handling mode; may not be null
* @return this instance
* @see io.debezium.connector.postgresql.PostgresConnectorConfig.TruncateHandlingMode
*/
Builder withTruncateHandlingMode(final PostgresConnectorConfig.TruncateHandlingMode truncateHandlingMode);
/**
* Whether or not to drop the replication slot once the replication connection closes
*

View File

@ -43,6 +43,7 @@ public enum Operation {
INSERT,
UPDATE,
DELETE,
TRUNCATE,
BEGIN,
COMMIT,
NOOP

View File

@ -16,6 +16,7 @@
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -27,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.TypeRegistry;
@ -118,23 +120,6 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Ls
LOGGER.trace("Message Type: {}", type);
final boolean candidateForSkipping = super.shouldMessageBeSkipped(buffer, lastReceivedLsn, startLsn, walPosition);
switch (type) {
case TRUNCATE:
// @formatter:off
// For now we plan to gracefully skip TRUNCATE messages.
// We may decide in the future that these may be emitted differently, see DBZ-1052.
//
// As of PG11, the Truncate message format is as described:
// Byte Message Type (Always 'T')
// Int32 number of relations described by the truncate message
// Int8 flags for truncate; 1=CASCADE, 2=RESTART IDENTITY
// Int32[] Array of number of relation ids
//
// In short this message tells us how many relations are impacted by the truncate
// call, whether its cascaded or not and then all table relation ids involved.
// It seems the protocol guarantees to send the most up-to-date `R` relation
// messages for the tables prior to the `T` truncation message, even if in the
// same session a `R` message was followed by an insert/update/delete message.
// @formatter:on
case COMMIT:
case BEGIN:
case RELATION:
@ -150,7 +135,7 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Lsn lastReceivedLsn, Ls
LOGGER.trace("{} messages are always reprocessed", type);
return false;
default:
// INSERT/UPDATE/DELETE/TYPE/ORIGIN
// INSERT/UPDATE/DELETE/TRUNCATE/TYPE/ORIGIN
// These should be excluded based on the normal behavior, delegating to default method
return candidateForSkipping;
}
@ -196,6 +181,14 @@ public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcesso
case DELETE:
decodeDelete(buffer, typeRegistry, processor);
break;
case TRUNCATE:
if (config.getTruncateHandlingMode() == PostgresConnectorConfig.TruncateHandlingMode.INCLUDE) {
decodeTruncate(buffer, typeRegistry, processor);
}
else {
LOGGER.trace("Message Type {} skipped, not processed.", messageType);
}
break;
default:
LOGGER.trace("Message Type {} skipped, not processed.", messageType);
break;
@ -467,6 +460,77 @@ private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
}
}
/**
* Callback handler for the 'T' truncate replication stream message.
*
* @param buffer The replication stream buffer
* @param typeRegistry The postgres type registry
* @param processor The replication message processor
*/
private void decodeTruncate(ByteBuffer buffer, TypeRegistry typeRegistry, ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
// As of PG11, the Truncate message format is as described:
// Byte Message Type (Always 'T')
// Int32 number of relations described by the truncate message
// Int8 flags for truncate; 1=CASCADE, 2=RESTART IDENTITY
// Int32[] Array of relation ids
//
// In short this message tells us how many relations are impacted by the truncate
// call, whether it is cascaded or not, and then all table relation ids involved.
// It seems the protocol guarantees to send the most up-to-date `R` relation
// messages for the tables prior to the `T` truncation message, even if in the
// same session an `R` message was followed by an insert/update/delete message.
int numberOfRelations = buffer.getInt();
int optionBits = buffer.get();
// the decoded truncate options are currently ignored / unused
List<String> truncateOptions = getTruncateOptions(optionBits);
int[] relationIds = new int[numberOfRelations];
for (int i = 0; i < numberOfRelations; i++) {
relationIds[i] = buffer.getInt();
}
List<Table> tables = new ArrayList<>();
for (int relationId : relationIds) {
Optional<Table> resolvedTable = resolveRelation(relationId);
resolvedTable.ifPresent(tables::add);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Event: {}, RelationIds: {}, OptionBits: {}", MessageType.TRUNCATE, Arrays.toString(relationIds), optionBits);
}
int noOfResolvedTables = tables.size();
for (int i = 0; i < noOfResolvedTables; i++) {
Table table = tables.get(i);
boolean lastTableInTruncate = (i + 1) == noOfResolvedTables;
processor.process(new PgOutputTruncateReplicationMessage(
Operation.TRUNCATE,
table.id().toDoubleQuotedString(),
commitTimestamp,
transactionId,
lastTableInTruncate));
}
}
/**
* Converts truncate option bits to the corresponding PostgreSQL TRUNCATE options.
*
* @param flag truncate option bits
* @return the list of truncate options, or null if no option bits are set
*/
private List<String> getTruncateOptions(int flag) {
switch (flag) {
case 1:
return Collections.singletonList("CASCADE");
case 2:
return Collections.singletonList("RESTART IDENTITY");
case 3:
return Arrays.asList("RESTART IDENTITY", "CASCADE");
default:
return null;
}
}
/**
* Resolves a given replication message relation identifier to a {@link Table}.
*

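To make the wire format documented in decodeTruncate() above concrete, here is a hedged, stand-alone sketch (using java.nio.ByteBuffer) of the payload for a two-table TRUNCATE ... RESTART IDENTITY CASCADE, read back the same way the decoder reads it; the relation OIDs are invented, and the leading 'T' type byte is assumed to have been consumed already, as in processNotEmptyMessage():

ByteBuffer buffer = ByteBuffer.allocate(13)
        .putInt(2)          // Int32: number of relations affected by the TRUNCATE
        .put((byte) 3)      // Int8: option bits, 1=CASCADE | 2=RESTART IDENTITY (3 = both)
        .putInt(16385)      // Int32: relation OID of the first truncated table (invented)
        .putInt(16391);     // Int32: relation OID of the second truncated table (invented)
buffer.flip();

int numberOfRelations = buffer.getInt();    // 2
int optionBits = buffer.get();              // 3 -> [RESTART IDENTITY, CASCADE]
int[] relationIds = new int[numberOfRelations];
for (int i = 0; i < numberOfRelations; i++) {
    relationIds[i] = buffer.getInt();       // 16385, then 16391
}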
View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.pgoutput;
import java.time.Instant;
public class PgOutputTruncateReplicationMessage extends PgOutputReplicationMessage {
private final boolean lastTableInTruncate;
public PgOutputTruncateReplicationMessage(Operation op, String table, Instant commitTimestamp, long transactionId,
boolean lastTableInTruncate) {
super(op, table, commitTimestamp, transactionId, null, null);
this.lastTableInTruncate = lastTableInTruncate;
}
@Override
public boolean isLastEventForLsn() {
return lastTableInTruncate;
}
}

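The lastTableInTruncate flag exists because one TRUNCATE statement covering several tables is decoded into one message per resolved table (see the decodeTruncate() loop); only the final message answers isLastEventForLsn() with true, so the LSN is not treated as fully processed until every affected table has been emitted. A hedged illustration, with invented table names, timestamp, and transaction id (Operation here is ReplicationMessage.Operation, Instant is java.time.Instant):

Instant commitTimestamp = Instant.now(); // stand-in for the decoded commit timestamp
long transactionId = 42L;                // stand-in for the decoded transaction id

// Conceptually, `TRUNCATE TABLE public.a, public.b;` yields two messages:
PgOutputTruncateReplicationMessage first = new PgOutputTruncateReplicationMessage(
        Operation.TRUNCATE, "\"public\".\"a\"", commitTimestamp, transactionId, false);
PgOutputTruncateReplicationMessage last = new PgOutputTruncateReplicationMessage(
        Operation.TRUNCATE, "\"public\".\"b\"", commitTimestamp, transactionId, true);

assert !first.isLastEventForLsn(); // intermediate tables do not complete the LSN
assert last.isLastEventForLsn();   // the final table does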
View File

@ -19,6 +19,7 @@
import static org.fest.assertions.Fail.fail;
import static org.fest.assertions.MapAssert.entry;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.math.BigDecimal;
import java.time.Instant;
@ -83,7 +84,9 @@
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.ShouldFailWhen;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
@ -1881,9 +1884,11 @@ public void stopInTheMiddleOfTxAndRestart() throws Exception {
}
@Test
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput gracefully skips these messages")
public void shouldGracefullySkipTruncateMessages() throws Exception {
startConnector();
@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 11, reason = "TRUNCATE events only supported in PG11+ PGOUTPUT Plugin")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput handles TRUNCATE messages")
public void shouldProcessTruncateMessages() throws Exception {
startConnector(builder -> builder
.with(PostgresConnectorConfig.TRUNCATE_HANDLING_MODE, PostgresConnectorConfig.TruncateHandlingMode.INCLUDE));
waitForStreamingToStart();
consumer = testConsumer(1);
@ -1893,12 +1898,62 @@ public void shouldGracefullySkipTruncateMessages() throws Exception {
assertEquals(TestHelper.topicName("public.test_table"), record.topic());
VerifyRecord.isValidInsert(record, PK_FIELD, 2);
consumer.expects(0);
TestHelper.execute("TRUNCATE TABLE public.test_table;");
consumer.expects(1);
TestHelper.execute("TRUNCATE TABLE public.test_table RESTART IDENTITY CASCADE;");
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
assertFalse(consumer.isEmpty());
SourceRecord truncateRecord = consumer.remove();
assertNotNull(truncateRecord);
VerifyRecord.isValidTruncate(truncateRecord);
assertTrue(consumer.isEmpty());
}
@Test
@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 11, reason = "TRUNCATE events only supported in PG11+ PGOUTPUT Plugin")
@SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput handles TRUNCATE messages")
public void shouldProcessTruncateMessagesForMultipleTableTruncateStatement() throws Exception {
TestHelper.execute("CREATE TABLE test_table_2 (pk SERIAL, text TEXT, PRIMARY KEY(pk));");
startConnector(builder -> builder.with(PostgresConnectorConfig.TRUNCATE_HANDLING_MODE, PostgresConnectorConfig.TruncateHandlingMode.INCLUDE));
waitForStreamingToStart();
consumer = testConsumer(1);
executeAndWait("INSERT INTO test_table (text) values ('TRUNCATE TEST');");
SourceRecord record = consumer.remove();
assertEquals(TestHelper.topicName("public.test_table"), record.topic());
VerifyRecord.isValidInsert(record, PK_FIELD, 2);
executeAndWait("INSERT INTO test_table_2 (text) values ('TRUNCATE TEST 2');");
SourceRecord record_2 = consumer.remove();
assertEquals(TestHelper.topicName("public.test_table_2"), record_2.topic());
VerifyRecord.isValidInsert(record_2, PK_FIELD, 1);
consumer.expects(2);
TestHelper.execute("TRUNCATE TABLE public.test_table, public.test_table_2;");
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
assertFalse(consumer.isEmpty());
SourceRecord truncateRecord = consumer.remove();
assertNotNull(truncateRecord);
VerifyRecord.isValidTruncate(truncateRecord);
SourceRecord truncateRecord_2 = consumer.remove();
assertNotNull(truncateRecord_2);
VerifyRecord.isValidTruncate(truncateRecord_2);
assertTrue(consumer.isEmpty());
assertEquals(truncateRecord.sourceOffset().get("lsn_commit"), truncateRecord_2.sourceOffset().get("lsn_commit"));
assertEquals(truncateRecord.sourceOffset().get("lsn"), truncateRecord_2.sourceOffset().get("lsn"));
assertEquals(truncateRecord.sourceOffset().get("txId"), truncateRecord_2.sourceOffset().get("txId"));
consumer = testConsumer(1);
executeAndWait("INSERT INTO test_table (text) values ('TRUNCATE TEST');");
}
@Test
@FixFor("DBZ-1413")
public void shouldStreamChangesForDataTypeAlias() throws Exception {

View File

@ -46,7 +46,11 @@ public static enum Operation {
/**
* An operation that resulted in an existing record being removed from or deleted in the source.
*/
DELETE("d");
DELETE("d"),
/**
* An operation that resulted in an existing table being truncated in the source.
*/
TRUNCATE("t");
private final String code;
@ -345,6 +349,21 @@ public Struct delete(Object before, Struct source, Instant timestamp) {
return struct;
}
/**
* Generate an {@link Operation#TRUNCATE truncate} message with the given information.
*
* @param source the information about the source where the truncate occurred; never null
* @param timestamp the timestamp for this message; never null
* @return the truncate message; never null
*/
public Struct truncate(Struct source, Instant timestamp) {
Struct struct = new Struct(schema);
struct.put(FieldName.OPERATION, Operation.TRUNCATE.code());
struct.put(FieldName.SOURCE, source);
struct.put(FieldName.TIMESTAMP, timestamp.toEpochMilli());
return struct;
}
/**
* Obtain the operation for the given source record.
*

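Records produced via Envelope.truncate() have a null key and a value whose op field is 't' with no before/after, so downstream consumers can identify and, if desired, drop them. A hedged consumer-side sketch; the helper name is hypothetical, while the field and operation constants are the ones defined in this class:

// Hypothetical helper for a consumer or single message transform; assumes the usual
// Kafka Connect imports (org.apache.kafka.connect.data.Struct, org.apache.kafka.connect.source.SourceRecord)
// and io.debezium.data.Envelope.
static boolean isTruncateEvent(SourceRecord record) {
    if (!(record.value() instanceof Struct)) {
        return false; // e.g. tombstones
    }
    Struct value = (Struct) record.value();
    if (value.schema().field(Envelope.FieldName.OPERATION) == null) {
        return false; // not a Debezium change-event envelope
    }
    String op = value.getString(Envelope.FieldName.OPERATION);   // the "op" field
    return Envelope.Operation.TRUNCATE.code().equals(op);        // "t", as defined above
}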
View File

@ -53,6 +53,9 @@ public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) th
case DELETE:
emitDeleteRecord(receiver, tableSchema);
break;
case TRUNCATE:
emitTruncateRecord(receiver, tableSchema);
break;
default:
throw new IllegalArgumentException("Unsupported operation: " + operation);
}
@ -138,6 +141,10 @@ protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) thro
receiver.changeRecord(tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), null);
}
protected void emitTruncateRecord(Receiver receiver, TableSchema schema) throws InterruptedException {
throw new UnsupportedOperationException("TRUNCATE not supported");
}
/**
* Returns the operation done by the represented change.
*/

View File

@ -291,6 +291,22 @@ public static void isValidDelete(SourceRecord record, String pkField, int pk) {
isValidDelete(record, true);
}
/**
* Verify that the given {@link SourceRecord} is a {@link Operation#TRUNCATE TRUNCATE} record.
*
* @param record the source record; may not be null
*/
public static void isValidTruncate(SourceRecord record) {
assertThat(record.key()).isNull();
assertThat(record.valueSchema()).isNotNull();
Struct value = (Struct) record.value();
assertThat(value).isNotNull();
assertThat(value.getString(FieldName.OPERATION)).isEqualTo(Operation.TRUNCATE.code());
assertThat(value.get(FieldName.BEFORE)).isNull();
assertThat(value.get(FieldName.AFTER)).isNull();
}
/**
* Verify that the given {@link SourceRecord} is a valid tombstone, meaning it has a valid non-null key with key schema
* but null value and value schema.