From 532c17f8aa62fb79bdeb6d79e72060a33ec531c0 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 14 Jan 2019 15:46:43 +0100 Subject: [PATCH] DBZ-1064 Using dedicated functional interface instead of Consumer; * Commenting * Moving type from "util" to "jdbc" --- .../postgresql/PostgresValueConverter.java | 1 + .../io/debezium/jdbc/JdbcValueConverters.java | 37 +++++++++++++++---- .../{util => jdbc}/ResultReceiver.java | 25 +++++++------ .../jdbc/ValueConversionCallback.java | 19 ++++++++++ 4 files changed, 62 insertions(+), 20 deletions(-) rename debezium-core/src/main/java/io/debezium/{util => jdbc}/ResultReceiver.java (73%) create mode 100644 debezium-core/src/main/java/io/debezium/jdbc/ValueConversionCallback.java diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 46e70b4aa..1bb3e041c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -730,6 +730,7 @@ protected int getTimePrecision(Column column) { * @return the converted value, or null if the conversion could not be made and the column allows nulls * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls */ + @Override protected Object convertBinary(Column column, Field fieldDefn, Object data) { return super.convertBinary(column, fieldDefn, (data instanceof PGobject) ? ((PGobject)data).getValue() : data); diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java index 195c57f2e..fb7ffcf77 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java @@ -5,8 +5,8 @@ */ package io.debezium.jdbc; -import static io.debezium.util.NumberConversions.BYTE_ZERO; import static io.debezium.util.NumberConversions.BYTE_BUFFER_ZERO; +import static io.debezium.util.NumberConversions.BYTE_ZERO; import static io.debezium.util.NumberConversions.SHORT_FALSE; import java.math.BigDecimal; @@ -23,7 +23,6 @@ import java.time.temporal.TemporalAdjuster; import java.util.BitSet; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.kafka.connect.data.Field; @@ -48,7 +47,6 @@ import io.debezium.time.ZonedTime; import io.debezium.time.ZonedTimestamp; import io.debezium.util.NumberConversions; -import io.debezium.util.ResultReceiver; /** * A provider of {@link ValueConverter}s and {@link SchemaBuilder}s for various column types. This implementation is aware @@ -747,7 +745,7 @@ else if (data instanceof String) { */ protected Object convertInteger(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, 0, (r) -> { - if (data instanceof Integer) { + if (data instanceof Integer) { r.deliver(data); } else if (data instanceof Number) { @@ -892,7 +890,7 @@ protected Object convertDecimal(Column column, Field fieldDefn, Object data) { protected Object toBigDecimal(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, BigDecimal.ZERO, (r) -> { if (data instanceof BigDecimal) { - r.deliver((BigDecimal)data); + r.deliver(data); } else if (data instanceof Boolean) { r.deliver(NumberConversions.getBigDecimal((Boolean)data)); @@ -1133,7 +1131,29 @@ protected int getTimePrecision(Column column) { return column.length(); } - protected Object convertValue(Column column, Field fieldDefn, Object data, Object fallback, Consumer> convert) { + /** + * Converts the given value for the given column/field. + * + * @param column + * describing the {@code data} value; never null + * @param fieldDefn + * the field definition; never null + * @param data + * the data object to be converted into a {@link Date Kafka Connect date} type + * @param fallback + * value that will be applied in case the column is defined as NOT NULL without a default value, but we + * still received no value; may happen e.g. when enabling MySQL's non-strict mode + * @param callback + * conversion routine that will be invoked in case the value is not null + * + * @return The converted value. Will be {@code null} if the inbound value was {@code null} and the column is + * optional. Will be the column's default value (converted to the corresponding KC type, if the inbound + * value was {@code null}, the column is non-optional and has a default value. Will be {@code fallback} if + * the inbound value was {@code null}, the column is non-optional and has no default value. Otherwise, it + * will be the value produced by {@code callback} and lastly the result returned by + * {@link #handleUnknownData(Column, Field, Object)}. + */ + protected Object convertValue(Column column, Field fieldDefn, Object data, Object fallback, ValueConversionCallback callback) { if (data == null) { if (column.isOptional()) { return null; @@ -1144,8 +1164,9 @@ protected Object convertValue(Column column, Field fieldDefn, Object data, Objec } return schemaDefault != null ? schemaDefault : fallback; } - final ResultReceiver r = ResultReceiver.create(); - convert.accept(r); + + final ResultReceiver r = ResultReceiver.create(); + callback.convert(r); return r.hasReceived() ? r.get() : handleUnknownData(column, fieldDefn, data); } } diff --git a/debezium-core/src/main/java/io/debezium/util/ResultReceiver.java b/debezium-core/src/main/java/io/debezium/jdbc/ResultReceiver.java similarity index 73% rename from debezium-core/src/main/java/io/debezium/util/ResultReceiver.java rename to debezium-core/src/main/java/io/debezium/jdbc/ResultReceiver.java index 58542b5ef..f75772070 100644 --- a/debezium-core/src/main/java/io/debezium/util/ResultReceiver.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/ResultReceiver.java @@ -3,22 +3,20 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.util; +package io.debezium.jdbc; /** * This interface allows the code to optionally pass a value between two parts of the application. - * - * @author Jiri Pechanec * - * @param type of the value to pass + * @author Jiri Pechanec */ -public interface ResultReceiver { +public interface ResultReceiver { /** * Send the object to the receiver. * @param o - object to be delivered */ - public void deliver(T o); + public void deliver(Object o); /** * @return true if a value has been sent to the receiver @@ -28,26 +26,29 @@ public interface ResultReceiver { /** * @return the object sent to the receiver */ - public T get(); + public Object get(); /** * @return default, not thread-safe implementation of the receiver */ - public static ResultReceiver create() { - return new ResultReceiver() { + public static ResultReceiver create() { + return new ResultReceiver() { private boolean received = false; - private T object = null; + private Object object = null; - public void deliver(T o) { + @Override + public void deliver(Object o) { received = true; object = o; } + @Override public boolean hasReceived() { return received; } - public T get() { + @Override + public Object get() { return object; } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/ValueConversionCallback.java b/debezium-core/src/main/java/io/debezium/jdbc/ValueConversionCallback.java new file mode 100644 index 000000000..50960890b --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/jdbc/ValueConversionCallback.java @@ -0,0 +1,19 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.jdbc; + +/** + * Invoked to convert incoming SQL column values into Kafka Connect values. The callback approach is used in order to + * tell apart the case where a conversion returned {@code null} from it returning no value at all (because the incoming + * value type is unsupported): + * + * @author Gunnar Morling + */ +@FunctionalInterface +public interface ValueConversionCallback { + + void convert(ResultReceiver receiver); +}