DBZ-4298 Support non-JDBC logical values in keys for incremental snapshots

Incremental snapshot source gets the PK values after they are processed
by value converter. This is usally correct solution. The values are
passed to chunk queries. There are few datatypes like unconstrained
NUMERIC in PostgreSQL that must be converted to a Debezium specific
class to keep and enrich the value. Such class is not known to JDBC
driver. This commit intorduce a marker interface that enables the code
to identify the value not recognized by JDBC and provides a method to
obtain the original value.
This commit is contained in:
Jiri Pechanec 2021-12-10 13:43:30 +01:00 committed by Gunnar Morling
parent 882bb9c4d4
commit 740b6b21f6
4 changed files with 54 additions and 2 deletions

View File

@ -18,6 +18,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@ -31,6 +32,7 @@ public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<Postg
+ "CREATE SCHEMA s2; " + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));"
+ "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));"
+ "CREATE TABLE s1.a42 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer);"
+ "CREATE TABLE s1.anumeric (pk numeric, aa integer, PRIMARY KEY(pk));"
+ "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";
@Before
@ -157,6 +159,30 @@ record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()
}
}
@Test
public void insertsNumericPk() throws Exception {
// Testing.Print.enable();
try (final JdbcConnection connection = databaseConnection()) {
populateTable(connection, "s1.anumeric");
}
startConnector();
sendAdHocSnapshotSignal("s1.anumeric");
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
expectedRecordCount,
x -> true,
k -> VariableScaleDecimal.toLogical(k.getStruct("pk")).getWrappedValue().intValue(),
record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
"test_server.s1.anumeric",
null);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "s1.a4");

View File

@ -23,7 +23,7 @@
* @author Jiri Pechanec
*
*/
public class SpecialValueDecimal implements Serializable {
public class SpecialValueDecimal implements Serializable, ValueWrapper<BigDecimal> {
private static final long serialVersionUID = 1L;
@ -190,4 +190,9 @@ public static Object fromLogical(SpecialValueDecimal value, DecimalMode mode, St
+ "If you need to support it then set decimal handling mode to 'string'.");
}
}
@Override
public BigDecimal getWrappedValue() {
return decimalValue;
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.data;
/**
* Provides the access to the original encapsulated value obtained for example from JDBC.
*
* @author Jiri Pechanec
*
*/
public interface ValueWrapper<T> {
T getWrappedValue();
}

View File

@ -25,6 +25,7 @@
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.data.ValueWrapper;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
@ -546,6 +547,7 @@ private Timer getTableScanLogTimer() {
return Threads.timer(clock, RelationalSnapshotChangeEventSource.LOG_INTERVAL);
}
@SuppressWarnings("unchecked")
private Object[] keyFromRow(Object[] row) {
if (row == null) {
return null;
@ -553,7 +555,9 @@ private Object[] keyFromRow(Object[] row) {
final List<Column> keyColumns = getKeyMapper().getKeyKolumns(currentTable);
final Object[] key = new Object[keyColumns.size()];
for (int i = 0; i < keyColumns.size(); i++) {
key[i] = row[keyColumns.get(i).position() - 1];
final Object fieldValue = row[keyColumns.get(i).position() - 1];
key[i] = fieldValue instanceof ValueWrapper<?> ? ((ValueWrapper<Object>) fieldValue).getWrappedValue()
: fieldValue;
}
return key;
}