diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java index 58ed1ff8f..d1b09a108 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java @@ -219,7 +219,13 @@ private TableDescriptor alterTableIfNeeded(TableId tableId, SinkRecordDescriptor } private void write(TableDescriptor table, SinkRecordDescriptor record) throws SQLException { - if (!record.isDelete()) { + if (record.isDelete()) { + writeDelete(dialect.getDeleteStatement(table, record), record); + } + else if (record.isTruncate()) { + writeTruncate(dialect.getTruncateStatement(table), record); + } + else { switch (config.getInsertMode()) { case INSERT: writeInsert(dialect.getInsertStatement(table, record), record); @@ -235,9 +241,6 @@ private void write(TableDescriptor table, SinkRecordDescriptor record) throws SQ break; } } - else { - writeDelete(dialect.getDeleteStatement(table, record), record); - } } private void writeInsert(String sql, SinkRecordDescriptor record) throws SQLException { @@ -315,6 +318,25 @@ private void writeDelete(String sql, SinkRecordDescriptor record) throws SQLExce } } + private void writeTruncate(String sql, SinkRecordDescriptor record) throws SQLException { + if (!config.isTruncateEnabled()) { + LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", record.getTopicName()); + return; + } + final Transaction transaction = session.beginTransaction(); + try { + LOGGER.trace("SQL: {}", sql); + final NativeQuery query = session.createNativeQuery(sql); + + query.executeUpdate(); + transaction.commit(); + } + catch (Exception e) { + transaction.rollback(); + throw e; + } + } + private int bindKeyValuesToQuery(SinkRecordDescriptor record, NativeQuery query, int index) { if (Objects.requireNonNull(config.getPrimaryKeyMode()) == JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA) { diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java index 66aa214ca..cb2a26318 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java @@ -51,6 +51,7 @@ public class JdbcSinkConnectorConfig { public static final String INSERT_MODE = "insert.mode"; public static final String DELETE_ENABLED = "delete.enabled"; + public static final String TRUNCATE_ENABLED = "truncate.enabled"; public static final String TABLE_NAME_FORMAT = "table.name.format"; public static final String PRIMARY_KEY_MODE = "primary.key.mode"; public static final String PRIMARY_KEY_FIELDS = "primary.key.fields"; @@ -150,6 +151,16 @@ public class JdbcSinkConnectorConfig { .withValidation(JdbcSinkConnectorConfig::validateDeleteEnabled) .withDescription("Whether to treat `null` record values as deletes. Requires primary.key.mode to be `record.key`."); + public static final Field TRUNCATE_ENABLED_FIELD = Field.create(TRUNCATE_ENABLED) + .withDisplayName("Controls whether records can be truncated by the connector") + .withType(Type.BOOLEAN) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2)) + .withWidth(ConfigDef.Width.SHORT) + .withImportance(ConfigDef.Importance.LOW) + .withDefault(false) + .withValidation(JdbcSinkConnectorConfig::validateDeleteEnabled) + .withDescription("Whether to process debezium event `t` as truncate statement."); + public static final Field TABLE_NAME_FORMAT_FIELD = Field.create(TABLE_NAME_FORMAT) .withDisplayName("A format string for the table") .withType(Type.STRING) @@ -262,6 +273,7 @@ public class JdbcSinkConnectorConfig { CONNECTION_POOL_TIMEOUT_FIELD, INSERT_MODE_FIELD, DELETE_ENABLED_FIELD, + TRUNCATE_ENABLED_FIELD, TABLE_NAME_FORMAT_FIELD, PRIMARY_KEY_MODE_FIELD, PRIMARY_KEY_FIELDS_FIELD, @@ -432,6 +444,7 @@ public String getValue() { private final InsertMode insertMode; private final boolean deleteEnabled; + private final boolean truncateEnabled; private final String tableNameFormat; private final PrimaryKeyMode primaryKeyMode; private final Set primaryKeyFields; @@ -449,6 +462,7 @@ public JdbcSinkConnectorConfig(Map props) { config = Configuration.from(props); this.insertMode = InsertMode.parse(config.getString(INSERT_MODE)); this.deleteEnabled = config.getBoolean(DELETE_ENABLED_FIELD); + this.truncateEnabled = config.getBoolean(TRUNCATE_ENABLED_FIELD); this.tableNameFormat = config.getString(TABLE_NAME_FORMAT_FIELD); this.primaryKeyMode = PrimaryKeyMode.parse(config.getString(PRIMARY_KEY_MODE_FIELD)); this.primaryKeyFields = Strings.setOf(config.getString(PRIMARY_KEY_FIELDS_FIELD), String::new); @@ -491,6 +505,10 @@ public boolean isDeleteEnabled() { return deleteEnabled; } + public boolean isTruncateEnabled() { + return truncateEnabled; + } + public String getTableNameFormat() { return tableNameFormat; } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java index 95e8ed021..f968a2cde 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/SinkRecordDescriptor.java @@ -100,6 +100,14 @@ else if (record.value() != null) { return false; } + public boolean isTruncate() { + if (isDebeziumSinkRecord()) { + final Struct value = (Struct) record.value(); + return Operation.TRUNCATE.equals(Operation.forCode(value.getString(Envelope.FieldName.OPERATION))); + } + return false; + } + public Struct getKeyStruct(PrimaryKeyMode primaryKeyMode) { if (!getKeyFieldNames().isEmpty()) { switch (primaryKeyMode) { @@ -281,9 +289,12 @@ public SinkRecordDescriptor build() { Objects.requireNonNull(primaryKeyMode, "The primary key mode must be provided."); Objects.requireNonNull(sinkRecord, "The sink record must be provided."); - final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord); - readSinkRecordKeyData(sinkRecord, flattened); - readSinkRecordNonKeyData(sinkRecord, flattened); + boolean flattened = false; + if (!isTruncateEvent(sinkRecord)) { + flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord); + readSinkRecordKeyData(sinkRecord, flattened); + readSinkRecordNonKeyData(sinkRecord, flattened); + } return new SinkRecordDescriptor(sinkRecord, sinkRecord.topic(), keyFieldNames, nonKeyFieldNames, allFields, flattened); } @@ -297,6 +308,14 @@ private boolean isTombstone(SinkRecord record) { return record.value() == null && record.valueSchema() == null; } + private boolean isTruncateEvent(SinkRecord record) { + if (!isFlattened(record)) { + final Struct value = (Struct) record.value(); + return Operation.TRUNCATE.equals(Operation.forCode(value.getString(Envelope.FieldName.OPERATION))); + } + return false; + } + private void readSinkRecordKeyData(SinkRecord record, boolean flattened) { switch (primaryKeyMode) { case NONE: diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java index 6d7652a82..7cb1c5699 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java @@ -167,6 +167,14 @@ public interface DatabaseDialect { */ String getDeleteStatement(TableDescriptor table, SinkRecordDescriptor record); + /** + * Construct a {@code TRUNCATE} statement specific for this dialect. + * + * @param table the current relational table model, should not be {@code null} + * @return the truncate SQL statement to be executed, never {@code null} + */ + String getTruncateStatement(TableDescriptor table); + /** * Returns the SQL binding fragment for a column, schema, and type mapping. * diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index b11fc309d..00a553d63 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -397,6 +397,15 @@ public String getDeleteStatement(TableDescriptor table, SinkRecordDescriptor rec return builder.build(); } + @Override + public String getTruncateStatement(TableDescriptor table) { + final SqlStatementBuilder builder = new SqlStatementBuilder(); + builder.append("TRUNCATE TABLE "); + builder.append(getQualifiedTableName(table.getId())); + + return builder.build(); + } + @Override public String getQueryBindingWithValueCast(ColumnDescriptor column, Schema schema, Type type) { return "?";