DBZ-6970 Debezium jdbc sink process truncate event failure

This commit is contained in:
harveyyue 2023-09-26 13:43:05 +08:00 committed by Fiore Mario Vitale
parent 19bc4acfc0
commit 59c7a0acae
5 changed files with 83 additions and 7 deletions

View File

@ -219,7 +219,13 @@ private TableDescriptor alterTableIfNeeded(TableId tableId, SinkRecordDescriptor
} }
private void write(TableDescriptor table, SinkRecordDescriptor record) throws SQLException { private void write(TableDescriptor table, SinkRecordDescriptor record) throws SQLException {
if (!record.isDelete()) { if (record.isDelete()) {
writeDelete(dialect.getDeleteStatement(table, record), record);
}
else if (record.isTruncate()) {
writeTruncate(dialect.getTruncateStatement(table), record);
}
else {
switch (config.getInsertMode()) { switch (config.getInsertMode()) {
case INSERT: case INSERT:
writeInsert(dialect.getInsertStatement(table, record), record); writeInsert(dialect.getInsertStatement(table, record), record);
@ -235,9 +241,6 @@ private void write(TableDescriptor table, SinkRecordDescriptor record) throws SQ
break; break;
} }
} }
else {
writeDelete(dialect.getDeleteStatement(table, record), record);
}
} }
private void writeInsert(String sql, SinkRecordDescriptor record) throws SQLException { private void writeInsert(String sql, SinkRecordDescriptor record) throws SQLException {
@ -315,6 +318,25 @@ private void writeDelete(String sql, SinkRecordDescriptor record) throws SQLExce
} }
} }
private void writeTruncate(String sql, SinkRecordDescriptor record) throws SQLException {
if (!config.isTruncateEnabled()) {
LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", record.getTopicName());
return;
}
final Transaction transaction = session.beginTransaction();
try {
LOGGER.trace("SQL: {}", sql);
final NativeQuery<?> query = session.createNativeQuery(sql);
query.executeUpdate();
transaction.commit();
}
catch (Exception e) {
transaction.rollback();
throw e;
}
}
private int bindKeyValuesToQuery(SinkRecordDescriptor record, NativeQuery<?> query, int index) { private int bindKeyValuesToQuery(SinkRecordDescriptor record, NativeQuery<?> query, int index) {
if (Objects.requireNonNull(config.getPrimaryKeyMode()) == JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA) { if (Objects.requireNonNull(config.getPrimaryKeyMode()) == JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA) {

View File

@ -51,6 +51,7 @@ public class JdbcSinkConnectorConfig {
public static final String INSERT_MODE = "insert.mode"; public static final String INSERT_MODE = "insert.mode";
public static final String DELETE_ENABLED = "delete.enabled"; public static final String DELETE_ENABLED = "delete.enabled";
public static final String TRUNCATE_ENABLED = "truncate.enabled";
public static final String TABLE_NAME_FORMAT = "table.name.format"; public static final String TABLE_NAME_FORMAT = "table.name.format";
public static final String PRIMARY_KEY_MODE = "primary.key.mode"; public static final String PRIMARY_KEY_MODE = "primary.key.mode";
public static final String PRIMARY_KEY_FIELDS = "primary.key.fields"; public static final String PRIMARY_KEY_FIELDS = "primary.key.fields";
@ -150,6 +151,16 @@ public class JdbcSinkConnectorConfig {
.withValidation(JdbcSinkConnectorConfig::validateDeleteEnabled) .withValidation(JdbcSinkConnectorConfig::validateDeleteEnabled)
.withDescription("Whether to treat `null` record values as deletes. Requires primary.key.mode to be `record.key`."); .withDescription("Whether to treat `null` record values as deletes. Requires primary.key.mode to be `record.key`.");
public static final Field TRUNCATE_ENABLED_FIELD = Field.create(TRUNCATE_ENABLED)
.withDisplayName("Controls whether records can be truncated by the connector")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2))
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withValidation(JdbcSinkConnectorConfig::validateDeleteEnabled)
.withDescription("Whether to process debezium event `t` as truncate statement.");
public static final Field TABLE_NAME_FORMAT_FIELD = Field.create(TABLE_NAME_FORMAT) public static final Field TABLE_NAME_FORMAT_FIELD = Field.create(TABLE_NAME_FORMAT)
.withDisplayName("A format string for the table") .withDisplayName("A format string for the table")
.withType(Type.STRING) .withType(Type.STRING)
@ -262,6 +273,7 @@ public class JdbcSinkConnectorConfig {
CONNECTION_POOL_TIMEOUT_FIELD, CONNECTION_POOL_TIMEOUT_FIELD,
INSERT_MODE_FIELD, INSERT_MODE_FIELD,
DELETE_ENABLED_FIELD, DELETE_ENABLED_FIELD,
TRUNCATE_ENABLED_FIELD,
TABLE_NAME_FORMAT_FIELD, TABLE_NAME_FORMAT_FIELD,
PRIMARY_KEY_MODE_FIELD, PRIMARY_KEY_MODE_FIELD,
PRIMARY_KEY_FIELDS_FIELD, PRIMARY_KEY_FIELDS_FIELD,
@ -432,6 +444,7 @@ public String getValue() {
private final InsertMode insertMode; private final InsertMode insertMode;
private final boolean deleteEnabled; private final boolean deleteEnabled;
private final boolean truncateEnabled;
private final String tableNameFormat; private final String tableNameFormat;
private final PrimaryKeyMode primaryKeyMode; private final PrimaryKeyMode primaryKeyMode;
private final Set<String> primaryKeyFields; private final Set<String> primaryKeyFields;
@ -449,6 +462,7 @@ public JdbcSinkConnectorConfig(Map<String, String> props) {
config = Configuration.from(props); config = Configuration.from(props);
this.insertMode = InsertMode.parse(config.getString(INSERT_MODE)); this.insertMode = InsertMode.parse(config.getString(INSERT_MODE));
this.deleteEnabled = config.getBoolean(DELETE_ENABLED_FIELD); this.deleteEnabled = config.getBoolean(DELETE_ENABLED_FIELD);
this.truncateEnabled = config.getBoolean(TRUNCATE_ENABLED_FIELD);
this.tableNameFormat = config.getString(TABLE_NAME_FORMAT_FIELD); this.tableNameFormat = config.getString(TABLE_NAME_FORMAT_FIELD);
this.primaryKeyMode = PrimaryKeyMode.parse(config.getString(PRIMARY_KEY_MODE_FIELD)); this.primaryKeyMode = PrimaryKeyMode.parse(config.getString(PRIMARY_KEY_MODE_FIELD));
this.primaryKeyFields = Strings.setOf(config.getString(PRIMARY_KEY_FIELDS_FIELD), String::new); this.primaryKeyFields = Strings.setOf(config.getString(PRIMARY_KEY_FIELDS_FIELD), String::new);
@ -491,6 +505,10 @@ public boolean isDeleteEnabled() {
return deleteEnabled; return deleteEnabled;
} }
public boolean isTruncateEnabled() {
return truncateEnabled;
}
public String getTableNameFormat() { public String getTableNameFormat() {
return tableNameFormat; return tableNameFormat;
} }

View File

@ -100,6 +100,14 @@ else if (record.value() != null) {
return false; return false;
} }
public boolean isTruncate() {
if (isDebeziumSinkRecord()) {
final Struct value = (Struct) record.value();
return Operation.TRUNCATE.equals(Operation.forCode(value.getString(Envelope.FieldName.OPERATION)));
}
return false;
}
public Struct getKeyStruct(PrimaryKeyMode primaryKeyMode) { public Struct getKeyStruct(PrimaryKeyMode primaryKeyMode) {
if (!getKeyFieldNames().isEmpty()) { if (!getKeyFieldNames().isEmpty()) {
switch (primaryKeyMode) { switch (primaryKeyMode) {
@ -281,9 +289,12 @@ public SinkRecordDescriptor build() {
Objects.requireNonNull(primaryKeyMode, "The primary key mode must be provided."); Objects.requireNonNull(primaryKeyMode, "The primary key mode must be provided.");
Objects.requireNonNull(sinkRecord, "The sink record must be provided."); Objects.requireNonNull(sinkRecord, "The sink record must be provided.");
final boolean flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord); boolean flattened = false;
readSinkRecordKeyData(sinkRecord, flattened); if (!isTruncateEvent(sinkRecord)) {
readSinkRecordNonKeyData(sinkRecord, flattened); flattened = !isTombstone(sinkRecord) && isFlattened(sinkRecord);
readSinkRecordKeyData(sinkRecord, flattened);
readSinkRecordNonKeyData(sinkRecord, flattened);
}
return new SinkRecordDescriptor(sinkRecord, sinkRecord.topic(), keyFieldNames, nonKeyFieldNames, allFields, flattened); return new SinkRecordDescriptor(sinkRecord, sinkRecord.topic(), keyFieldNames, nonKeyFieldNames, allFields, flattened);
} }
@ -297,6 +308,14 @@ private boolean isTombstone(SinkRecord record) {
return record.value() == null && record.valueSchema() == null; return record.value() == null && record.valueSchema() == null;
} }
private boolean isTruncateEvent(SinkRecord record) {
if (!isFlattened(record)) {
final Struct value = (Struct) record.value();
return Operation.TRUNCATE.equals(Operation.forCode(value.getString(Envelope.FieldName.OPERATION)));
}
return false;
}
private void readSinkRecordKeyData(SinkRecord record, boolean flattened) { private void readSinkRecordKeyData(SinkRecord record, boolean flattened) {
switch (primaryKeyMode) { switch (primaryKeyMode) {
case NONE: case NONE:

View File

@ -167,6 +167,14 @@ public interface DatabaseDialect {
*/ */
String getDeleteStatement(TableDescriptor table, SinkRecordDescriptor record); String getDeleteStatement(TableDescriptor table, SinkRecordDescriptor record);
/**
* Construct a {@code TRUNCATE} statement specific for this dialect.
*
* @param table the current relational table model, should not be {@code null}
* @return the truncate SQL statement to be executed, never {@code null}
*/
String getTruncateStatement(TableDescriptor table);
/** /**
* Returns the SQL binding fragment for a column, schema, and type mapping. * Returns the SQL binding fragment for a column, schema, and type mapping.
* *

View File

@ -397,6 +397,15 @@ public String getDeleteStatement(TableDescriptor table, SinkRecordDescriptor rec
return builder.build(); return builder.build();
} }
@Override
public String getTruncateStatement(TableDescriptor table) {
final SqlStatementBuilder builder = new SqlStatementBuilder();
builder.append("TRUNCATE TABLE ");
builder.append(getQualifiedTableName(table.getId()));
return builder.build();
}
@Override @Override
public String getQueryBindingWithValueCast(ColumnDescriptor column, Schema schema, Type type) { public String getQueryBindingWithValueCast(ColumnDescriptor column, Schema schema, Type type) {
return "?"; return "?";