DBZ-4605 Upgrade binlog client; test MySQL partial update

Co-authored-by: Gunnar Morling <gunnar.morling@googlemail.com>
This commit is contained in:
Jiri Pechanec 2022-02-02 10:36:56 +01:00 committed by GitHub
parent 3ee24d644e
commit fafcea9fe4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 1 deletions

View File

@ -16,6 +16,7 @@
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -157,6 +158,58 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
} }
} }
@Test
@FixFor("DBZ-4605")
public void shouldProcessUpdate() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
Testing.Debug.enable();
final int numCreateDatabase = 1;
final int numCreateTables = 1;
final int numDataRecords = 41;
final SourceRecords recordsInitial = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
assertThat(recordsInitial).isNotNull();
assertThat(recordsInitial.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(recordsInitial.recordsForTopic(DATABASE.topicForTable("dbz_126_jsontable")).size()).isEqualTo(numDataRecords);
try (MySqlTestConnection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
conn.execute(
"CREATE TABLE IF NOT EXISTS deals ( id int(11) unsigned NOT NULL AUTO_INCREMENT, company_id int(11) unsigned DEFAULT NULL, title varchar(255) DEFAULT NULL, custom_fields json DEFAULT NULL, PRIMARY KEY (id), KEY idx_company_id (company_id)) ENGINE=InnoDB DEFAULT CHARSET=utf8",
"INSERT INTO deals (title, custom_fields) VALUES ('test', '"
+ "{"
+ "\"17fc9889474028063990914001f6854f6b8b5784\":\"test_field_for_remove_fields_behaviour_2\","
+ "\"1f3a2ea5bc1f60258df20521bee9ac636df69a3a\":{\"currency\":\"USD\"},"
+ "\"4f4d99a438f334d7dbf83a1816015b361b848b3b\":{\"currency\":\"USD\"},"
+ "\"9021162291be72f5a8025480f44bf44d5d81d07c\":\"test_field_for_remove_fields_behaviour_3_will_be_removed\","
+ "\"9b0ed11532efea688fdf12b28f142b9eb08a80c5\":{\"currency\":\"USD\"},"
+ "\"e65ad0762c259b05b4866f7249eabecabadbe577\":\"test_field_for_remove_fields_behaviour_1_updated\","
+ "\"ff2c07edcaa3e987c23fb5cc4fe860bb52becf00\":{\"currency\":\"USD\"}"
+ "}')",
"UPDATE deals SET custom_fields = JSON_REMOVE(custom_fields, '$.\"17fc9889474028063990914001f6854f6b8b5784\"')");
}
final SourceRecords records = consumeRecordsByTopic(3);
assertThat(records.recordsForTopic(DATABASE.topicForTable("deals")).size()).isEqualTo(2);
final SourceRecord update = records.allRecordsInOrder().get(2);
assertThat(((Struct) update.value()).getStruct("after").getString("custom_fields")).isEqualTo(
"{"
+ "\"1f3a2ea5bc1f60258df20521bee9ac636df69a3a\":{\"currency\":\"USD\"},"
+ "\"4f4d99a438f334d7dbf83a1816015b361b848b3b\":{\"currency\":\"USD\"},"
+ "\"9021162291be72f5a8025480f44bf44d5d81d07c\":\"test_field_for_remove_fields_behaviour_3_will_be_removed\","
+ "\"9b0ed11532efea688fdf12b28f142b9eb08a80c5\":{\"currency\":\"USD\"},"
+ "\"e65ad0762c259b05b4866f7249eabecabadbe577\":\"test_field_for_remove_fields_behaviour_1_updated\","
+ "\"ff2c07edcaa3e987c23fb5cc4fe860bb52becf00\":{\"currency\":\"USD\"}"
+ "}");
stopConnector();
}
protected void check(String json, String expectedBinlog, Consumer<String> msg) { protected void check(String json, String expectedBinlog, Consumer<String> msg) {
if ((json == null && expectedBinlog != null) || (json != null && !json.equals(expectedBinlog))) { if ((json == null && expectedBinlog != null) || (json != null && !json.equals(expectedBinlog))) {
msg.accept("JSON was: " + json + System.lineSeparator() + "but expected: " + expectedBinlog); msg.accept("JSON was: " + json + System.lineSeparator() + "but expected: " + expectedBinlog);

View File

@ -121,7 +121,7 @@
<!-- Database drivers, should align with databases --> <!-- Database drivers, should align with databases -->
<version.postgresql.driver>42.3.2</version.postgresql.driver> <version.postgresql.driver>42.3.2</version.postgresql.driver>
<version.mysql.driver>8.0.27</version.mysql.driver> <version.mysql.driver>8.0.27</version.mysql.driver>
<version.mysql.binlog>0.25.4</version.mysql.binlog> <version.mysql.binlog>0.25.5</version.mysql.binlog>
<version.mongo.driver>4.3.3</version.mongo.driver> <version.mongo.driver>4.3.3</version.mongo.driver>
<version.sqlserver.driver>9.4.1.jre8</version.sqlserver.driver> <version.sqlserver.driver>9.4.1.jre8</version.sqlserver.driver>
<version.oracle.driver>21.1.0.0</version.oracle.driver> <version.oracle.driver>21.1.0.0</version.oracle.driver>