diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java index 62f39c506..4b8afa6c2 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.mysql; +import java.io.IOException; import java.nio.ByteOrder; import java.nio.charset.Charset; import java.nio.charset.IllegalCharsetNameException; @@ -17,12 +18,15 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary; import com.mysql.jdbc.CharsetMapping; import io.debezium.annotation.Immutable; +import io.debezium.data.Json; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.relational.Column; import io.debezium.relational.ValueConverter; @@ -83,6 +87,9 @@ protected ByteOrder byteOrderOfBitType() { public SchemaBuilder schemaBuilder(Column column) { // Handle a few MySQL-specific types based upon how they are handled by the MySQL binlog client ... String typeName = column.typeName().toUpperCase(); + if (matches(typeName, "JSON")) { + return Json.builder(); + } if (matches(typeName, "YEAR")) { return Year.builder(); } @@ -102,6 +109,9 @@ public SchemaBuilder schemaBuilder(Column column) { public ValueConverter converter(Column column, Field fieldDefn) { // Handle a few MySQL-specific types based upon how they are handled by the MySQL binlog client ... String typeName = column.typeName().toUpperCase(); + if (matches(typeName, "JSON")) { + return (data) -> convertJson(column, fieldDefn, data); + } if (matches(typeName, "YEAR")) { return (data) -> convertYearToInt(column, fieldDefn, data); } @@ -168,6 +178,40 @@ protected Charset charsetFor(Column column) { return null; } + /** + * Convert the {@link String} {@code byte[]} value to a string value used in a {@link SourceRecord}. + * + * @param column the column in which the value appears + * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never null + * @param data the data; may be null + * @return the converted value, or null if the conversion could not be made and the column allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls + */ + protected Object convertJson(Column column, Field fieldDefn, Object data) { + if (data == null) { + data = fieldDefn.schema().defaultValue(); + } + if (data == null) { + if (column.isOptional()) return null; + return "{}"; + } + if (data instanceof byte[]) { + // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility + // to parse MySQL's internal binary representation into a JSON string, using the standard formatter. + try { + String json = JsonBinary.parseAsString((byte[])data); + return json; + } catch ( IOException e) { + throw new ConnectException("Failed to parse and read a JSON value on " + column + ": " + e.getMessage(), e); + } + } + if (data instanceof String) { + // The SnapshotReader sees JSON values as UTF-8 encoded strings. + return data; + } + return handleUnknownData(column, fieldDefn, data); + } + /** * Convert the {@link String} or {@code byte[]} value to a string value used in a {@link SourceRecord}. * diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java index de7d0514b..90bb99704 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java @@ -101,6 +101,11 @@ protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) th protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) throws IOException { return RowDeserializers.deserializeTimestampV2(meta, inputStream); } + + @Override + protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException { + return RowDeserializers.deserializeYear(inputStream); + } } /** @@ -158,6 +163,11 @@ protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) th protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) throws IOException { return RowDeserializers.deserializeTimestampV2(meta, inputStream); } + + @Override + protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException { + return RowDeserializers.deserializeYear(inputStream); + } } /** diff --git a/debezium-connector-mysql/src/test/docker/init/setup.sql b/debezium-connector-mysql/src/test/docker/init/setup.sql index c17152724..9db3a2df7 100644 --- a/debezium-connector-mysql/src/test/docker/init/setup.sql +++ b/debezium-connector-mysql/src/test/docker/init/setup.sql @@ -180,7 +180,6 @@ VALUES (default, '2016-01-16', 1001, 1, 102), -- The integration test for this database expects to scans all of the binlog events associated with this database -- without error or problems. The integration test does not modify any records in this database, so this script -- must contain all operations to these tables. -# CREATE DATABASE regression_test; USE regression_test; @@ -274,3 +273,146 @@ INSERT INTO dbz_123_bitvaluetest VALUES (b'1',b'10',b'01000000',b'10110111000001 -- DBZ-104 handle create table like ... CREATE TABLE dbz_104_customers LIKE connector_test.customers; INSERT INTO dbz_104_customers SELECT * FROM connector_test.customers; + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: json_test +-- ---------------------------------------------------------------------------------------------------------------- +-- The integration test for this database expects to scans all of the binlog events associated with this database +-- without error or problems. The integration test does not modify any records in this database, so this script +-- must contain all operations to these tables. +-- +-- This relies upon MySQL 5.7's JSON datatype. +CREATE DATABASE json_test; +USE json_test; + +-- DBZ-126 handle JSON column types ... +CREATE TABLE dbz_126_jsontable ( + id INT AUTO_INCREMENT NOT NULL, + json JSON, + expectedJdbcStr VARCHAR(256), -- value that we get back from JDBC + expectedBinlogStr VARCHAR(256), -- value we parse from the binlog + PRIMARY KEY(id) +) DEFAULT CHARSET=utf8; +INSERT INTO dbz_126_jsontable VALUES (default,NULL, + NULL, + NULL); +INSERT INTO dbz_126_jsontable VALUES (default,'{"a": 2}', + '{"a": 2}', + '{"a":2}'); +INSERT INTO dbz_126_jsontable VALUES (default,'[1, 2]', + '[1, 2]', + '[1,2]'); +INSERT INTO dbz_126_jsontable VALUES (default,'{"key1": "value1", "key2": "value2"}', + '{"key1": "value1", "key2": "value2"}', + '{"key1":"value1","key2":"value2"}'); +INSERT INTO dbz_126_jsontable VALUES (default,'["a", "b",1]', + '["a", "b",1]', + '["a","b",1]'); +INSERT INTO dbz_126_jsontable VALUES (default,'{"k1": "v1", "k2": {"k21": "v21", "k22": "v22"}, "k3": ["a", "b", 1]}', + '{"k1": "v1", "k2": {"k21": "v21", "k22": "v22"}, "k3": ["a", "b", 1]}', + '{"k1":"v1","k2":{"k21":"v21","k22":"v22"},"k3":["a","b",1]}'); +INSERT INTO dbz_126_jsontable VALUES (default,'{"a": "b", "c": "d", "ab": "abc", "bc": ["x", "y"]}', + '{"a": "b", "c": "d", "ab": "abc", "bc": ["x", "y"]}', + '{"a":"b","c":"d","ab":"abc","bc":["x","y"]}'); +INSERT INTO dbz_126_jsontable VALUES (default,'["here", ["I", "am"], "!!!"]', + '["here", ["I", "am"], "!!!"]', + '["here",["I","am"],"!!!"]'); +INSERT INTO dbz_126_jsontable VALUES (default,'"scalar string"', + '"scalar string"', + '"scalar string"'); +INSERT INTO dbz_126_jsontable VALUES (default,'true', + 'true', + 'true'); +INSERT INTO dbz_126_jsontable VALUES (default,'false', + 'false', + 'false'); +INSERT INTO dbz_126_jsontable VALUES (default,'null', + 'null', + 'null'); +INSERT INTO dbz_126_jsontable VALUES (default,'-1', + '-1', + '-1'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST(1 AS UNSIGNED) AS JSON), + '1', + '1'); +INSERT INTO dbz_126_jsontable VALUES (default,'32767', + '32767', + '32767'); +INSERT INTO dbz_126_jsontable VALUES (default,'32768', + '32768', + '32768'); +INSERT INTO dbz_126_jsontable VALUES (default,'-32768', + '-32768', + '-32768'); +INSERT INTO dbz_126_jsontable VALUES (default,'2147483647', -- INT32 + '2147483647', + '2147483647'); +INSERT INTO dbz_126_jsontable VALUES (default,'2147483648', -- INT64 + '2147483648', + '2147483648'); +INSERT INTO dbz_126_jsontable VALUES (default,'-2147483648', -- INT32 + '-2147483648', + '-2147483648'); +INSERT INTO dbz_126_jsontable VALUES (default,'-2147483649', -- INT64 + '-2147483649', + '-2147483649'); +INSERT INTO dbz_126_jsontable VALUES (default,'18446744073709551615', -- INT64 + '18446744073709551615', + '18446744073709551615'); +INSERT INTO dbz_126_jsontable VALUES (default,'18446744073709551616', -- BigInteger + '18446744073709551616', + '18446744073709551616'); +INSERT INTO dbz_126_jsontable VALUES (default,'3.14', + '3.14', + '3.14'); +INSERT INTO dbz_126_jsontable VALUES (default,'{}', + '{}', + '{}'); +INSERT INTO dbz_126_jsontable VALUES (default,'[]', + '[]', + '[]'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15 23:24:25' AS DATETIME) AS JSON), + '"2015-01-15 23:24:25"', + '"2015-01-15 23:24:25"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15 23:24:25.12' AS DATETIME(3)) AS JSON), + '"2015-01-15 23:24:25.12"', + '"2015-01-15 23:24:25.12"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15 23:24:25.0237' AS DATETIME(3)) AS JSON), + '"2015-01-15 23:24:25.024"', + '"2015-01-15 23:24:25.024"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('23:24:25' AS TIME) AS JSON), + '"23:24:25"', + '"23:24:25"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('23:24:25.12' AS TIME(3)) AS JSON), + '"23:24:25.12"', + '"23:24:25.12"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('23:24:25.0237' AS TIME(3)) AS JSON), + '"23:24:25.024"', + '"23:24:25.024"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15' AS DATE) AS JSON), + '"2015-01-15"', + '"2015-01-15"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(TIMESTAMP'2015-01-15 23:24:25' AS JSON), + '"2015-01-15 23:24:25"', + '"2015-01-15 23:24:25"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(TIMESTAMP'2015-01-15 23:24:25.12' AS JSON), + '"2015-01-15 23:24:25.12"', + '"2015-01-15 23:24:25.12"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(TIMESTAMP'2015-01-15 23:24:25.0237' AS JSON), + '"2015-01-15 23:24:25.0237"', + '"2015-01-15 23:24:25.0237"'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(UNIX_TIMESTAMP('2015-01-15 23:24:25') AS JSON), + '1421364265', + '1421364265'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(ST_GeomFromText('POINT(1 1)') AS JSON), + '{\"type\": \"Point\", \"coordinates\": [1.0, 1.0]}', + '{\"type\":\"Point\",\"coordinates\":[1.0,1.0]}'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST('[]' AS CHAR CHARACTER SET 'ascii'), + '[]', + '[]'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafe' AS JSON), -- BLOB as Base64 + '"yv4="', + '"yv4="'); +INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafebabe' AS JSON), -- BLOB as Base64 + '"yv66vg=="', + '"yv66vg=="'); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 231f0c160..c155cfd24 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -226,23 +226,32 @@ public void shouldValidateAcceptableConfiguration() { Recommender dbNameRecommender = MySqlConnectorConfig.DATABASE_WHITELIST.recommender(); List dbNames = dbNameRecommender.validValues(MySqlConnectorConfig.DATABASE_WHITELIST, config); Testing.debug("List of dbNames: " + dbNames); - assertThat(dbNames).containsOnly("connector_test", "readbinlog_test", "regression_test", + assertThat(dbNames).containsOnly("connector_test", "readbinlog_test", "regression_test", "json_test", "connector_test_ro", "emptydb"); Recommender tableNameRecommender = MySqlConnectorConfig.TABLE_WHITELIST.recommender(); List tableNames = tableNameRecommender.validValues(MySqlConnectorConfig.TABLE_WHITELIST, config); - assertThat(tableNames).contains("connector_test.customers", - "connector_test.orders", - "connector_test.products", - "connector_test.products_on_hand", - "connector_test_ro.customers", - "connector_test_ro.orders", - "connector_test_ro.products", - "connector_test_ro.products_on_hand", - "regression_test.t1464075356413_testtable6", - "regression_test.dbz_85_fractest", - "regression_test.dbz84_integer_types_table"); Testing.debug("List of tableNames: " + tableNames); + assertThat(tableNames).containsOnly("readbinlog_test.product", + "readbinlog_test.purchased", + "readbinlog_test.person", + "connector_test.customers", + "connector_test.orders", + "connector_test.products", + "connector_test.products_on_hand", + "connector_test_ro.customers", + "connector_test_ro.orders", + "connector_test_ro.products", + "connector_test_ro.products_on_hand", + "regression_test.t1464075356413_testtable6", + "regression_test.dbz_85_fractest", + "regression_test.dbz84_integer_types_table", + "regression_test.dbz_100_enumsettest", + "regression_test.dbz_102_charsettest", + "regression_test.dbz_114_zerovaluetest", + "regression_test.dbz_123_bitvaluetest", + "regression_test.dbz_104_customers", + "json_test.dbz_126_jsontable"); // Now set the whitelist to two databases ... Configuration config2 = config.edit() diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorJsonIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorJsonIT.java new file mode 100644 index 000000000..2a89c3b64 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorJsonIT.java @@ -0,0 +1,188 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import static org.junit.Assert.fail; + +import java.nio.file.Path; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import org.apache.kafka.connect.data.Struct; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.fest.assertions.Assertions.assertThat; + +import io.debezium.config.Configuration; +import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; +import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; +import io.debezium.data.Envelope; +import io.debezium.doc.FixFor; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.relational.history.FileDatabaseHistory; +import io.debezium.util.Testing; + +/** + * @author Randall Hauch + */ +public class MySqlConnectorJsonIT extends AbstractConnectorTest { + + private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt").toAbsolutePath(); + + private Configuration config; + + @Before + public void beforeEach() { + stopConnector(); + initializeConnectorTestFramework(); + Testing.Files.delete(DB_HISTORY_PATH); + } + + @After + public void afterEach() { + try { + stopConnector(); + } finally { + Testing.Files.delete(DB_HISTORY_PATH); + } + } + + @Test + @FixFor("DBZ-126") + public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws SQLException, InterruptedException { + // Use the DB configuration to define the connector's configuration ... + config = Configuration.create() + .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname")) + .with(MySqlConnectorConfig.PORT, System.getProperty("database.port")) + .with(MySqlConnectorConfig.USER, "snapper") + .with(MySqlConnectorConfig.PASSWORD, "snapperpass") + .with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase()) + .with(MySqlConnectorConfig.SERVER_ID, 18765) + .with(MySqlConnectorConfig.SERVER_NAME, "jsonit_binlog") + .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10) + .with(MySqlConnectorConfig.DATABASE_WHITELIST, "json_test") + .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.toString()) + .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH) + .build(); + // Start the connector ... + start(MySqlConnector.class, config); + + // --------------------------------------------------------------------------------------------------------------- + // Consume all of the events due to startup and initialization of the database + // --------------------------------------------------------------------------------------------------------------- + // Testing.Debug.enable(); + int numCreateDatabase = 1; + int numCreateTables = 1; + int numDataRecords = 1; + SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords); + stopConnector(); + assertThat(records).isNotNull(); + assertThat(records.recordsForTopic("jsonit_binlog").size()).isEqualTo(numCreateDatabase + numCreateTables); + assertThat(records.recordsForTopic("jsonit_binlog.json_test.dbz_126_jsontable").size()).isEqualTo(1); + assertThat(records.topics().size()).isEqualTo(1 + numCreateTables); + assertThat(records.databaseNames().size()).isEqualTo(1); + assertThat(records.ddlRecordsForDatabase("json_test").size()).isEqualTo(numCreateDatabase + numCreateTables); + assertThat(records.ddlRecordsForDatabase("regression_test")).isNull(); + assertThat(records.ddlRecordsForDatabase("connector_test")).isNull(); + assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); + records.ddlRecordsForDatabase("json_test").forEach(this::print); + + // Check that all records are valid, can be serialized and deserialized ... + records.forEach(this::validate); + List errors = new ArrayList<>(); + records.forEach(record -> { + Struct value = (Struct) record.value(); + if (record.topic().endsWith("dbz_126_jsontable")) { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + Integer i = after.getInt32("id"); + assertThat(i).isNotNull(); + String json = after.getString("json"); + String expectedBinlog = after.getString("expectedBinlogStr"); + check(json,expectedBinlog,errors::add); + } + }); + if (!errors.isEmpty()) { + fail("" + errors.size() + " errors with JSON records..." + System.lineSeparator() + + String.join(System.lineSeparator(), errors)); + } + } + + @Test + public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException { + // Use the DB configuration to define the connector's configuration ... + config = Configuration.create() + .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname")) + .with(MySqlConnectorConfig.PORT, System.getProperty("database.port")) + .with(MySqlConnectorConfig.USER, "snapper") + .with(MySqlConnectorConfig.PASSWORD, "snapperpass") + .with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase()) + .with(MySqlConnectorConfig.SERVER_ID, 18765) + .with(MySqlConnectorConfig.SERVER_NAME, "jsonit_snap") + .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10) + .with(MySqlConnectorConfig.DATABASE_WHITELIST, "json_test") + .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class) + .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH) + .build(); + // Start the connector ... + start(MySqlConnector.class, config); + + // --------------------------------------------------------------------------------------------------------------- + // Consume all of the events due to startup and initialization of the database + // --------------------------------------------------------------------------------------------------------------- + //Testing.Debug.enable(); + int numTables = 1; + int numDataRecords = 1; + int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use) + int numSetVariables = 1; + SourceRecords records = consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords); + stopConnector(); + assertThat(records).isNotNull(); + assertThat(records.recordsForTopic("jsonit_snap").size()).isEqualTo(numDdlRecords + numSetVariables); + assertThat(records.recordsForTopic("jsonit_snap.json_test.dbz_126_jsontable").size()).isEqualTo(1); + assertThat(records.topics().size()).isEqualTo(numTables + 1); + assertThat(records.databaseNames().size()).isEqualTo(2); + assertThat(records.databaseNames()).containsOnly("json_test", ""); + assertThat(records.ddlRecordsForDatabase("json_test").size()).isEqualTo(numDdlRecords); + assertThat(records.ddlRecordsForDatabase("regression_test")).isNull(); + assertThat(records.ddlRecordsForDatabase("connector_test")).isNull(); + assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); + assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); // SET statement + records.ddlRecordsForDatabase("json_test").forEach(this::print); + + // Check that all records are valid, can be serialized and deserialized ... + records.forEach(this::validate); + List errors = new ArrayList<>(); + records.forEach(record -> { + Struct value = (Struct) record.value(); + if (record.topic().endsWith("dbz_126_jsontable")) { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + Integer i = after.getInt32("id"); + assertThat(i).isNotNull(); + String json = after.getString("json"); + String expectedJdbc = after.getString("expectedJdbcStr"); + check(json,expectedJdbc,errors::add); + } + }); + if (!errors.isEmpty()) { + fail("" + errors.size() + " errors with JSON records..." + System.lineSeparator() + + String.join(System.lineSeparator(), errors)); + } + } + + protected void check(String json, String expectedBinlog, Consumer msg ) { + if ((json == null && expectedBinlog != null) || (json != null && !json.equals(expectedBinlog))) { + msg.accept("JSON was: " + json + System.lineSeparator() + "but expected: " + expectedBinlog); + } else { + assertThat(json).isEqualTo(expectedBinlog); + } + } + +} diff --git a/pom.xml b/pom.xml index 4f246f390..088736b23 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ 9.4 5.7 5.1.39 - 0.4.2 + 0.5.1 3.2.6 3.2.2