DBZ-7191 Null value will be used instead of default for optional field

This commit is contained in:
mfvitale 2023-11-28 11:35:42 +01:00 committed by Chris Cranford
parent 3b8645a5b8
commit b8a6f13bba
3 changed files with 52 additions and 11 deletions

View File

@ -117,9 +117,17 @@ private int bindNonKeyValuesToQuery(SinkRecordDescriptor record, QueryBinder que
} }
private int bindFieldValuesToQuery(SinkRecordDescriptor record, QueryBinder query, int index, Struct source, List<String> fields) { private int bindFieldValuesToQuery(SinkRecordDescriptor record, QueryBinder query, int index, Struct source, List<String> fields) {
for (String fieldName : fields) { for (String fieldName : fields) {
final SinkRecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName); final SinkRecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName);
List<ValueBindDescriptor> boundValues = dialect.bindValue(field, index, source.get(fieldName));
Object value;
if (field.getSchema().isOptional()) {
value = source.getWithoutDefault(fieldName);
} else {
value = source.get(fieldName);
}
List<ValueBindDescriptor> boundValues = dialect.bindValue(field, index, value);
boundValues.forEach(query::bind); boundValues.forEach(query::bind);
index += boundValues.size(); index += boundValues.size();

View File

@ -7,8 +7,12 @@
import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Assertions.assertThat;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import io.debezium.doc.FixFor;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.db.api.TableAssert; import org.assertj.db.api.TableAssert;
import org.assertj.db.type.ValueType; import org.assertj.db.type.ValueType;
@ -348,4 +352,33 @@ public void testInsertModeUpdateWithPrimaryKeyModeRecordValue(SinkRecordFactory
getSink().assertColumnType(tableAssert, "name", ValueType.TEXT); getSink().assertColumnType(tableAssert, "name", ValueType.TEXT);
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT); getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT);
} }
@FixFor("DBZ-7191")
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
public void testRecordDefaultValueUsedOnlyWithRequiredFieldWithNullValue(SinkRecordFactory factory) {
final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_KEY.getValue());
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, InsertMode.INSERT.getValue());
startSinkConnector(properties);
assertSinkConnectorIsRunning();
final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);
final SinkRecord createRecord = factory.createRecordWithSchemaValue(topicName,
(byte) 1,
List.of( "optional_with_default_null_value"),
List.of(SchemaBuilder.string().defaultValue("default").optional().build()),
Arrays.asList(new Object[]{null}));
consume(createRecord);
final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(2);
getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
getSink().assertColumnHasNullValue(tableAssert, "optional_with_default_null_value");
}
} }

View File

@ -363,28 +363,28 @@ public void testNonKeyColumnTypeResolutionFromKafkaSchemaTypeWithOptionalsWithDe
getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1); getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
getSink().assertColumnType(tableAssert, "col_int8", ValueType.NUMBER, (byte) 10); getSink().assertColumnType(tableAssert, "col_int8", ValueType.NUMBER, (byte) 10);
getSink().assertColumnType(tableAssert, "col_int8_optional", ValueType.NUMBER, (byte) 10); getSink().assertColumnHasNullValue(tableAssert, "col_int8_optional");
getSink().assertColumnType(tableAssert, "col_int16", ValueType.NUMBER, (short) 15); getSink().assertColumnType(tableAssert, "col_int16", ValueType.NUMBER, (short) 15);
getSink().assertColumnType(tableAssert, "col_int16_optional", ValueType.NUMBER, (short) 15); getSink().assertColumnHasNullValue(tableAssert, "col_int16_optional");
getSink().assertColumnType(tableAssert, "col_int32", ValueType.NUMBER, 1024); getSink().assertColumnType(tableAssert, "col_int32", ValueType.NUMBER, 1024);
getSink().assertColumnType(tableAssert, "col_int32_optional", ValueType.NUMBER, 1024); getSink().assertColumnHasNullValue(tableAssert, "col_int32_optional");
getSink().assertColumnType(tableAssert, "col_int64", ValueType.NUMBER, 1024L); getSink().assertColumnType(tableAssert, "col_int64", ValueType.NUMBER, 1024L);
getSink().assertColumnType(tableAssert, "col_int64_optional", ValueType.NUMBER, 1024L); getSink().assertColumnHasNullValue(tableAssert, "col_int64_optional");
getSink().assertColumnType(tableAssert, "col_float32", ValueType.NUMBER, 3.14f); getSink().assertColumnType(tableAssert, "col_float32", ValueType.NUMBER, 3.14f);
getSink().assertColumnType(tableAssert, "col_float32_optional", ValueType.NUMBER, 3.14f); getSink().assertColumnHasNullValue(tableAssert, "col_float32_optional");
getSink().assertColumnType(tableAssert, "col_float64", ValueType.NUMBER, 3.14d); getSink().assertColumnType(tableAssert, "col_float64", ValueType.NUMBER, 3.14d);
getSink().assertColumnType(tableAssert, "col_float64_optional", ValueType.NUMBER, 3.14d); getSink().assertColumnHasNullValue(tableAssert, "col_float64_optional");
getSink().assertColumnType(tableAssert, "col_string", ValueType.TEXT, text); getSink().assertColumnType(tableAssert, "col_string", ValueType.TEXT, text);
getSink().assertColumnType(tableAssert, "col_string_optional", ValueType.TEXT, text); getSink().assertColumnHasNullValue(tableAssert, "col_string_optional");
getSink().assertColumnType(tableAssert, "col_bytes", ValueType.BYTES, text.getBytes(StandardCharsets.UTF_8)); getSink().assertColumnType(tableAssert, "col_bytes", ValueType.BYTES, text.getBytes(StandardCharsets.UTF_8));
getSink().assertColumnType(tableAssert, "col_bytes_optional", ValueType.BYTES, text.getBytes(StandardCharsets.UTF_8)); getSink().assertColumnHasNullValue(tableAssert, "col_bytes_optional");
if (getSink().getType().is(SinkType.ORACLE)) { if (getSink().getType().is(SinkType.ORACLE)) {
getSink().assertColumnType(tableAssert, "col_bool", ValueType.NUMBER, 1); getSink().assertColumnType(tableAssert, "col_bool", ValueType.NUMBER, 1);
getSink().assertColumnType(tableAssert, "col_bool_optional", ValueType.NUMBER, true); getSink().assertColumnHasNullValue(tableAssert, "col_bool_optional");
} }
else { else {
getSink().assertColumnType(tableAssert, "col_bool", ValueType.BOOLEAN, true); getSink().assertColumnType(tableAssert, "col_bool", ValueType.BOOLEAN, true);
getSink().assertColumnType(tableAssert, "col_bool_optional", ValueType.BOOLEAN, true); getSink().assertColumnHasNullValue(tableAssert, "col_bool_optional");
} }
} }