DBZ-3860 Provide correct boundaries for multi PK tables

Jiri Pechanec 2021-10-12 13:54:43 +02:00 committed by Gunnar Morling
parent 767b271cb5
commit 0ccc83db21
4 changed files with 167 additions and 17 deletions

View File

@@ -7,14 +7,19 @@
package io.debezium.connector.postgresql;
import java.sql.SQLException;
import java.util.Map;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.util.Testing;
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<PostgresConnector> {
@@ -22,6 +27,7 @@ public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<PostgresConnector> {
private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" + "CREATE SCHEMA s1; "
+ "CREATE SCHEMA s2; " + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));"
+ "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));"
+ "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";
@Before
@@ -79,4 +85,70 @@ protected void waitForConnectorToStart() {
super.waitForConnectorToStart();
TestHelper.waitForDefaultReplicationSlotBeActive();
}
@Test
public void inserts4Pks() throws Exception {
Testing.Print.enable();
populate4PkTable();
startConnector();
sendAdHocSnapshotSignal("s1.a4");
Thread.sleep(5000);
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
final int id = i + ROW_COUNT + 1;
final int pk1 = id / 1000;
final int pk2 = (id / 100) % 10;
final int pk3 = (id / 10) % 10;
final int pk4 = id % 10;
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
"s1.a4",
pk1,
pk2,
pk3,
pk4,
i + ROW_COUNT));
}
connection.commit();
}
final int expectedRecordCount = ROW_COUNT * 2;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
"test_server.s1.a4",
null);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
protected void populate4PkTable(JdbcConnection connection) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
final int id = i + 1;
final int pk1 = id / 1000;
final int pk2 = (id / 100) % 10;
final int pk3 = (id / 10) % 10;
final int pk4 = id % 10;
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
"s1.a4",
pk1,
pk2,
pk3,
pk4,
i));
}
connection.commit();
}
protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection);
}
}
}
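
For reference, the 4-part key encoding used by the test above can be sanity-checked in isolation: populate4PkTable() decomposes each id into pk1..pk4, and the idCalculator lambda passed to consumeMixedWithIncrementalSnapshot() re-packs them into the same id. A minimal standalone sketch of that round-trip (class and method names are illustrative, not part of the commit; the loop bound assumes the base test's ROW_COUNT of 1000):

public class PkEncodingSketch {

    // Mirrors the id -> (pk1, pk2, pk3, pk4) decomposition used by populate4PkTable().
    static int[] decode(int id) {
        return new int[]{ id / 1000, (id / 100) % 10, (id / 10) % 10, id % 10 };
    }

    // Mirrors the idCalculator lambda passed to consumeMixedWithIncrementalSnapshot().
    static int encode(int pk1, int pk2, int pk3, int pk4) {
        return pk1 * 1_000 + pk2 * 100 + pk3 * 10 + pk4;
    }

    public static void main(String[] args) {
        // The test generates ids 1 .. 2 * ROW_COUNT; every one must round-trip.
        for (int id = 1; id <= 2_000; id++) {
            int[] pk = decode(id);
            if (encode(pk[0], pk[1], pk[2], pk[3]) != id) {
                throw new AssertionError("round-trip failed for id " + id);
            }
        }
        System.out.println("encoding round-trips for ids 1..2000");
    }
}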

View File

@@ -148,13 +148,10 @@ protected String buildChunkQuery(Table table) {
if (context.isNonInitialChunk()) {
final StringBuilder sql = new StringBuilder();
// Window boundaries
addKeyColumnsToCondition(table, sql, " >= ?");
sql.append(" AND NOT (");
addKeyColumnsToCondition(table, sql, " = ?");
sql.append(")");
addLowerBound(table, sql);
// Table boundaries
sql.append(" AND ");
addKeyColumnsToCondition(table, sql, " <= ?");
sql.append(" AND NOT ");
addLowerBound(table, sql);
condition = sql.toString();
}
final String orderBy = table.primaryKeyColumns().stream()
@@ -167,6 +164,41 @@ protected String buildChunkQuery(Table table) {
orderBy);
}
private void addLowerBound(Table table, StringBuilder sql) {
// To make window boundaries work for more than one column, the key columns must be
// compared lexicographically, as the values in each column increase independently.
// For one column the condition is (? is always the last value seen for the given column)
// (k1 > ?)
// For two columns
// (k1 > ?) OR (k1 = ? AND k2 > ?)
// For four columns
// (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) OR (k1 = ? AND k2 = ? AND k3 = ? AND k4 > ?)
// etc.
final List<Column> pkColumns = table.primaryKeyColumns();
if (pkColumns.size() > 1) {
sql.append('(');
}
for (int i = 0; i < pkColumns.size(); i++) {
final boolean isLastIterationForI = (i == pkColumns.size() - 1);
sql.append('(');
for (int j = 0; j < i + 1; j++) {
final boolean isLastIterationForJ = (i == j);
sql.append(pkColumns.get(j).name());
sql.append(isLastIterationForJ ? " > ?" : " = ?");
if (!isLastIterationForJ) {
sql.append(" AND ");
}
}
sql.append(")");
if (!isLastIterationForI) {
sql.append(" OR ");
}
}
if (pkColumns.size() > 1) {
sql.append(')');
}
}
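
The predicate that addLowerBound() emits can be previewed outside Debezium's Table/Column model by running the same nested loop over plain column names. A minimal standalone sketch (class and method names are illustrative, not part of the commit):

public class LowerBoundSketch {

    // Builds the same lexicographic lower-bound condition as addLowerBound() above.
    static String lowerBound(String... keyColumns) {
        StringBuilder sql = new StringBuilder();
        if (keyColumns.length > 1) {
            sql.append('(');
        }
        for (int i = 0; i < keyColumns.length; i++) {
            sql.append('(');
            for (int j = 0; j <= i; j++) {
                // The last column of each disjunct is strictly greater; the prefix is pinned.
                sql.append(keyColumns[j]).append(j == i ? " > ?" : " = ? AND ");
            }
            sql.append(')');
            if (i < keyColumns.length - 1) {
                sql.append(" OR ");
            }
        }
        if (keyColumns.length > 1) {
            sql.append(')');
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Prints: ((pk1 > ?) OR (pk1 = ? AND pk2 > ?) OR (pk1 = ? AND pk2 = ? AND pk3 > ?))
        System.out.println(lowerBound("pk1", "pk2", "pk3"));
    }
}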
protected String buildMaxPrimaryKeyQuery(Table table) {
final String orderBy = table.primaryKeyColumns().stream()
.map(Column::name)
@@ -346,7 +378,7 @@ private void createDataEventsForTable() {
}
context.nextChunkPosition(lastKey);
if (lastRow != null) {
LOGGER.debug("\t Next window will resume from '{}'", context.chunkEndPosititon());
LOGGER.debug("\t Next window will resume from {}", (Object) context.chunkEndPosititon());
}
LOGGER.debug("\t Finished exporting {} records for window of table table '{}'; total duration '{}'", rows,
@@ -376,10 +408,18 @@ protected PreparedStatement readTableChunkStatement(String sql) throws SQLException {
if (context.isNonInitialChunk()) {
final Object[] maximumKey = context.maximumKey().get();
final Object[] chunkEndPosition = context.chunkEndPosititon();
// Fill boundaries placeholders
int pos = 0;
for (int i = 0; i < chunkEndPosition.length; i++) {
statement.setObject(i + 1, chunkEndPosition[i]);
statement.setObject(i + 1 + chunkEndPosition.length, chunkEndPosition[i]);
statement.setObject(i + 1 + 2 * chunkEndPosition.length, maximumKey[i]);
for (int j = 0; j < i + 1; j++) {
statement.setObject(++pos, chunkEndPosition[j]);
}
}
// Fill maximum key placeholders
for (int i = 0; i < chunkEndPosition.length; i++) {
for (int j = 0; j < i + 1; j++) {
statement.setObject(++pos, maximumKey[j]);
}
}
}
return statement;
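
With this binding scheme, N key columns need 1 + 2 + ... + N = N(N+1)/2 placeholders per bound, so the chunk query binds N(N+1) values in total. A small standalone sketch of the resulting binding order (names are illustrative, not part of the commit):

public class BindingOrderSketch {

    // Reproduces the placeholder order used in readTableChunkStatement() above:
    // disjunct i binds the first i+1 key values, first for the chunk-end
    // position, then again for the maximum key.
    static java.util.List<Object> bindOrder(Object[] chunkEndPosition, Object[] maximumKey) {
        java.util.List<Object> bound = new java.util.ArrayList<>();
        for (int i = 0; i < chunkEndPosition.length; i++) {
            for (int j = 0; j <= i; j++) {
                bound.add(chunkEndPosition[j]);
            }
        }
        for (int i = 0; i < maximumKey.length; i++) {
            for (int j = 0; j <= i; j++) {
                bound.add(maximumKey[j]);
            }
        }
        return bound;
    }

    public static void main(String[] args) {
        // Three key columns -> 3 * 4 / 2 = 6 values per bound, 12 in total:
        // [1, 1, 5, 1, 5, 9, 10, 10, 50, 10, 50, 90]
        System.out.println(bindOrder(new Object[]{ 1, 5, 9 }, new Object[]{ 10, 50, 90 }));
    }
}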

View File

@@ -42,7 +42,29 @@ public String getConnectorName() {
}
@Test
public void testBuildQuery() {
public void testBuildQueryOnePkColumn() {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP,
DataChangeEventListener.NO_OP);
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
source.setContext(context);
final Column pk1 = Column.editor().name("pk1").create();
final Column val1 = Column.editor().name("val1").create();
final Column val2 = Column.editor().name("val2").create();
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1"))
.addColumn(pk1)
.addColumn(val1)
.addColumn(val2)
.setPrimaryKeyNames("pk1").create();
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY pk1 LIMIT 1024");
context.nextChunkPosition(new Object[]{ 1, 5 });
context.maximumKey(new Object[]{ 10, 50 });
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo(
"SELECT * FROM \"s1\".\"table1\" WHERE (pk1 > ?) AND NOT (pk1 > ?) ORDER BY pk1 LIMIT 1024");
}
@Test
public void testBuildQueryThreePkColumns() {
final SignalBasedIncrementalSnapshotChangeEventSource<TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP,
DataChangeEventListener.NO_OP);
@@ -50,15 +72,21 @@ public void testBuildQuery() {
source.setContext(context);
final Column pk1 = Column.editor().name("pk1").create();
final Column pk2 = Column.editor().name("pk2").create();
final Column pk3 = Column.editor().name("pk3").create();
final Column val1 = Column.editor().name("val1").create();
final Column val2 = Column.editor().name("val2").create();
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
.addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY pk1, pk2 LIMIT 1024");
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1"))
.addColumn(pk1)
.addColumn(pk2)
.addColumn(pk3)
.addColumn(val1)
.addColumn(val2)
.setPrimaryKeyNames("pk1", "pk2", "pk3").create();
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY pk1, pk2, pk3 LIMIT 1024");
context.nextChunkPosition(new Object[]{ 1, 5 });
context.maximumKey(new Object[]{ 10, 50 });
Assertions.assertThat(source.buildChunkQuery(table)).isEqualTo(
"SELECT * FROM \"s1\".\"table1\" WHERE pk1 >= ? AND pk2 >= ? AND NOT (pk1 = ? AND pk2 = ?) AND pk1 <= ? AND pk2 <= ? ORDER BY pk1, pk2 LIMIT 1024");
"SELECT * FROM \"s1\".\"table1\" WHERE ((pk1 > ?) OR (pk1 = ? AND pk2 > ?) OR (pk1 = ? AND pk2 = ? AND pk3 > ?)) AND NOT ((pk1 > ?) OR (pk1 = ? AND pk2 > ?) OR (pk1 = ? AND pk2 = ? AND pk3 > ?)) ORDER BY pk1, pk2, pk3 LIMIT 1024");
}
@Test

View File

@@ -78,11 +78,21 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount,
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount,
Predicate<Map.Entry<Integer, Integer>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> k.getInt32(pkFieldName()), topicName(), recordConsumer);
}
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(
int recordCount,
Predicate<Map.Entry<Integer, Integer>> dataCompleted,
Function<Struct, Integer> idCalculator,
String topicName,
Consumer<List<SourceRecord>> recordConsumer)
throws InterruptedException {
final Map<Integer, Integer> dbChanges = new HashMap<>();
int noRecords = 0;
for (;;) {
final SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> dataRecords = records.recordsForTopic(topicName());
final List<SourceRecord> dataRecords = records.recordsForTopic(topicName);
if (records.allRecordsInOrder().isEmpty()) {
noRecords++;
Assertions.assertThat(noRecords).describedAs("Too many no data record results")
@@ -94,7 +104,7 @@ protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount,
continue;
}
dataRecords.forEach(record -> {
final int id = ((Struct) record.key()).getInt32(pkFieldName());
final int id = idCalculator.apply((Struct) record.key());
final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
dbChanges.put(id, value);
});