DBZ-1064 Using dedicated functional interface instead of Consumer;

* Commenting
* Moving type from "util" to "jdbc"
This commit is contained in:
Gunnar Morling 2019-01-14 15:46:43 +01:00
parent bff7eea734
commit 532c17f8aa
4 changed files with 62 additions and 20 deletions

View File

@ -730,6 +730,7 @@ protected int getTimePrecision(Column column) {
* @return the converted value, or null if the conversion could not be made and the column allows nulls * @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/ */
@Override
protected Object convertBinary(Column column, Field fieldDefn, Object data) { protected Object convertBinary(Column column, Field fieldDefn, Object data) {
return super.convertBinary(column, fieldDefn, return super.convertBinary(column, fieldDefn,
(data instanceof PGobject) ? ((PGobject)data).getValue() : data); (data instanceof PGobject) ? ((PGobject)data).getValue() : data);

View File

@ -5,8 +5,8 @@
*/ */
package io.debezium.jdbc; package io.debezium.jdbc;
import static io.debezium.util.NumberConversions.BYTE_ZERO;
import static io.debezium.util.NumberConversions.BYTE_BUFFER_ZERO; import static io.debezium.util.NumberConversions.BYTE_BUFFER_ZERO;
import static io.debezium.util.NumberConversions.BYTE_ZERO;
import static io.debezium.util.NumberConversions.SHORT_FALSE; import static io.debezium.util.NumberConversions.SHORT_FALSE;
import java.math.BigDecimal; import java.math.BigDecimal;
@ -23,7 +23,6 @@
import java.time.temporal.TemporalAdjuster; import java.time.temporal.TemporalAdjuster;
import java.util.BitSet; import java.util.BitSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Field;
@ -48,7 +47,6 @@
import io.debezium.time.ZonedTime; import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp; import io.debezium.time.ZonedTimestamp;
import io.debezium.util.NumberConversions; import io.debezium.util.NumberConversions;
import io.debezium.util.ResultReceiver;
/** /**
* A provider of {@link ValueConverter}s and {@link SchemaBuilder}s for various column types. This implementation is aware * A provider of {@link ValueConverter}s and {@link SchemaBuilder}s for various column types. This implementation is aware
@ -892,7 +890,7 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) {
protected Object toBigDecimal(Column column, Field fieldDefn, Object data) { protected Object toBigDecimal(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, BigDecimal.ZERO, (r) -> { return convertValue(column, fieldDefn, data, BigDecimal.ZERO, (r) -> {
if (data instanceof BigDecimal) { if (data instanceof BigDecimal) {
r.deliver((BigDecimal)data); r.deliver(data);
} }
else if (data instanceof Boolean) { else if (data instanceof Boolean) {
r.deliver(NumberConversions.getBigDecimal((Boolean)data)); r.deliver(NumberConversions.getBigDecimal((Boolean)data));
@ -1133,7 +1131,29 @@ protected int getTimePrecision(Column column) {
return column.length(); return column.length();
} }
protected Object convertValue(Column column, Field fieldDefn, Object data, Object fallback, Consumer<ResultReceiver<Object>> convert) { /**
* Converts the given value for the given column/field.
*
* @param column
* describing the {@code data} value; never null
* @param fieldDefn
* the field definition; never null
* @param data
* the data object to be converted into a {@link Date Kafka Connect date} type
* @param fallback
* value that will be applied in case the column is defined as NOT NULL without a default value, but we
* still received no value; may happen e.g. when enabling MySQL's non-strict mode
* @param callback
* conversion routine that will be invoked in case the value is not null
*
* @return The converted value. Will be {@code null} if the inbound value was {@code null} and the column is
* optional. Will be the column's default value (converted to the corresponding KC type, if the inbound
* value was {@code null}, the column is non-optional and has a default value. Will be {@code fallback} if
* the inbound value was {@code null}, the column is non-optional and has no default value. Otherwise, it
* will be the value produced by {@code callback} and lastly the result returned by
* {@link #handleUnknownData(Column, Field, Object)}.
*/
protected Object convertValue(Column column, Field fieldDefn, Object data, Object fallback, ValueConversionCallback callback) {
if (data == null) { if (data == null) {
if (column.isOptional()) { if (column.isOptional()) {
return null; return null;
@ -1144,8 +1164,9 @@ protected Object convertValue(Column column, Field fieldDefn, Object data, Objec
} }
return schemaDefault != null ? schemaDefault : fallback; return schemaDefault != null ? schemaDefault : fallback;
} }
final ResultReceiver<Object> r = ResultReceiver.create();
convert.accept(r); final ResultReceiver r = ResultReceiver.create();
callback.convert(r);
return r.hasReceived() ? r.get() : handleUnknownData(column, fieldDefn, data); return r.hasReceived() ? r.get() : handleUnknownData(column, fieldDefn, data);
} }
} }

View File

@ -3,22 +3,20 @@
* *
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/ */
package io.debezium.util; package io.debezium.jdbc;
/** /**
* This interface allows the code to optionally pass a value between two parts of the application. * This interface allows the code to optionally pass a value between two parts of the application.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*
* @param <T> type of the value to pass
*/ */
public interface ResultReceiver<T> { public interface ResultReceiver {
/** /**
* Send the object to the receiver. * Send the object to the receiver.
* @param o - object to be delivered * @param o - object to be delivered
*/ */
public void deliver(T o); public void deliver(Object o);
/** /**
* @return true if a value has been sent to the receiver * @return true if a value has been sent to the receiver
@ -28,26 +26,29 @@ public interface ResultReceiver<T> {
/** /**
* @return the object sent to the receiver * @return the object sent to the receiver
*/ */
public T get(); public Object get();
/** /**
* @return default, not thread-safe implementation of the receiver * @return default, not thread-safe implementation of the receiver
*/ */
public static <T> ResultReceiver<T> create() { public static ResultReceiver create() {
return new ResultReceiver<T>() { return new ResultReceiver() {
private boolean received = false; private boolean received = false;
private T object = null; private Object object = null;
public void deliver(T o) { @Override
public void deliver(Object o) {
received = true; received = true;
object = o; object = o;
} }
@Override
public boolean hasReceived() { public boolean hasReceived() {
return received; return received;
} }
public T get() { @Override
public Object get() {
return object; return object;
} }

View File

@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.jdbc;
/**
* Invoked to convert incoming SQL column values into Kafka Connect values. The callback approach is used in order to
* tell apart the case where a conversion returned {@code null} from it returning no value at all (because the incoming
* value type is unsupported):
*
* @author Gunnar Morling
*/
@FunctionalInterface
public interface ValueConversionCallback {
void convert(ResultReceiver receiver);
}