DBZ-1772 Trimming values for add.source.fields option

This commit is contained in:
Gunnar Morling 2020-02-14 11:15:02 +01:00 committed by Jiri Pechanec
parent 8c41a2dbdc
commit 1e51f112b2
2 changed files with 19 additions and 9 deletions

View File

@@ -70,7 +70,7 @@ public class ExtractNewRecordState<R extends ConnectRecord<R>> implements Transf
private boolean dropTombstones;
private DeleteHandling handleDeletes;
private boolean addOperationHeader;
private String[] addSourceFields;
private List<String> addSourceFields;
private List<FieldReference> additionalHeaders;
private List<FieldReference> additionalFields;
private String routeByField;
@@ -96,8 +96,7 @@ public void configure(final Map<String, ?> configs) {
addOperationHeader = config.getBoolean(ExtractNewRecordStateConfigDefinition.OPERATION_HEADER);
addSourceFields = config.getString(ExtractNewRecordStateConfigDefinition.ADD_SOURCE_FIELDS).isEmpty() ? null
: config.getString(ExtractNewRecordStateConfigDefinition.ADD_SOURCE_FIELDS).split(",");
addSourceFields = determineAdditionalSourceField(config.getString(ExtractNewRecordStateConfigDefinition.ADD_SOURCE_FIELDS));
additionalFields = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS));
additionalHeaders = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS));
@@ -126,6 +125,17 @@ public void configure(final Map<String, ?> configs) {
schemaUpdateCache = new BoundedConcurrentHashMap<>(SCHEMA_CACHE_SIZE);
}
/**
 * Parses the comma-separated {@code add.source.fields} option into the list of
 * source-field names to append, trimming surrounding whitespace from each entry.
 *
 * @param addSourceFieldsConfig raw configuration value; may be {@code null} or empty
 * @return the trimmed field names, or an empty list when the option is unset
 */
private static List<String> determineAdditionalSourceField(String addSourceFieldsConfig) {
    if (Strings.isNullOrEmpty(addSourceFieldsConfig)) {
        return Collections.emptyList();
    }
    else {
        return Arrays.stream(addSourceFieldsConfig.split(","))
                .map(String::trim)
                // drop entries that are empty after trimming (e.g. "a,,b", a trailing
                // comma, or a whitespace-only value) — an empty field name would fail
                // later when looked up on the source struct
                .filter(field -> !field.isEmpty())
                .collect(Collectors.toList());
    }
}
@Override
public R apply(final R record) {
Envelope.Operation operation;
@@ -289,9 +299,9 @@ private Struct updateValue(FieldReference fieldReference, Struct updatedValue, S
return updatedValue.put(fieldReference.getNewFieldName(), fieldReference.getValue(struct));
}
private R addSourceFields(String[] addSourceFields, R originalRecord, R unwrappedRecord) {
private R addSourceFields(List<String> addSourceFields, R originalRecord, R unwrappedRecord) {
// Return if no source fields to add
if (addSourceFields == null) {
if (addSourceFields.isEmpty()) {
return unwrappedRecord;
}
@@ -322,7 +332,7 @@ private R addSourceFields(String[] addSourceFields, R originalRecord, R unwrappe
unwrappedRecord.timestamp());
}
private Schema makeUpdatedSchema(Schema schema, Schema sourceSchema, String[] addSourceFields) {
private Schema makeUpdatedSchema(Schema schema, Schema sourceSchema, List<String> addSourceFields) {
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
// Get fields from original schema
for (org.apache.kafka.connect.data.Field field : schema.fields()) {

View File

@@ -347,7 +347,7 @@ public void testAddField() {
public void testAddFields() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,lsn,id");
props.put(ADD_FIELDS, "op , lsn,id");
transform.configure(props);
final SourceRecord updateRecord = createUpdateRecord();
@@ -412,7 +412,7 @@ public void testAddHeader() {
public void testAddHeaders() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,lsn,id");
props.put(ADD_HEADERS, "op , lsn,id");
transform.configure(props);
final SourceRecord updateRecord = createUpdateRecord();
@@ -513,7 +513,7 @@ public void testDeleteTopicRoutingField() {
public void testAddSourceFields() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_SOURCE_FIELDS, "lsn,version");
props.put(ADD_SOURCE_FIELDS, "lsn , version");
transform.configure(props);
final SourceRecord createRecord = createComplexCreateRecord();