DBZ-8018 Use all record fields as primary key columns when no primary key fields are specified

This commit is contained in:
mfvitale 2024-07-18 12:04:09 +02:00 committed by Chris Cranford
parent a4a6b03397
commit 12f485e2c9
2 changed files with 26 additions and 23 deletions

View File

@ -11,6 +11,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@ -396,30 +397,25 @@ private void applyRecordHeaderAsPrimaryKey(SinkRecord record) {
}
private void applyRecordValueAsPrimaryKey(SinkRecord record, boolean flattened) {
if (primaryKeyFields.isEmpty()) {
throw new ConnectException("At least one " + JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS +
" field name should be specified when resolving keys from the record's value.");
}
final Schema valueSchema = record.valueSchema();
if (valueSchema == null) {
throw new ConnectException("Configured primary key mode 'record_value' cannot have null schema");
}
else if (flattened) {
for (Field field : record.valueSchema().fields()) {
if (primaryKeyFields.contains(field.name())) {
addKeyField(record.topic(), field);
}
}
Stream<Field> recordFields;
if (flattened) {
recordFields = record.valueSchema().fields().stream();
}
else {
final Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
for (Field field : after.schema().fields()) {
if (primaryKeyFields.contains(field.name())) {
addKeyField(record.topic(), field);
}
recordFields = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER).schema().fields().stream();
}
if (!primaryKeyFields.isEmpty()) {
recordFields = recordFields.filter(field -> primaryKeyFields.contains(field.name()));
}
recordFields.forEach(field -> addKeyField(record.topic(), field));
}
private void applyPrimitiveRecordKeyAsPrimaryKey(Schema keySchema) {

View File

@ -292,13 +292,19 @@ public void testRecordWithPrimaryKeyColumnWithPrimaryKeyModeRecordValueWithNoFie
final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);
try {
consume(factory.createRecord(topicName));
stopSinkConnector();
}
catch (Exception e) {
assertThat(e.getCause().getCause().getMessage()).contains("At least one primary.key.fields field name should be specified");
}
final SinkRecord createRecord = factory.createRecordNoKey(topicName);
consume(createRecord);
final String destinationTableName = destinationTableName(createRecord);
final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName);
tableAssert.exists().hasNumberOfColumns(3);
getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
getSink().assertColumnType(tableAssert, "name", ValueType.TEXT, "John Doe");
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$");
assertHasPrimaryKeyColumns(destinationTableName, "id", "name", "nick_name$");
}
@ParameterizedTest
@ -396,6 +402,7 @@ protected void assertHasPrimaryKeyColumns(String tableName, boolean caseInsensit
}
else if (caseInsensitive) {
pkColumnNames = pkColumnNames.stream().map(String::toLowerCase).collect(Collectors.toList());
assertThat(pkColumnNames.size()).isEqualTo(columnNames.length);
for (int columnIndex = 0; columnIndex < columnNames.length; ++columnIndex) {
assertThat(pkColumnNames).contains(columnNames[columnIndex].toLowerCase(), Index.atIndex(columnIndex));
}