Merge pull request #119 from rhauch/dbz-126

DBZ-126 Added support for MySQL JSON type
Randall Hauch 2016-10-19 08:49:54 -05:00 committed by GitHub
commit 028d6442e2
6 changed files with 407 additions and 14 deletions

View File

@@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
@@ -17,12 +18,15 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.json.JsonBinary;
import com.mysql.jdbc.CharsetMapping;
import io.debezium.annotation.Immutable;
import io.debezium.data.Json;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
@@ -83,6 +87,9 @@ protected ByteOrder byteOrderOfBitType() {
public SchemaBuilder schemaBuilder(Column column) {
// Handle a few MySQL-specific types based upon how they are handled by the MySQL binlog client ...
String typeName = column.typeName().toUpperCase();
if (matches(typeName, "JSON")) {
return Json.builder();
}
if (matches(typeName, "YEAR")) {
return Year.builder();
}
@@ -102,6 +109,9 @@ public SchemaBuilder schemaBuilder(Column column) {
public ValueConverter converter(Column column, Field fieldDefn) {
// Handle a few MySQL-specific types based upon how they are handled by the MySQL binlog client ...
String typeName = column.typeName().toUpperCase();
if (matches(typeName, "JSON")) {
return (data) -> convertJson(column, fieldDefn, data);
}
if (matches(typeName, "YEAR")) {
return (data) -> convertYearToInt(column, fieldDefn, data);
}
@@ -168,6 +178,40 @@ protected Charset charsetFor(Column column) {
return null;
}
/**
* Convert the {@link String} or {@code byte[]} value to a JSON string value used in a {@link SourceRecord}.
*
* @param column the column in which the value appears
* @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never null
* @param data the data; may be null
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertJson(Column column, Field fieldDefn, Object data) {
if (data == null) {
data = fieldDefn.schema().defaultValue();
}
if (data == null) {
if (column.isOptional()) return null;
return "{}";
}
if (data instanceof byte[]) {
// The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
// to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
try {
String json = JsonBinary.parseAsString((byte[]) data);
return json;
} catch (IOException e) {
throw new ConnectException("Failed to parse and read a JSON value on " + column + ": " + e.getMessage(), e);
}
}
if (data instanceof String) {
// The SnapshotReader sees JSON values as UTF-8 encoded strings.
return data;
}
return handleUnknownData(column, fieldDefn, data);
}
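// A minimal sketch (assumed example, not from this change) of exercising the same
// utility directly: per MySQL's documented binary JSON format, 0x05 marks a 16-bit
// integer scalar followed by the value in little-endian byte order.
protected String convertJsonExample() throws IOException {
    byte[] binaryJson = new byte[] { 0x05, 0x02, 0x00 }; // assumed example payload
    return JsonBinary.parseAsString(binaryJson); // expected to yield "2"
}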
/**
* Convert the {@link String} or {@code byte[]} value to a string value used in a {@link SourceRecord}.
*

View File

@@ -101,6 +101,11 @@ protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) th
protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeTimestampV2(meta, inputStream);
}
@Override
protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeYear(inputStream);
}
}
/**
@@ -158,6 +163,11 @@ protected Serializable deserializeTimestamp(ByteArrayInputStream inputStream) th
protected Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeTimestampV2(meta, inputStream);
}
@Override
protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeYear(inputStream);
}
}
/**

View File

@@ -180,7 +180,6 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
-- The integration test for this database expects to scan all of the binlog events associated with this database
-- without errors or problems. The integration test does not modify any records in this database, so this script
-- must contain all operations to these tables.
#
CREATE DATABASE regression_test;
USE regression_test;
@@ -274,3 +273,146 @@ INSERT INTO dbz_123_bitvaluetest VALUES (b'1',b'10',b'01000000',b'10110111000001
-- DBZ-104 handle create table like ...
CREATE TABLE dbz_104_customers LIKE connector_test.customers;
INSERT INTO dbz_104_customers SELECT * FROM connector_test.customers;
-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: json_test
-- ----------------------------------------------------------------------------------------------------------------
-- The integration test for this database expects to scan all of the binlog events associated with this database
-- without errors or problems. The integration test does not modify any records in this database, so this script
-- must contain all operations to these tables.
--
-- This relies upon MySQL 5.7's JSON datatype.
CREATE DATABASE json_test;
USE json_test;
-- DBZ-126 handle JSON column types ...
CREATE TABLE dbz_126_jsontable (
id INT AUTO_INCREMENT NOT NULL,
json JSON,
expectedJdbcStr VARCHAR(256), -- value that we get back from JDBC
expectedBinlogStr VARCHAR(256), -- value we parse from the binlog
PRIMARY KEY(id)
) DEFAULT CHARSET=utf8;
INSERT INTO dbz_126_jsontable VALUES (default,NULL,
NULL,
NULL);
INSERT INTO dbz_126_jsontable VALUES (default,'{"a": 2}',
'{"a": 2}',
'{"a":2}');
INSERT INTO dbz_126_jsontable VALUES (default,'[1, 2]',
'[1, 2]',
'[1,2]');
INSERT INTO dbz_126_jsontable VALUES (default,'{"key1": "value1", "key2": "value2"}',
'{"key1": "value1", "key2": "value2"}',
'{"key1":"value1","key2":"value2"}');
INSERT INTO dbz_126_jsontable VALUES (default,'["a", "b",1]',
'["a", "b",1]',
'["a","b",1]');
INSERT INTO dbz_126_jsontable VALUES (default,'{"k1": "v1", "k2": {"k21": "v21", "k22": "v22"}, "k3": ["a", "b", 1]}',
'{"k1": "v1", "k2": {"k21": "v21", "k22": "v22"}, "k3": ["a", "b", 1]}',
'{"k1":"v1","k2":{"k21":"v21","k22":"v22"},"k3":["a","b",1]}');
INSERT INTO dbz_126_jsontable VALUES (default,'{"a": "b", "c": "d", "ab": "abc", "bc": ["x", "y"]}',
'{"a": "b", "c": "d", "ab": "abc", "bc": ["x", "y"]}',
'{"a":"b","c":"d","ab":"abc","bc":["x","y"]}');
INSERT INTO dbz_126_jsontable VALUES (default,'["here", ["I", "am"], "!!!"]',
'["here", ["I", "am"], "!!!"]',
'["here",["I","am"],"!!!"]');
INSERT INTO dbz_126_jsontable VALUES (default,'"scalar string"',
'"scalar string"',
'"scalar string"');
INSERT INTO dbz_126_jsontable VALUES (default,'true',
'true',
'true');
INSERT INTO dbz_126_jsontable VALUES (default,'false',
'false',
'false');
INSERT INTO dbz_126_jsontable VALUES (default,'null',
'null',
'null');
INSERT INTO dbz_126_jsontable VALUES (default,'-1',
'-1',
'-1');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST(1 AS UNSIGNED) AS JSON),
'1',
'1');
INSERT INTO dbz_126_jsontable VALUES (default,'32767',
'32767',
'32767');
INSERT INTO dbz_126_jsontable VALUES (default,'32768',
'32768',
'32768');
INSERT INTO dbz_126_jsontable VALUES (default,'-32768',
'-32768',
'-32768');
INSERT INTO dbz_126_jsontable VALUES (default,'2147483647', -- INT32
'2147483647',
'2147483647');
INSERT INTO dbz_126_jsontable VALUES (default,'2147483648', -- INT64
'2147483648',
'2147483648');
INSERT INTO dbz_126_jsontable VALUES (default,'-2147483648', -- INT32
'-2147483648',
'-2147483648');
INSERT INTO dbz_126_jsontable VALUES (default,'-2147483649', -- INT64
'-2147483649',
'-2147483649');
INSERT INTO dbz_126_jsontable VALUES (default,'18446744073709551615', -- INT64
'18446744073709551615',
'18446744073709551615');
INSERT INTO dbz_126_jsontable VALUES (default,'18446744073709551616', -- BigInteger
'18446744073709551616',
'18446744073709551616');
INSERT INTO dbz_126_jsontable VALUES (default,'3.14',
'3.14',
'3.14');
INSERT INTO dbz_126_jsontable VALUES (default,'{}',
'{}',
'{}');
INSERT INTO dbz_126_jsontable VALUES (default,'[]',
'[]',
'[]');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15 23:24:25' AS DATETIME) AS JSON),
'"2015-01-15 23:24:25"',
'"2015-01-15 23:24:25"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15 23:24:25.12' AS DATETIME(3)) AS JSON),
'"2015-01-15 23:24:25.12"',
'"2015-01-15 23:24:25.12"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15 23:24:25.0237' AS DATETIME(3)) AS JSON),
'"2015-01-15 23:24:25.024"',
'"2015-01-15 23:24:25.024"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('23:24:25' AS TIME) AS JSON),
'"23:24:25"',
'"23:24:25"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('23:24:25.12' AS TIME(3)) AS JSON),
'"23:24:25.12"',
'"23:24:25.12"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('23:24:25.0237' AS TIME(3)) AS JSON),
'"23:24:25.024"',
'"23:24:25.024"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(CAST('2015-01-15' AS DATE) AS JSON),
'"2015-01-15"',
'"2015-01-15"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(TIMESTAMP'2015-01-15 23:24:25' AS JSON),
'"2015-01-15 23:24:25"',
'"2015-01-15 23:24:25"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(TIMESTAMP'2015-01-15 23:24:25.12' AS JSON),
'"2015-01-15 23:24:25.12"',
'"2015-01-15 23:24:25.12"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(TIMESTAMP'2015-01-15 23:24:25.0237' AS JSON),
'"2015-01-15 23:24:25.0237"',
'"2015-01-15 23:24:25.0237"');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(UNIX_TIMESTAMP('2015-01-15 23:24:25') AS JSON),
'1421364265',
'1421364265');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(ST_GeomFromText('POINT(1 1)') AS JSON),
'{\"type\": \"Point\", \"coordinates\": [1.0, 1.0]}',
'{\"type\":\"Point\",\"coordinates\":[1.0,1.0]}');
INSERT INTO dbz_126_jsontable VALUES (default,CAST('[]' AS CHAR CHARACTER SET 'ascii'),
'[]',
'[]');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafe' AS JSON), -- BLOB as Base64
'"yv4="',
'"yv4="');
INSERT INTO dbz_126_jsontable VALUES (default,CAST(x'cafebabe' AS JSON), -- BLOB as Base64
'"yv66vg=="',
'"yv66vg=="');

View File

@@ -226,23 +226,32 @@ public void shouldValidateAcceptableConfiguration() {
Recommender dbNameRecommender = MySqlConnectorConfig.DATABASE_WHITELIST.recommender();
List<Object> dbNames = dbNameRecommender.validValues(MySqlConnectorConfig.DATABASE_WHITELIST, config);
Testing.debug("List of dbNames: " + dbNames);
assertThat(dbNames).containsOnly("connector_test", "readbinlog_test", "regression_test",
assertThat(dbNames).containsOnly("connector_test", "readbinlog_test", "regression_test", "json_test",
"connector_test_ro", "emptydb");
Recommender tableNameRecommender = MySqlConnectorConfig.TABLE_WHITELIST.recommender();
List<Object> tableNames = tableNameRecommender.validValues(MySqlConnectorConfig.TABLE_WHITELIST, config);
assertThat(tableNames).contains("connector_test.customers",
"connector_test.orders",
"connector_test.products",
"connector_test.products_on_hand",
"connector_test_ro.customers",
"connector_test_ro.orders",
"connector_test_ro.products",
"connector_test_ro.products_on_hand",
"regression_test.t1464075356413_testtable6",
"regression_test.dbz_85_fractest",
"regression_test.dbz84_integer_types_table");
Testing.debug("List of tableNames: " + tableNames);
assertThat(tableNames).containsOnly("readbinlog_test.product",
"readbinlog_test.purchased",
"readbinlog_test.person",
"connector_test.customers",
"connector_test.orders",
"connector_test.products",
"connector_test.products_on_hand",
"connector_test_ro.customers",
"connector_test_ro.orders",
"connector_test_ro.products",
"connector_test_ro.products_on_hand",
"regression_test.t1464075356413_testtable6",
"regression_test.dbz_85_fractest",
"regression_test.dbz84_integer_types_table",
"regression_test.dbz_100_enumsettest",
"regression_test.dbz_102_charsettest",
"regression_test.dbz_114_zerovaluetest",
"regression_test.dbz_123_bitvaluetest",
"regression_test.dbz_104_customers",
"json_test.dbz_126_jsontable");
// Now set the whitelist to two databases ...
Configuration config2 = config.edit()

View File

@@ -0,0 +1,188 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.junit.Assert.fail;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Struct;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*/
public class MySqlConnectorJsonIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt").toAbsolutePath();
private Configuration config;
@Before
public void beforeEach() {
stopConnector();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
public void afterEach() {
try {
stopConnector();
} finally {
Testing.Files.delete(DB_HISTORY_PATH);
}
}
@Test
@FixFor("DBZ-126")
public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "jsonit_binlog")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, "json_test")
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.toString())
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int numCreateDatabase = 1;
int numCreateTables = 1;
int numDataRecords = 1;
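// 1 CREATE DATABASE + 1 CREATE TABLE + 1 data-change event = 3 records to consume below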
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("jsonit_binlog").size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.recordsForTopic("jsonit_binlog.json_test.dbz_126_jsontable").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("json_test").size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.ddlRecordsForDatabase("regression_test")).isNull();
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("json_test").forEach(this::print);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
List<String> errors = new ArrayList<>();
records.forEach(record -> {
Struct value = (Struct) record.value();
if (record.topic().endsWith("dbz_126_jsontable")) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Integer i = after.getInt32("id");
assertThat(i).isNotNull();
String json = after.getString("json");
String expectedBinlog = after.getString("expectedBinlogStr");
check(json, expectedBinlog, errors::add);
}
});
if (!errors.isEmpty()) {
fail(errors.size() + " errors with JSON records..." + System.lineSeparator() +
String.join(System.lineSeparator(), errors));
}
}
@Test
public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "jsonit_snap")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, "json_test")
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int numTables = 1;
int numDataRecords = 1;
int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
int numSetVariables = 1;
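// With numTables = 1: numDdlRecords = 1 * 2 + 3 = 5, so the test consumes 5 + 1 + 1 = 7 records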
SourceRecords records = consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("jsonit_snap").size()).isEqualTo(numDdlRecords + numSetVariables);
assertThat(records.recordsForTopic("jsonit_snap.json_test.dbz_126_jsontable").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(numTables + 1);
assertThat(records.databaseNames().size()).isEqualTo(2);
assertThat(records.databaseNames()).containsOnly("json_test", "");
assertThat(records.ddlRecordsForDatabase("json_test").size()).isEqualTo(numDdlRecords);
assertThat(records.ddlRecordsForDatabase("regression_test")).isNull();
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); // SET statement
records.ddlRecordsForDatabase("json_test").forEach(this::print);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
List<String> errors = new ArrayList<>();
records.forEach(record -> {
Struct value = (Struct) record.value();
if (record.topic().endsWith("dbz_126_jsontable")) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Integer i = after.getInt32("id");
assertThat(i).isNotNull();
String json = after.getString("json");
String expectedJdbc = after.getString("expectedJdbcStr");
check(json, expectedJdbc, errors::add);
}
});
if (!errors.isEmpty()) {
fail(errors.size() + " errors with JSON records..." + System.lineSeparator() +
String.join(System.lineSeparator(), errors));
}
}
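// Collects mismatches through the consumer instead of failing fast, so a single run reports every offending row.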
protected void check(String json, String expectedBinlog, Consumer<String> msg) {
if ((json == null && expectedBinlog != null) || (json != null && !json.equals(expectedBinlog))) {
msg.accept("JSON was: " + json + System.lineSeparator() + "but expected: " + expectedBinlog);
} else {
assertThat(json).isEqualTo(expectedBinlog);
}
}
}

View File

@@ -64,7 +64,7 @@
<version.postgresql.server>9.4</version.postgresql.server>
<version.mysql.server>5.7</version.mysql.server>
<version.mysql.driver>5.1.39</version.mysql.driver>
<version.mysql.binlog>0.4.2</version.mysql.binlog>
<version.mysql.binlog>0.5.1</version.mysql.binlog>
<version.mongo.server>3.2.6</version.mongo.server>
<version.mongo.driver>3.2.2</version.mongo.driver>