From 740b6b21f698b393e43dff5366691f5dfb6acf37 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Fri, 10 Dec 2021 13:43:30 +0100 Subject: [PATCH] DBZ-4298 Support non-JDBC logical values in keys for incremental snapshots Incremental snapshot source gets the PK values after they are processed by value converter. This is usally correct solution. The values are passed to chunk queries. There are few datatypes like unconstrained NUMERIC in PostgreSQL that must be converted to a Debezium specific class to keep and enrich the value. Such class is not known to JDBC driver. This commit intorduce a marker interface that enables the code to identify the value not recognized by JDBC and provides a method to obtain the original value. --- .../postgresql/IncrementalSnapshotIT.java | 26 +++++++++++++++++++ .../io/debezium/data/SpecialValueDecimal.java | 7 ++++- .../java/io/debezium/data/ValueWrapper.java | 17 ++++++++++++ ...tIncrementalSnapshotChangeEventSource.java | 6 ++++- 4 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/data/ValueWrapper.java diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java index f173f6b27..ed84f2bc6 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java @@ -18,6 +18,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; +import io.debezium.data.VariableScaleDecimal; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest; import io.debezium.relational.RelationalDatabaseConnectorConfig; @@ -31,6 +32,7 @@ public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest ((Struct) record.value()).getStruct("after").getInt32(valueFieldName() } } + @Test + public void insertsNumericPk() throws Exception { + // Testing.Print.enable(); + + try (final JdbcConnection connection = databaseConnection()) { + populateTable(connection, "s1.anumeric"); + } + startConnector(); + + sendAdHocSnapshotSignal("s1.anumeric"); + + final int expectedRecordCount = ROW_COUNT; + final Map dbChanges = consumeMixedWithIncrementalSnapshot( + expectedRecordCount, + x -> true, + k -> VariableScaleDecimal.toLogical(k.getStruct("pk")).getWrappedValue().intValue(), + record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), + "test_server.s1.anumeric", + null); + for (int i = 0; i < expectedRecordCount; i++) { + Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i)); + } + } + protected void populate4PkTable() throws SQLException { try (final JdbcConnection connection = databaseConnection()) { populate4PkTable(connection, "s1.a4"); diff --git a/debezium-core/src/main/java/io/debezium/data/SpecialValueDecimal.java b/debezium-core/src/main/java/io/debezium/data/SpecialValueDecimal.java index eb69b278a..324eef107 100644 --- a/debezium-core/src/main/java/io/debezium/data/SpecialValueDecimal.java +++ b/debezium-core/src/main/java/io/debezium/data/SpecialValueDecimal.java @@ -23,7 +23,7 @@ * @author Jiri Pechanec * */ -public class SpecialValueDecimal implements Serializable { +public class SpecialValueDecimal implements Serializable, ValueWrapper { private static final long serialVersionUID = 1L; @@ -190,4 +190,9 @@ public static Object fromLogical(SpecialValueDecimal value, DecimalMode mode, St + "If you need to support it then set decimal handling mode to 'string'."); } } + + @Override + public BigDecimal getWrappedValue() { + return decimalValue; + } } diff --git a/debezium-core/src/main/java/io/debezium/data/ValueWrapper.java b/debezium-core/src/main/java/io/debezium/data/ValueWrapper.java new file mode 100644 index 000000000..aa6c6710e --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/data/ValueWrapper.java @@ -0,0 +1,17 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.data; + +/** + * Provides the access to the original encapsulated value obtained for example from JDBC. + * + * @author Jiri Pechanec + * + */ +public interface ValueWrapper { + + T getWrappedValue(); +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java index e744f762d..efdcacf17 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java @@ -25,6 +25,7 @@ import io.debezium.DebeziumException; import io.debezium.annotation.NotThreadSafe; +import io.debezium.data.ValueWrapper; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.DataChangeEventListener; @@ -546,6 +547,7 @@ private Timer getTableScanLogTimer() { return Threads.timer(clock, RelationalSnapshotChangeEventSource.LOG_INTERVAL); } + @SuppressWarnings("unchecked") private Object[] keyFromRow(Object[] row) { if (row == null) { return null; @@ -553,7 +555,9 @@ private Object[] keyFromRow(Object[] row) { final List keyColumns = getKeyMapper().getKeyKolumns(currentTable); final Object[] key = new Object[keyColumns.size()]; for (int i = 0; i < keyColumns.size(); i++) { - key[i] = row[keyColumns.get(i).position() - 1]; + final Object fieldValue = row[keyColumns.get(i).position() - 1]; + key[i] = fieldValue instanceof ValueWrapper ? ((ValueWrapper) fieldValue).getWrappedValue() + : fieldValue; } return key; }