diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeRecordEmitter.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeRecordEmitter.java index b22404aae..abb77eee3 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeRecordEmitter.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeRecordEmitter.java @@ -7,10 +7,13 @@ import java.io.Serializable; +import org.apache.kafka.connect.data.Struct; + import io.debezium.data.Envelope; import io.debezium.data.Envelope.Operation; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.RelationalChangeRecordEmitter; +import io.debezium.relational.TableSchema; import io.debezium.util.Clock; /** @@ -53,4 +56,10 @@ protected Object[] getOldColumnValues() { protected Object[] getNewColumnValues() { return after != null ? after : null; } + + @Override + protected void emitTruncateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { + Struct envelope = tableSchema.getEnvelopeSchema().truncate(getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); + receiver.changeRecord(getPartition(), tableSchema, Operation.TRUNCATE, null, envelope, getOffset(), null); + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java index a6edc9edf..e3cf044b4 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java @@ -40,6 +40,7 @@ import io.debezium.relational.ddl.DdlParserListener.TableIndexCreatedEvent; import io.debezium.relational.ddl.DdlParserListener.TableIndexDroppedEvent; import io.debezium.relational.ddl.DdlParserListener.TableIndexEvent; +import io.debezium.relational.ddl.DdlParserListener.TableTruncatedEvent; import io.debezium.schema.SchemaChangeEvent; import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.spi.topic.TopicNamingStrategy; @@ -259,6 +260,10 @@ else if (event instanceof TableDroppedEvent) { emitChangeEvent(partition, offset, schemaChangeEvents, sanitizedDbName, event, tableId, SchemaChangeEventType.DROP, snapshot); } + else if (event instanceof TableTruncatedEvent) { + emitChangeEvent(partition, offset, schemaChangeEvents, sanitizedDbName, event, tableId, + SchemaChangeEventType.TRUNCATE, snapshot); + } else if (event instanceof SetVariableEvent) { // SET statement with multiple variable emits event for each variable. We want to emit only // one change event diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 88d571d5e..11ce89a26 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -78,6 +78,7 @@ import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; import io.debezium.schema.SchemaChangeEvent; +import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.time.Conversions; import io.debezium.util.Clock; import io.debezium.util.Metronome; @@ -595,6 +596,11 @@ protected void handleQueryEvent(MySqlPartition partition, MySqlOffsetContext off } final TableId tableId = schemaChangeEvent.getTables().isEmpty() ? null : schemaChangeEvent.getTables().iterator().next().id(); + if (tableId != null && !connectorConfig.getSkippedOperations().contains(Operation.TRUNCATE) + && schemaChangeEvent.getType().equals(SchemaChangeEventType.TRUNCATE)) { + eventDispatcher.dispatchDataChangeEvent(partition, tableId, + new MySqlChangeRecordEmitter(partition, offsetContext, clock, Operation.TRUNCATE, null, null)); + } eventDispatcher.dispatchSchemaChangeEvent(partition, tableId, (receiver) -> { try { receiver.schemaChangeEvent(schemaChangeEvent); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 34ff2f506..067746f13 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -2506,6 +2506,39 @@ public void shouldNotSendTombstonesWhenNotSupportedByHandler() throws Exception stopConnector(); } + @Test + @FixFor("DBZ-5610") + public void shouldEmitTruncateOperation() throws Exception { + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, SnapshotLockingMode.NONE) + .with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "none") + .build(); + + start(MySqlConnector.class, config); + waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName()); + + try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) { + try (JdbcConnection connection = db.connect()) { + connection.execute("insert into orders values(1000, '2022-10-09', 1002, 90, 106)"); + connection.execute("truncate table orders;"); + } + } + + SourceRecords records = consumeRecordsByTopic(2); + List changeEvents = records.recordsForTopic(DATABASE.topicForTable("orders")); + Struct truncateStruct = (Struct) changeEvents.get(1).value(); + + assertInsert(changeEvents.get(0), "order_number", 1000); + assertThat(truncateStruct.get("before")).isNull(); + assertThat(truncateStruct.get("after")).isNull(); + assertThat(truncateStruct.get("op")).isEqualTo("t"); + assertThat(changeEvents.size()).isEqualTo(2); + + stopConnector(); + } + private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer { protected BlockingQueue recordQueue; diff --git a/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java b/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java index a71237746..0c97ed066 100644 --- a/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java +++ b/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java @@ -133,14 +133,15 @@ public String toString() { /** * Type describing the content of the event. - * CREATE, ALTER, DROP - corresponds to table operations + * CREATE, ALTER, DROP, TRUNCATE - corresponds to table operations * DATABASE - an event common to the database, like CREATE/DROP DATABASE or SET... */ public static enum SchemaChangeEventType { CREATE, ALTER, DROP, - DATABASE; + TRUNCATE, + DATABASE } /** diff --git a/documentation/modules/ROOT/pages/connectors/cassandra.adoc b/documentation/modules/ROOT/pages/connectors/cassandra.adoc index a7ed7a948..da5d4bb76 100644 --- a/documentation/modules/ROOT/pages/connectors/cassandra.adoc +++ b/documentation/modules/ROOT/pages/connectors/cassandra.adoc @@ -1054,10 +1054,10 @@ refreshing the cached Cassandra table schemas. See xref:{link-avro-serialization}#avro-naming[Avro naming] for more details. |[[cassandra-property-skipped-operations]]<> -|No default -| comma-separated list of operation types that will be skipped during streaming. -The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes. -By default, no operations are skipped. +|`t` +|A comma-separated list of operation types that will be skipped during streaming. +The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations. +By default, truncate operations are skipped. |[[cassandra-property-topic-naming-strategy]]<> |`io.debezium.schema.SchemaTopicNamingStrategy` diff --git a/documentation/modules/ROOT/pages/connectors/db2.adoc b/documentation/modules/ROOT/pages/connectors/db2.adoc index a4746a2fa..748504126 100644 --- a/documentation/modules/ROOT/pages/connectors/db2.adoc +++ b/documentation/modules/ROOT/pages/connectors/db2.adoc @@ -2383,10 +2383,10 @@ In the resulting snapshot, the connector includes only the records for which `de |Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify `true` if you want the connector to do this. See xref:{link-db2-connector}#db2-transaction-metadata[Transaction metadata] for details. |[[db2-property-skipped-operations]]<> -|No default -| comma-separated list of operation types that will be skipped during streaming. -The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes. -By default, no operations are skipped. +|`t` +|A comma-separated list of operation types that will be skipped during streaming. +The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations. +By default, truncate operations are skipped. |[[db2-property-signal-data-collection]]<> |No default diff --git a/documentation/modules/ROOT/pages/connectors/mongodb.adoc b/documentation/modules/ROOT/pages/connectors/mongodb.adoc index 348acfe1b..438b27a29 100644 --- a/documentation/modules/ROOT/pages/connectors/mongodb.adoc +++ b/documentation/modules/ROOT/pages/connectors/mongodb.adoc @@ -1520,10 +1520,10 @@ See xref:{link-avro-serialization}#avro-naming[Avro naming] for more details. endif::community[] |[[mongodb-property-skipped-operations]]<> -|No default -| comma-separated list of operation types that will be skipped during streaming. -The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes. -By default, no operations are skipped. +|`t` +|A comma-separated list of operation types that will be skipped during streaming. +The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations. +By default, truncate operations are skipped. |[[mongodb-property-snapshot-collection-filter-overrides]]<> |No default diff --git a/documentation/modules/ROOT/pages/connectors/mysql.adoc b/documentation/modules/ROOT/pages/connectors/mysql.adoc index c33c89cdb..8f5759e38 100644 --- a/documentation/modules/ROOT/pages/connectors/mysql.adoc +++ b/documentation/modules/ROOT/pages/connectors/mysql.adoc @@ -908,6 +908,7 @@ The value portion of a change event for a change to this table is described for: * <> * <> * <> +* <> // Type: continue [id="mysql-create-events"] @@ -1370,6 +1371,81 @@ MySQL connector events are designed to work with link:{link-kafka-docs}/#compact === Tombstone events When a row is deleted, the _delete_ event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be `null`. To make this possible, after {prodname}’s MySQL connector emits a _delete_ event, the connector emits a special tombstone event that has the same key but a `null` value. +// Type: continue +[id="mysql-truncate-events"] +=== _truncate_ events +A _truncate_ change event signals that a table has been truncated. +The message key is `null` in this case, the message value looks like this: + +[source,json,indent=0,subs="+attributes"] +---- +{ + "schema": { ... }, + "payload": { + "source": { // <1> + "version": "{debezium-version}", + "name": "mysql-server-1", + "connector": "mysql", + "name": "mysql-server-1", + "ts_ms": 1465581029100, + "snapshot": false, + "db": "inventory", + "table": "customers", + "server_id": 223344, + "gtid": null, + "file": "mysql-bin.000003", + "pos": 484, + "row": 0, + "thread": 7, + "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004" + }, + "op": "t", // <2> + "ts_ms": 1465581029523 // <3> + } +} +---- + +.Descriptions of _truncate_ event value fields +[cols="1,2,7",options="header"] +|=== +|Item |Field name |Description + +|1 +|`source` +a|Mandatory field that describes the source metadata for the event. In a _truncate_ event value, the `source` field structure is the same as for _create_, _update_, and _delete_ events for the same table, provides this metadata: + +* {prodname} version +* Connector type and name +* Binlog name where the event was recorded +* Binlog position +* Row within the event +* If the event was part of a snapshot +* Name of the database and table +* ID of the MySQL thread that truncated the event (non-snapshot only) +* MySQL server ID (if available) +* Timestamp for when the change was made in the database + +|2 +|`op` +a|Mandatory string that describes the type of operation. The `op` field value is `t`, signifying that this table was truncated. + +|3 +|`ts_ms` +a|Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. + ++ +In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}. + +|=== + +In case a single `TRUNCATE` statement applies to multiple tables, +one _truncate_ change event record for each truncated table will be emitted. + +Note that since _truncate_ events represent a change made to an entire table and don't have a message key, +unless you're working with topics with a single partition, +there are no ordering guarantees for the change events pertaining to a table (_create_, _update_, etc.) and _truncate_ events for that table. +For instance a consumer may receive an _update_ event only after a _truncate_ event for that table, +when those events are read from different partitions. + // Type: reference // ModuleID: how-debezium-mysql-connectors-map-data-types // Title: How {prodname} MySQL connectors map data types @@ -2993,8 +3069,10 @@ endif::community[] |Indicates whether field names are sanitized to adhere to xref:{link-avro-serialization}#avro-naming[Avro naming requirements]. |[[mysql-property-skipped-operations]]<> -|No default -|Comma-separated list of operation types to skip during streaming. The following values are possible: `c` for inserts/create, `u` for updates, `d` for deletes. By default, no operations are skipped. +|`t` +|A comma-separated list of operation types that will be skipped during streaming. +The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations. +By default, truncate operations are skipped. |[[mysql-property-signal-data-collection]]<> |No default value diff --git a/documentation/modules/ROOT/pages/connectors/oracle.adoc b/documentation/modules/ROOT/pages/connectors/oracle.adoc index 488fdc85c..8ff53e0d3 100644 --- a/documentation/modules/ROOT/pages/connectors/oracle.adoc +++ b/documentation/modules/ROOT/pages/connectors/oracle.adoc @@ -3174,7 +3174,7 @@ If you supply a raw JDBC URL for the database by using the xref:oracle-property- |[[oracle-property-skipped-operations]]<> -|No default +|`t` |A comma-separated list of the operation types that you want the connector to skip during streaming. You can configure the connector to skip the following types of operations: diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index a6a32de05..e05698462 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -2550,10 +2550,10 @@ See xref:{link-avro-serialization}#avro-naming[Avro naming] for more details. |The number of milli-seconds to wait before restarting a connector after a retriable error occurs. |[[sqlserver-property-skipped-operations]]<> -|No default -| comma-separated list of operation types that will be skipped during streaming. -The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes. -By default, no operations are skipped. +|`t` +|A comma-separated list of operation types that will be skipped during streaming. +The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations. +By default, truncate operations are skipped. |[[sqlserver-property-signal-data-collection]]<> |No default value diff --git a/documentation/modules/ROOT/pages/connectors/vitess.adoc b/documentation/modules/ROOT/pages/connectors/vitess.adoc index a85b2fd4d..b324d15e5 100644 --- a/documentation/modules/ROOT/pages/connectors/vitess.adoc +++ b/documentation/modules/ROOT/pages/connectors/vitess.adoc @@ -1370,10 +1370,10 @@ For example, if you set `max.queue.size=1000`, and `max.queue.size.in.bytes=5000 |Indicates whether field names are sanitized to adhere to xref:{link-avro-serialization}#avro-naming[Avro naming requirements]. |[[vitess-property-skipped-operations]]<> -|No default -| comma-separated list of operation types that will be skipped during streaming. -The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes. -By default, no operations are skipped. +|`t` +|A comma-separated list of operation types that will be skipped during streaming. +The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations. +By default, truncate operations are skipped. |[[vitess-property-provide-transaction-metadata]]<> |`false`