DBZ-7616 Honor the message.key.columns order while building query for incremental snapshot
This commit is contained in:
parent
9270ff70ad
commit
71256cf1bc
@ -15,6 +15,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.junit.After;
|
||||
@ -259,6 +260,37 @@ record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-7617")
|
||||
public void incrementalSnapshotMustRespectMessageKeyColumnsOrder() throws Exception {
|
||||
// Testing.Print.enable();
|
||||
|
||||
try (JdbcConnection connection = databaseConnection()) {
|
||||
connection.setAutoCommit(false);
|
||||
connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (3, 1, 1, 1, 0)");
|
||||
connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (2, 2, 2, 2, 1)");
|
||||
connection.executeWithoutCommitting("INSERT INTO s1.a4 (pk1, pk2, pk3, pk4, aa) VALUES (1, 2, 2, 2, 2)");
|
||||
|
||||
connection.commit();
|
||||
}
|
||||
|
||||
startConnector(builder -> mutableConfig(false, true)
|
||||
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a4")
|
||||
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", "s1.a4", "pk2,pk1")));
|
||||
|
||||
sendAdHocSnapshotSignal("s1.a4");
|
||||
|
||||
Thread.sleep(5000);
|
||||
|
||||
SourceRecords sourceRecords = consumeAvailableRecordsByTopic();
|
||||
List<Integer> ordered = sourceRecords.recordsForTopic("test_server.s1.a4").stream()
|
||||
.map(sourceRecord -> ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
assertThat(ordered).containsExactly(0, 2, 1);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void inserts4PksWithKafkaSignal() throws Exception {
|
||||
// Testing.Print.enable();
|
||||
|
@ -13,6 +13,9 @@
|
||||
import java.util.OptionalLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.Key.KeyMapper;
|
||||
@ -28,6 +31,8 @@
|
||||
public abstract class AbstractChunkQueryBuilder<T extends DataCollectionId>
|
||||
implements ChunkQueryBuilder<T> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChunkQueryBuilder.class);
|
||||
|
||||
protected final RelationalDatabaseConnectorConfig connectorConfig;
|
||||
protected final JdbcConnection jdbcConnection;
|
||||
protected ColumnNameFilter columnFilter;
|
||||
@ -219,8 +224,10 @@ public String buildMaxPrimaryKeyQuery(IncrementalSnapshotContext<T> context, Tab
|
||||
final String orderBy = getQueryColumns(context, table).stream()
|
||||
.map(c -> jdbcConnection.quotedColumnIdString(c.name()))
|
||||
.collect(Collectors.joining(" DESC, ")) + " DESC";
|
||||
return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, buildProjection(table), Optional.empty(),
|
||||
String selectWithRowLimits = jdbcConnection.buildSelectWithRowLimits(table.id(), 1, buildProjection(table), Optional.empty(),
|
||||
additionalCondition, orderBy);
|
||||
LOGGER.debug("MaxPrimaryKeyQuery {}", selectWithRowLimits);
|
||||
return selectWithRowLimits;
|
||||
}
|
||||
|
||||
private KeyMapper getKeyMapper() {
|
||||
|
@ -118,31 +118,40 @@ public static KeyMapper getInstance(String fullyQualifiedColumnNames, TableIdToS
|
||||
// ex: message.key.columns=inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
|
||||
// will become => [inventory.customers.pk1,inventory.customers.pk2,(.*).purchaseorders.pk3,(.*).purchaseorders.pk4]
|
||||
// then joining those values
|
||||
String regexes = Arrays.stream(PATTERN_SPLIT.split(fullyQualifiedColumnNames))
|
||||
List<Predicate<ColumnId>> predicates = new ArrayList<>(Arrays.stream(PATTERN_SPLIT.split(fullyQualifiedColumnNames))
|
||||
.map(TABLE_SPLIT::split)
|
||||
.collect(
|
||||
ArrayList<String>::new,
|
||||
(m, p) -> Arrays.asList(COLUMN_SPLIT.split(p[1])).forEach(c -> m.add(p[0] + "." + c)),
|
||||
ArrayList::addAll)
|
||||
ArrayList::addAll))
|
||||
.stream()
|
||||
.collect(Collectors.joining(","));
|
||||
|
||||
Predicate<ColumnId> delegate = Predicates.includes(regexes, ColumnId::toString);
|
||||
.map(regex -> Predicates.includes(regex, ColumnId::toString))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return (table) -> {
|
||||
List<Column> candidates = table.columns()
|
||||
.stream()
|
||||
.filter(c -> {
|
||||
final TableId tableId = table.id();
|
||||
if (tableIdMapper == null) {
|
||||
return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()));
|
||||
}
|
||||
return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()))
|
||||
|| delegate.test(new ColumnId(new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper), c.name()));
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
List<Column> candidates = new ArrayList<>();
|
||||
for (Predicate<ColumnId> predicate : predicates) {
|
||||
|
||||
candidates.addAll(
|
||||
table.columns()
|
||||
.stream()
|
||||
.filter(c -> matchColumn(tableIdMapper, table, predicate, c))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
return candidates.isEmpty() ? table.primaryKeyColumns() : candidates;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean matchColumn(TableIdToStringMapper tableIdMapper, Table table, Predicate<ColumnId> predicate, Column c) {
|
||||
|
||||
final TableId tableId = table.id();
|
||||
if (tableIdMapper == null) {
|
||||
return predicate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()));
|
||||
}
|
||||
return predicate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()))
|
||||
|| predicate.test(
|
||||
new ColumnId(new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper), c.name()));
|
||||
}
|
||||
}
|
||||
|
@ -401,6 +401,42 @@ public void testBuildQueryOptionalKeysNullsLast() {
|
||||
"ORDER BY \"pk1\", \"pk2\", \"pk3\" LIMIT 1024");
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-7617")
|
||||
public void testBuildQueryThreePkColumnsAndMessageKeyColumnsOrder() {
|
||||
|
||||
RelationalDatabaseConnectorConfig connectorConfig = buildConfig(Configuration.create()
|
||||
.with(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")
|
||||
.with(RelationalDatabaseConnectorConfig.TOPIC_PREFIX, "core")
|
||||
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", "s1.table1", "pk2,pk1,pk3"))
|
||||
.build());
|
||||
|
||||
final ChunkQueryBuilder<TableId> chunkQueryBuilder = new DefaultChunkQueryBuilder<>(
|
||||
connectorConfig, new JdbcConnection(config().getJdbcConfig(), config -> null, "\"", "\""));
|
||||
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
|
||||
final Column pk1 = Column.editor().name("pk1").optional(false).create();
|
||||
final Column pk2 = Column.editor().name("pk2").optional(false).create();
|
||||
final Column pk3 = Column.editor().name("pk3").optional(false).create();
|
||||
final Column val1 = Column.editor().name("val1").create();
|
||||
final Column val2 = Column.editor().name("val2").create();
|
||||
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1"))
|
||||
.addColumn(pk1)
|
||||
.addColumn(pk2)
|
||||
.addColumn(pk3)
|
||||
.addColumn(val1)
|
||||
.addColumn(val2)
|
||||
.setPrimaryKeyNames("pk1", "pk2", "pk3").create();
|
||||
|
||||
assertThat(chunkQueryBuilder.buildChunkQuery(context, table, Optional.empty())).isEqualTo(
|
||||
"SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\", \"pk1\", \"pk3\" LIMIT 1024");
|
||||
|
||||
context.nextChunkPosition(new Object[]{ 1, 5, 3 });
|
||||
context.maximumKey(new Object[]{ 10, 50, 30 });
|
||||
|
||||
assertThat(chunkQueryBuilder.buildChunkQuery(context, table, Optional.empty())).isEqualTo(
|
||||
"SELECT * FROM \"s1\".\"table1\" WHERE ((\"pk2\" > ?) OR (\"pk2\" = ? AND \"pk1\" > ?) OR (\"pk2\" = ? AND \"pk1\" = ? AND \"pk3\" > ?)) AND NOT ((\"pk2\" > ?) OR (\"pk2\" = ? AND \"pk1\" > ?) OR (\"pk2\" = ? AND \"pk1\" = ? AND \"pk3\" > ?)) ORDER BY \"pk2\", \"pk1\", \"pk3\" LIMIT 1024");
|
||||
}
|
||||
|
||||
private Table createTwoPrimaryKeysTable() {
|
||||
final Column pk1 = Column.editor().name("pk1").optional(false).create();
|
||||
final Column pk2 = Column.editor().name("pk2").optional(false).create();
|
||||
|
Loading…
Reference in New Issue
Block a user