DBZ-4107 Incremental snapshot doesn't work without primary key

harveyyue 2021-11-01 23:13:52 +08:00 committed by Jiri Pechanec
parent 1226c3b2ab
commit aa3ae05e22
9 changed files with 171 additions and 46 deletions
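In short: before this commit the incremental snapshot source derived its chunking key exclusively from table.primaryKeyColumns() and skipped tables without one. The change below widens the config type from CommonConnectorConfig to RelationalDatabaseConnectorConfig so the source can fall back to the key mapper built from the message.key.columns option (MSG_KEY_COLUMNS), letting a keyless table be chunked on a configured surrogate key. A minimal sketch of the opt-in (the property and its <table>:<columns> format are the real message.key.columns option; the table and column names are made up for illustration):

import io.debezium.config.Configuration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;

// Hypothetical table inventory.orders_keyless has no PRIMARY KEY;
// message.key.columns tells Debezium which columns to treat as its key.
Configuration config = Configuration.create()
        .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS,
                "inventory.orders_keyless:order_id,line_no")
        .build();

With such a mapping in place, the snapshot code below resolves its key columns through the key mapper instead of the (empty) primary key.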

View File

@@ -13,7 +13,6 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.signal.ExecuteSnapshotKafkaSignal;
import io.debezium.connector.mysql.signal.KafkaSignalThread;
import io.debezium.jdbc.JdbcConnection;
@@ -23,6 +22,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
@@ -81,9 +81,11 @@ public class MySqlReadOnlyIncrementalSnapshotChangeEventSource<T extends DataCol
private final String showMasterStmt = "SHOW MASTER STATUS";
private final KafkaSignalThread<T> kafkaSignal;
public MySqlReadOnlyIncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection,
public MySqlReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config,
JdbcConnection jdbcConnection,
EventDispatcher<T> dispatcher,
DatabaseSchema<?> databaseSchema, Clock clock,
DatabaseSchema<?> databaseSchema,
Clock clock,
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener);

View File

@@ -31,6 +31,7 @@
import io.debezium.connector.mysql.signal.KafkaSignalThread;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.kafka.KafkaCluster;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
@@ -75,7 +76,8 @@ protected Configuration.Builder config() {
.with(MySqlConnectorConfig.TABLE_EXCLUDE_LIST, DATABASE.getDatabaseName() + "." + EXCLUDED_TABLE)
.with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true)
.with(KafkaSignalThread.SIGNAL_TOPIC, getSignalsTopic())
.with(KafkaSignalThread.BOOTSTRAP_SERVERS, kafka.brokerList());
.with(KafkaSignalThread.BOOTSTRAP_SERVERS, kafka.brokerList())
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", DATABASE.qualifiedTableName("a42"), "pk1,pk2,pk3,pk4"));
}
private String getSignalsTopic() {
@@ -83,9 +85,13 @@ private String getSignalsTopic() {
}
protected void sendExecuteSnapshotKafkaSignal() throws ExecutionException, InterruptedException {
sendExecuteSnapshotKafkaSignal(tableDataCollectionId());
}
protected void sendExecuteSnapshotKafkaSignal(String fullTableNames) throws ExecutionException, InterruptedException {
String signalValue = String.format(
"{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}",
tableDataCollectionId());
fullTableNames);
final ProducerRecord<String, String> executeSnapshotSignal = new ProducerRecord<>(getSignalsTopic(), PARTITION_NO, SERVER_NAME, signalValue);
final Configuration signalProducerConfig = Configuration.create()
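For reference, with fullTableNames = "s1.a42" the format string above renders to this signal value (produced to the signal topic, keyed by the server name):

{"type":"execute-snapshot","data": {"data-collections": ["s1.a42"], "type": "INCREMENTAL"}}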
@@ -154,6 +160,48 @@ public void filteredEvents() throws Exception {
}
}
@Test
public void inserts4Pks() throws Exception {
Testing.Print.enable();
populate4PkTable();
startConnector();
sendExecuteSnapshotKafkaSignal(DATABASE.qualifiedTableName("a4"));
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
DATABASE.topicForTable("a4"),
null);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
@Test
public void insertsWithoutPks() throws Exception {
Testing.Print.enable();
populate4WithoutPkTable();
startConnector();
sendExecuteSnapshotKafkaSignal(DATABASE.qualifiedTableName("a42"));
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
DATABASE.topicForTable("a42"),
null);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
}
@Test(expected = ConnectException.class)
@SkipWhenGtidModeIs(value = SkipWhenGtidModeIs.GtidMode.ON, reason = "Read only connection requires GTID_MODE to be ON")
public void shouldFailIfGtidModeIsOff() throws Exception {
@@ -168,4 +216,16 @@ public void shouldFailIfGtidModeIsOff() throws Exception {
throw (RuntimeException) e;
}
}
protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "a4");
}
}
protected void populate4WithoutPkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "a42");
}
}
}

View File

@@ -12,6 +12,23 @@ CREATE TABLE b (
aa INTEGER
) AUTO_INCREMENT = 1;
CREATE TABLE a4 (
pk1 integer,
pk2 integer,
pk3 integer,
pk4 integer,
aa integer,
PRIMARY KEY(pk1, pk2, pk3, pk4)
);
CREATE TABLE a42 (
pk1 integer,
pk2 integer,
pk3 integer,
pk4 integer,
aa integer
);
CREATE TABLE debezium_signal (
id varchar(64),
type varchar(32),
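a42 deliberately mirrors a4 minus the PRIMARY KEY clause; the MSG_KEY_COLUMNS entry added to the test config earlier maps pk1,pk2,pk3,pk4 onto it so the incremental snapshot can still chunk the table.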

View File

@@ -8,13 +8,13 @@
import java.sql.SQLException;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
@@ -27,7 +27,7 @@ public class OracleSignalBasedIncrementalSnapshotChangeEventSource extends Signa
private final String pdbName;
private final OracleConnection connection;
public OracleSignalBasedIncrementalSnapshotChangeEventSource(CommonConnectorConfig config,
public OracleSignalBasedIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config,
JdbcConnection jdbcConnection,
EventDispatcher<TableId> dispatcher,
DatabaseSchema<?> databaseSchema,

View File

@@ -19,6 +19,7 @@
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing;
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<PostgresConnector> {
@@ -28,6 +29,7 @@ public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<Postg
private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" + "CREATE SCHEMA s1; "
+ "CREATE SCHEMA s2; " + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));"
+ "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));"
+ "CREATE TABLE s1.a42 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer);"
+ "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";
@Before
@@ -52,7 +54,8 @@ protected Configuration.Builder config() {
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false);
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
.with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4");
}
@Override
@@ -127,28 +130,36 @@ public void inserts4Pks() throws Exception {
}
}
protected void populate4PkTable(JdbcConnection connection) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
final int id = i + 1;
final int pk1 = id / 1000;
final int pk2 = (id / 100) % 10;
final int pk3 = (id / 10) % 10;
final int pk4 = id % 10;
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
"s1.a4",
pk1,
pk2,
pk3,
pk4,
i));
@Test
public void insertsWithoutPks() throws Exception {
Testing.Print.enable();
populate4WithoutPkTable();
startConnector();
sendAdHocSnapshotSignal("s1.a42");
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
expectedRecordCount,
x -> true,
k -> k.getInt32("pk1") * 1_000 + k.getInt32("pk2") * 100 + k.getInt32("pk3") * 10 + k.getInt32("pk4"),
"test_server.s1.a42",
null);
for (int i = 0; i < expectedRecordCount; i++) {
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
}
connection.commit();
}
protected void populate4PkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection);
populate4PkTable(connection, "s1.a4");
}
}
protected void populate4WithoutPkTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populate4PkTable(connection, "s1.a42");
}
}
}
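(The populate4PkTable(JdbcConnection) body removed above reappears as the shared populate4PkTable(JdbcConnection, String) in the abstract test base shown in the last file of this commit, so the MySQL and Postgres tests can populate the a4/a42-style tables through one helper.)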

View File

@@ -22,7 +22,6 @@
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
@@ -31,6 +30,8 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.Key.KeyMapper;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.SnapshotChangeRecordEmitter;
@@ -53,7 +54,7 @@ public abstract class AbstractIncrementalSnapshotChangeEventSource<T extends Dat
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIncrementalSnapshotChangeEventSource.class);
private final CommonConnectorConfig connectorConfig;
private final RelationalDatabaseConnectorConfig connectorConfig;
private final Clock clock;
private final RelationalDatabaseSchema databaseSchema;
private final SnapshotProgressListener progressListener;
@@ -67,8 +68,12 @@ public abstract class AbstractIncrementalSnapshotChangeEventSource<T extends Dat
protected JdbcConnection jdbcConnection;
protected final Map<Struct, Object[]> window = new LinkedHashMap<>();
public AbstractIncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection, EventDispatcher<T> dispatcher,
DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener progressListener,
public AbstractIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config,
JdbcConnection jdbcConnection,
EventDispatcher<T> dispatcher,
DatabaseSchema<?> databaseSchema,
Clock clock,
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
this.connectorConfig = config;
this.jdbcConnection = jdbcConnection;
@@ -154,7 +159,7 @@ protected String buildChunkQuery(Table table) {
addLowerBound(table, sql);
condition = sql.toString();
}
final String orderBy = table.primaryKeyColumns().stream()
final String orderBy = getKeyMapper().getKeyKolumns(table).stream()
.map(Column::name)
.collect(Collectors.joining(", "));
return jdbcConnection.buildSelectWithRowLimits(table.id(),
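Concretely, the changed line means chunk ordering is now driven by the key mapper instead of the primary key. For the s1.a42 mapping used in the tests, the built statement has roughly this shape (a sketch: quoting and the limit clause come from the dialect-specific buildSelectWithRowLimits, and the WHERE part is only added for non-initial chunks):

SELECT * FROM s1.a42 WHERE <lower-bound predicate over pk1..pk4> ORDER BY pk1, pk2, pk3, pk4 LIMIT 10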
@@ -174,7 +179,7 @@ private void addLowerBound(Table table, StringBuilder sql) {
// For four columns
// (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) OR (k1 = ? AND k2 = ? AND k3 = ? AND k4 > ?)
// etc.
final List<Column> pkColumns = table.primaryKeyColumns();
final List<Column> pkColumns = getKeyMapper().getKeyKolumns(table);
if (pkColumns.size() > 1) {
sql.append('(');
}
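The comment above is the whole algorithm. As a standalone sketch (plain Java; column names are passed in as strings for illustration, where the real code iterates the Column objects returned by the key mapper):

import java.util.List;

static String lowerBound(List<String> keys) {
    final StringBuilder sql = new StringBuilder();
    if (keys.size() > 1) {
        sql.append('(');
    }
    for (int i = 0; i < keys.size(); i++) {
        if (i > 0) {
            sql.append(" OR ");
        }
        sql.append('(');
        // equality on the first i columns, then a strict bound on column i
        for (int j = 0; j < i; j++) {
            sql.append(keys.get(j)).append(" = ? AND ");
        }
        sql.append(keys.get(i)).append(" > ?").append(')');
    }
    if (keys.size() > 1) {
        sql.append(')');
    }
    return sql.toString();
}

For example, lowerBound(List.of("k1", "k2")) yields ((k1 > ?) OR (k1 = ? AND k2 > ?)), the two-column case from the comment.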
@@ -200,7 +205,7 @@ private void addLowerBound(Table table, StringBuilder sql) {
}
protected String buildMaxPrimaryKeyQuery(Table table) {
final String orderBy = table.primaryKeyColumns().stream()
final String orderBy = getKeyMapper().getKeyKolumns(table).stream()
.map(Column::name)
.collect(Collectors.joining(" DESC, ")) + " DESC";
return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, "*", Optional.empty(), orderBy);
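For the test table this renders as roughly SELECT * FROM s1.a42 ORDER BY pk1 DESC, pk2 DESC, pk3 DESC, pk4 DESC LIMIT 1 (modulo dialect quoting): one row carrying the largest key, which bounds where the incremental snapshot stops.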
@@ -251,7 +256,7 @@ protected void readChunk() throws InterruptedException {
nextDataCollection();
continue;
}
if (currentTable.primaryKeyColumns().isEmpty()) {
if (getKeyMapper().getKeyKolumns(currentTable).isEmpty()) {
LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId);
nextDataCollection();
continue;
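With the key-mapper fallback in place, this warning now fires only when neither a primary key nor a message.key.columns mapping yields key columns for the table, which is exactly the case the new a42 tests no longer hit.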
@@ -321,7 +326,7 @@ public void addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Off
}
protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
for (Iterator<Column> i = table.primaryKeyColumns().iterator(); i.hasNext();) {
for (Iterator<Column> i = getKeyMapper().getKeyKolumns(table).iterator(); i.hasNext();) {
final Column key = i.next();
sql.append(key.name()).append(predicate);
if (i.hasNext()) {
@@ -433,7 +438,7 @@ private Object[] keyFromRow(Object[] row) {
if (row == null) {
return null;
}
final List<Column> keyColumns = currentTable.primaryKeyColumns();
final List<Column> keyColumns = getKeyMapper().getKeyKolumns(currentTable);
final Object[] key = new Object[keyColumns.size()];
for (int i = 0; i < keyColumns.size(); i++) {
key[i] = row[keyColumns.get(i).position() - 1];
@@ -463,4 +468,8 @@ protected void postReadChunk(IncrementalSnapshotContext<T> context) {
protected void postIncrementalSnapshotCompleted() {
// no-op
}
private KeyMapper getKeyMapper() {
return connectorConfig.getKeyMapper() == null ? table -> table.primaryKeyColumns() : connectorConfig.getKeyMapper();
}
}
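Two notes on getKeyMapper(): the null check is the backward-compatibility path (no message.key.columns configured means chunking on the real primary key, as before), and getKeyKolumns is not a typo introduced by this commit; it is the actual spelling of the method on io.debezium.relational.Key.KeyMapper.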

View File

@@ -11,13 +11,13 @@
import org.slf4j.LoggerFactory;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
@@ -28,8 +28,10 @@ public class SignalBasedIncrementalSnapshotChangeEventSource<T extends DataColle
private static final Logger LOGGER = LoggerFactory.getLogger(SignalBasedIncrementalSnapshotChangeEventSource.class);
private final String signalWindowStatement;
public SignalBasedIncrementalSnapshotChangeEventSource(CommonConnectorConfig config, JdbcConnection jdbcConnection,
EventDispatcher<T> dispatcher, DatabaseSchema<?> databaseSchema, Clock clock,
public SignalBasedIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig config,
JdbcConnection jdbcConnection,
EventDispatcher<T> dispatcher, DatabaseSchema<?> databaseSchema,
Clock clock,
SnapshotProgressListener progressListener,
DataChangeEventListener dataChangeEventListener) {
super(config, jdbcConnection, dispatcher, databaseSchema, clock, progressListener, dataChangeEventListener);

View File

@@ -8,22 +8,23 @@
import org.fest.assertions.Assertions;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
public class SignalBasedSnapshotChangeEventSourceTest {
protected CommonConnectorConfig config() {
return new CommonConnectorConfig(
Configuration.create().with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").build(),
"core", 0) {
protected RelationalDatabaseConnectorConfig config() {
return new RelationalDatabaseConnectorConfig(
Configuration.create().with(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal").build(),
"core", null, null, 0, ColumnFilterMode.CATALOG) {
@Override
protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
return null;
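A knock-on effect in this unit test: because the snapshot source now requires a RelationalDatabaseConnectorConfig, the anonymous test config must extend it and supply the extra constructor arguments (here null, null, 0 and ColumnFilterMode.CATALOG) that the plain CommonConnectorConfig constructor never took.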

View File

@@ -57,20 +57,43 @@ protected String tableDataCollectionId() {
return tableName();
}
protected void populateTable(JdbcConnection connection) throws SQLException {
protected void populateTable(JdbcConnection connection, String tableName) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName(), i + 1, i));
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk, aa) VALUES (%s, %s)", tableName, i + 1, i));
}
connection.commit();
}
protected void populateTable(JdbcConnection connection) throws SQLException {
populateTable(connection, tableName());
}
protected void populateTable() throws SQLException {
try (final JdbcConnection connection = databaseConnection()) {
populateTable(connection);
}
}
protected void populate4PkTable(JdbcConnection connection, String tableName) throws SQLException {
connection.setAutoCommit(false);
for (int i = 0; i < ROW_COUNT; i++) {
final int id = i + 1;
final int pk1 = id / 1000;
final int pk2 = (id / 100) % 10;
final int pk3 = (id / 10) % 10;
final int pk4 = id % 10;
connection.executeWithoutCommitting(String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
tableName,
pk1,
pk2,
pk3,
pk4,
i));
}
connection.commit();
}
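This digit encoding is what the k -> k.getInt32("pk1") * 1_000 + ... lambdas in the tests invert: row i gets id = i + 1, the id is split into its four decimal digits, and the key re-encodes to the same id, e.g. id 1234 becomes (pk1, pk2, pk3, pk4) = (1, 2, 3, 4) and 1 * 1_000 + 2 * 100 + 3 * 10 + 4 = 1234. The round trip stays collision-free as long as ROW_COUNT stays below 10,000.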
protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
return consumeMixedWithIncrementalSnapshot(recordCount, x -> true, null);
}
@@ -163,7 +186,7 @@ protected void startConnector(Function<Configuration.Builder, Configuration.Buil
start(connectorClass(), config, callback);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
waitForAvailableRecords(5, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
}