From 71256cf1bc3fceda0cd0eb53f3f691add25519cb Mon Sep 17 00:00:00 2001
From: mfvitale
Date: Tue, 12 Mar 2024 15:57:49 +0100
Subject: [PATCH] DBZ-7616 Honor the message.key.columns order while building
 query for incremental snapshot

---
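The key mapper previously matched table columns against one combined include
pattern built from all configured columns and collected candidates by
iterating table.columns(), so key columns came back in table-definition
order and the order given in message.key.columns was lost. It now builds one
include predicate per configured column and collects the matches predicate
by predicate, preserving the configured order. As the new
DefaultChunkQueryBuilderTest case asserts, a configuration such as

    message.key.columns=s1.table1:pk2,pk1,pk3

now yields incremental snapshot chunk queries ordered as configured:

    SELECT * FROM "s1"."table1" ORDER BY "pk2", "pk1", "pk3" LIMIT 1024
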
 .../postgresql/IncrementalSnapshotIT.java     | 32 +++++++++++++++
 .../AbstractChunkQueryBuilder.java            |  9 +++-
 .../main/java/io/debezium/relational/Key.java | 41 +++++++++++--------
 .../DefaultChunkQueryBuilderTest.java         | 36 ++++++++++++++++
 4 files changed, 101 insertions(+), 17 deletions(-)

diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java
index c90deedcb..7ebdb9553 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java
@@ -15,6 +15,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.connect.data.Struct;
 import org.junit.After;
@@ -259,6 +260,37 @@ record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()
         }
     }
 
+    @Test
+    @FixFor("DBZ-7617")
+    public void incrementalSnapshotMustRespectMessageKeyColumnsOrder() throws Exception {
+        // Testing.Print.enable();
+
+        try (JdbcConnection connection = databaseConnection()) {
+            connection.setAutoCommit(false);
+            connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (3, 1, 1, 1, 0)");
+            connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (2, 2, 2, 2, 1)");
+            connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (1, 2, 2, 2, 2)");
+
+            connection.commit();
+        }
+
+        startConnector(builder -> mutableConfig(false, true)
+                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a4")
+                .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", "s1.a4", "pk2,pk1")));
+
+        sendAdHocSnapshotSignal("s1.a4");
+
+        Thread.sleep(5000); // give the ad-hoc snapshot time to complete
+
+        SourceRecords sourceRecords = consumeAvailableRecordsByTopic();
+        List<Integer> ordered = sourceRecords.recordsForTopic("test_server.s1.a4").stream()
+                .map(sourceRecord -> ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName()))
+                .collect(Collectors.toList());
+
+        // with key order (pk2, pk1) the rows snapshot as (1, 3) -> aa=0, (2, 1) -> aa=2, (2, 2) -> aa=1
+        assertThat(ordered).containsExactly(0, 2, 1);
+    }
+
     @Test
     public void inserts4PksWithKafkaSignal() throws Exception {
         // Testing.Print.enable();
diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractChunkQueryBuilder.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractChunkQueryBuilder.java
index c2c38784f..8d8f992f7 100644
--- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractChunkQueryBuilder.java
+++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractChunkQueryBuilder.java
@@ -13,6 +13,9 @@
 import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
 import io.debezium.relational.Key.KeyMapper;
@@ -28,6 +31,8 @@
 public abstract class AbstractChunkQueryBuilder<T extends DataCollectionId>
         implements ChunkQueryBuilder<T> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChunkQueryBuilder.class);
+
     protected final RelationalDatabaseConnectorConfig connectorConfig;
     protected final JdbcConnection jdbcConnection;
     protected ColumnNameFilter columnFilter;
@@ -219,8 +224,10 @@ public String buildMaxPrimaryKeyQuery(IncrementalSnapshotContext<T> context, Tab
         final String orderBy = getQueryColumns(context, table).stream()
                 .map(c -> jdbcConnection.quotedColumnIdString(c.name()))
                 .collect(Collectors.joining(" DESC, ")) + " DESC";
-        return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, buildProjection(table), Optional.empty(),
+        String selectWithRowLimits = jdbcConnection.buildSelectWithRowLimits(table.id(), 1, buildProjection(table), Optional.empty(),
                 additionalCondition, orderBy);
+        LOGGER.debug("MaxPrimaryKeyQuery {}", selectWithRowLimits);
+        return selectWithRowLimits;
     }
 
     private KeyMapper getKeyMapper() {
diff --git a/debezium-core/src/main/java/io/debezium/relational/Key.java b/debezium-core/src/main/java/io/debezium/relational/Key.java
index 9c8f5ed74..11b811780 100644
--- a/debezium-core/src/main/java/io/debezium/relational/Key.java
+++ b/debezium-core/src/main/java/io/debezium/relational/Key.java
@@ -118,31 +118,40 @@ public static KeyMapper getInstance(String fullyQualifiedColumnNames, TableIdToS
             // ex: message.key.columns=inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
             // will become => [inventory.customers.pk1,inventory.customers.pk2,(.*).purchaseorders.pk3,(.*).purchaseorders.pk4]
-            // then joining those values
-            String regexes = Arrays.stream(PATTERN_SPLIT.split(fullyQualifiedColumnNames))
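+            // then mapping each value to its own include predicate, preserving the configured column order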
+            List<Predicate<ColumnId>> predicates = new ArrayList<>(Arrays.stream(PATTERN_SPLIT.split(fullyQualifiedColumnNames))
                     .map(TABLE_SPLIT::split)
                     .collect(
                             ArrayList::new,
                             (m, p) -> Arrays.asList(COLUMN_SPLIT.split(p[1])).forEach(c -> m.add(p[0] + "." + c)),
-                            ArrayList::addAll)
+                            ArrayList::addAll))
                     .stream()
-                    .collect(Collectors.joining(","));
-
-            Predicate<ColumnId> delegate = Predicates.includes(regexes, ColumnId::toString);
+                    .map(regex -> Predicates.includes(regex, ColumnId::toString))
+                    .collect(Collectors.toList());
 
             return (table) -> {
-                List<Column> candidates = table.columns()
-                        .stream()
-                        .filter(c -> {
-                            final TableId tableId = table.id();
-                            if (tableIdMapper == null) {
-                                return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()));
-                            }
-                            return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()))
-                                    || delegate.test(new ColumnId(new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper), c.name()));
-                        })
-                        .collect(Collectors.toList());
+                List<Column> candidates = new ArrayList<>();
+                for (Predicate<ColumnId> predicate : predicates) {
+
+                    candidates.addAll(
+                            table.columns()
+                                    .stream()
+                                    .filter(c -> matchColumn(tableIdMapper, table, predicate, c))
+                                    .collect(Collectors.toList()));
+                }
                 return candidates.isEmpty() ? table.primaryKeyColumns() : candidates;
             };
         }
     }
+
+    private static boolean matchColumn(TableIdToStringMapper tableIdMapper, Table table, Predicate<ColumnId> predicate, Column c) {
+
+        final TableId tableId = table.id();
+        if (tableIdMapper == null) {
+            return predicate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()));
+        }
+        return predicate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()))
+                || predicate.test(
+                        new ColumnId(new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper), c.name()));
+    }
 }
diff --git a/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/DefaultChunkQueryBuilderTest.java b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/DefaultChunkQueryBuilderTest.java
index bad27a256..13d0847ab 100644
--- a/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/DefaultChunkQueryBuilderTest.java
+++ b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/DefaultChunkQueryBuilderTest.java
@@ -401,6 +401,42 @@ public void testBuildQueryOptionalKeysNullsLast() {
                 "ORDER BY \"pk1\", \"pk2\", \"pk3\" LIMIT 1024");
     }
 
+    @Test
+    @FixFor("DBZ-7617")
+    public void testBuildQueryThreePkColumnsAndMessageKeyColumnsOrder() {
+        RelationalDatabaseConnectorConfig connectorConfig = buildConfig(Configuration.create()
+                .with(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")
+                .with(RelationalDatabaseConnectorConfig.TOPIC_PREFIX, "core")
+                .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", "s1.table1", "pk2,pk1,pk3"))
+                .build());
+
+        final ChunkQueryBuilder<TableId> chunkQueryBuilder = new DefaultChunkQueryBuilder<>(
+                connectorConfig, new JdbcConnection(config().getJdbcConfig(), config -> null, "\"", "\""));
+        final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
+        final Column pk1 = Column.editor().name("pk1").optional(false).create();
+        final Column pk2 = Column.editor().name("pk2").optional(false).create();
+        final Column pk3 = Column.editor().name("pk3").optional(false).create();
+        final Column val1 = Column.editor().name("val1").create();
+        final Column val2 = Column.editor().name("val2").create();
+        final Table table = Table.editor().tableId(new TableId(null, "s1", "table1"))
+                .addColumn(pk1)
+                .addColumn(pk2)
+                .addColumn(pk3)
+                .addColumn(val1)
+                .addColumn(val2)
+                .setPrimaryKeyNames("pk1", "pk2", "pk3").create();
+
+        assertThat(chunkQueryBuilder.buildChunkQuery(context, table, Optional.empty())).isEqualTo(
+                "SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\", \"pk1\", \"pk3\" LIMIT 1024");
+
+        context.nextChunkPosition(new Object[]{ 1, 5, 3 });
+        context.maximumKey(new Object[]{ 10, 50, 30 });
+
+        // chunk window: above the last chunk position and not above the maximum key, comparing (pk2, pk1, pk3) lexicographically
+        assertThat(chunkQueryBuilder.buildChunkQuery(context, table, Optional.empty())).isEqualTo(
+                "SELECT * FROM \"s1\".\"table1\" WHERE ((\"pk2\" > ?) OR (\"pk2\" = ? AND \"pk1\" > ?) OR (\"pk2\" = ? AND \"pk1\" = ? AND \"pk3\" > ?)) AND NOT ((\"pk2\" > ?) OR (\"pk2\" = ? AND \"pk1\" > ?) OR (\"pk2\" = ? AND \"pk1\" = ? AND \"pk3\" > ?)) ORDER BY \"pk2\", \"pk1\", \"pk3\" LIMIT 1024");
+    }
+
     private Table createTwoPrimaryKeysTable() {
         final Column pk1 = Column.editor().name("pk1").optional(false).create();
         final Column pk2 = Column.editor().name("pk2").optional(false).create();
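
-- 
Note: with the LOGGER.debug statement added above, the generated
maximum-key query can be inspected by enabling DEBUG logging for
io.debezium.pipeline.source.snapshot.incremental.AbstractChunkQueryBuilder.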