DBZ-5089 Use atomic reference

This commit is contained in:
Jiri Pechanec 2022-05-26 11:48:55 +02:00
parent 8693b4f008
commit b85cb275b1
3 changed files with 6 additions and 5 deletions

View File

@ -82,6 +82,7 @@ Claus Guttesen
Clément Loiselet
Cliff Wheadon
Collin Van Dyck
Connor Szczepaniak
Cory Harper
Cyprien Etienne
Cyril Scetbon

View File

@ -12,6 +12,7 @@
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
@ -175,9 +176,7 @@ public R apply(R r, RecordConverter<R> recordConverter) {
final Struct structValue = onlyHeadersInOutputMessage ? null : new Struct(structValueSchema).put(ENVELOPE_PAYLOAD, payload);
var partition = new Object() {
Integer value = null;
};
AtomicReference<Integer> partition = new AtomicReference<>();
additionalFields.forEach((additionalField -> {
switch (additionalField.getPlacement()) {
@ -193,7 +192,7 @@ public R apply(R r, RecordConverter<R> recordConverter) {
eventValueSchema.field(additionalField.getField()).schema());
break;
case PARTITION:
partition.value = eventStruct.getInt32(additionalField.getField());
partition.set(eventStruct.getInt32(additionalField.getField()));
}
}));
@ -219,7 +218,7 @@ else if (onlyHeadersInOutputMessage) {
R newRecord = r.newRecord(
eventStruct.getString(routeByField),
partition.value,
partition.get(),
defineRecordKeySchema(fieldEventKey, eventValueSchema, fallbackPayloadIdField),
recordKey,
updatedSchema,

View File

@ -138,3 +138,4 @@ roeselert,Timo Roeseler
troeselereos,Timo Roeseler
Himanshu-LT,Himanshu Mishra
Chrisss93,Chris Lee
connorszczepaniak-wk,Connor Szczepaniak