DBZ-61 Improved MySQL connector's handling of binary values

Binary values read from the MySQL binlog may actually arrive as String values (for example, binary column values recorded as hexadecimal strings), in which case they need to be converted into proper binary values.
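
A minimal sketch of that normalization, assuming a hypothetical `toByteBuffer` helper (the real conversion lives in `TableSchemaBuilder.convertBinary` below):

```java
import java.nio.ByteBuffer;

class BinaryNormalizationSketch {
    // Minimal sketch: normalize a binlog value that may arrive as char[], String, or byte[]
    // into a ByteBuffer, mirroring the approach of TableSchemaBuilder.convertBinary below.
    static ByteBuffer toByteBuffer(Object data) {
        if (data instanceof char[]) {
            data = new String((char[]) data);   // first turn the characters into a String
        }
        if (data instanceof String) {
            data = ((String) data).getBytes();  // the string's bytes carry the binary value
        }
        return (data instanceof byte[]) ? ByteBuffer.wrap((byte[]) data) : null;
    }
}
```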

Interestingly, this work uncovered [KAFKA-3803](https://issues.apache.org/jira/browse/KAFKA-3803), whereby Kafka Connect's `Struct.equals` method does not properly compare `byte[]` values. While researching the problem (and considering supplying a patch), it was discovered that the Kafka Connect codebase and the Avro converter both use `ByteBuffer` objects rather than `byte[]`. Consequently, the Debezium code that converts JDBC values to Kafka Connect values was changed to return `ByteBuffer` objects rather than `byte[]` objects.
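
For context, a standalone sketch (plain JDK behavior, not code from this commit) showing why `ByteBuffer` compares sensibly while `byte[]` does not:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteEqualityDemo {
    public static void main(String[] args) {
        byte[] a = { 0x4D, 0x79 };
        byte[] b = { 0x4D, 0x79 };
        System.out.println(a.equals(b));                                    // false: arrays inherit identity-based equals()
        System.out.println(Arrays.equals(a, b));                            // true: explicit content comparison
        System.out.println(ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b)));  // true: ByteBuffer compares remaining contents
    }
}
```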

Unfortunately, the JSON converter rehydrates binary fields as plain `byte[]` values, so Debezium's `VerifyRecord` test logic still cannot rely upon `Struct.equals` for comparisons and instead needs custom logic.
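
A condensed sketch of that custom logic (the full version is in `VerifyRecord` below): unwrap `ByteBuffer` values to `byte[]` and compare array contents explicitly.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;

class ValueComparisonSketch {
    // Condensed sketch of the comparison idea used by VerifyRecord below:
    // unwrap ByteBuffer to byte[] and compare byte array contents explicitly.
    static boolean valuesEqual(Object o1, Object o2) {
        if (o1 instanceof ByteBuffer) o1 = ((ByteBuffer) o1).array();
        if (o2 instanceof ByteBuffer) o2 = ((ByteBuffer) o2).array();
        if (o1 instanceof byte[] && o2 instanceof byte[]) {
            return Arrays.equals((byte[]) o1, (byte[]) o2);
        }
        return Objects.equals(o1, o2);
    }
}
```
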
Randall Hauch 2016-06-07 17:30:39 -05:00
parent 4f02efc788
commit a143871abd
5 changed files with 643 additions and 120 deletions

View File

@@ -172,3 +172,24 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
(default, '2016-02-18', 1004, 3, 109),
(default, '2016-02-19', 1002, 2, 106),
(default, '2016-02-21', 1003, 1, 107);
# ----------------------------------------------------------------------------------------------------------------
# DATABASE: regression_test
# ----------------------------------------------------------------------------------------------------------------
# The integration test for this database expects to scan all of the binlog events associated with this database
# without errors or problems. The integration test does not modify any records in this database, so this script
# must contain all operations on these tables.
#
CREATE DATABASE regression_test;
USE regression_test;
# DBZ-61 handle binary value recorded as hex string value
CREATE TABLE t1464075356413_testtable6 (
pk_column int auto_increment NOT NULL,
varbinary_col varbinary(20) NOT NULL,
PRIMARY KEY(pk_column)
);
INSERT INTO t1464075356413_testtable6 (pk_column, varbinary_col)
VALUES (default, 0x4D7953514C);

View File

@@ -0,0 +1,89 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.nio.file.Path;
import java.sql.SQLException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
/**
* @author Randall Hauch
*/
public class MySqlConnectorRegressionIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-regression.txt").toAbsolutePath();
private Configuration config;
@Before
public void beforeEach() {
stopConnector();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
public void afterEach() {
try {
stopConnector();
} finally {
Testing.Files.delete(DB_HISTORY_PATH);
}
}
@Test
public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "regression")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, "regression_test")
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.toString())
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL",false) // eliminates MySQL driver warning about SSL connections
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
//Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(2 + 1); // 2 schema change records, 1 insert
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(2);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(2);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(2);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
}
}

View File

@@ -5,6 +5,8 @@
*/
package io.debezium.data;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@@ -169,6 +171,11 @@ public RecordWriter append(Object obj) {
appendFirst(field.name(), s.get(field));
}
sb.append('}');
} else if (obj instanceof ByteBuffer) {
ByteBuffer b = (ByteBuffer) obj;
sb.append('"').append(Base64.getEncoder().encodeToString(b.array())).append('"');
} else if (obj instanceof byte[]) {
sb.append('"').append(Base64.getEncoder().encodeToString((byte[]) obj)).append('"');
} else if (obj instanceof Map<?, ?>) {
Map<?, ?> map = (Map<?, ?>) obj;
sb.append('{');

View File

@@ -6,6 +6,7 @@
package io.debezium.relational;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
@@ -105,7 +106,8 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException
* <p>
* This is equivalent to calling {@code create(table,false)}.
*
* @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no prefix
* @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
* prefix
* @param table the table definition; may not be null
* @return the table schema that can be used for sending rows of data for this table to Kafka Connect; never null
*/
@@ -121,7 +123,8 @@ public TableSchema create(String schemaPrefix, Table table) {
* <p>
* This is equivalent to calling {@code create(table,false)}.
*
* @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no prefix
* @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
* prefix
* @param table the table definition; may not be null
* @param filter the filter that specifies whether columns in the table should be included; may be null if all columns
* are to be included
@@ -129,7 +132,7 @@ public TableSchema create(String schemaPrefix, Table table) {
* @return the table schema that can be used for sending rows of data for this table to Kafka Connect; never null
*/
public TableSchema create(String schemaPrefix, Table table, Predicate<ColumnId> filter, ColumnMappers mappers) {
if ( schemaPrefix == null ) schemaPrefix = "";
if (schemaPrefix == null) schemaPrefix = "";
// Build the schemas ...
final TableId tableId = table.id();
final String tableIdStr = tableId.toString();
@@ -285,7 +288,10 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId,
if (mappers != null) {
ValueConverter mappingConverter = mappers.mappingConverterFor(tableId, column);
if (mappingConverter != null) {
converter = (value) -> mappingConverter.convert(valueConverter.convert(value));
converter = (value) -> {
if (value != null) value = valueConverter.convert(value);
return mappingConverter.convert(value);
};
}
}
if (converter == null) converter = valueConverter;
@@ -376,7 +382,7 @@ protected void addField(SchemaBuilder builder, Column column, ColumnMapper mappe
case Types.NCLOB:
fieldBuilder = SchemaBuilder.string();
break;
// Variable-length string values
case Types.VARCHAR:
case Types.LONGVARCHAR:
@@ -424,7 +430,7 @@ protected void addField(SchemaBuilder builder, Column column, ColumnMapper mappe
if (fieldBuilder != null) {
if (mapper != null) {
// Let the mapper add properties to the schema ...
mapper.alterFieldSchema(column,fieldBuilder);
mapper.alterFieldSchema(column, fieldBuilder);
}
if (column.isOptional()) fieldBuilder.optional();
builder.field(column.name(), fieldBuilder.build());
@@ -450,6 +456,14 @@ protected SchemaBuilder addOtherField(Column column, ColumnMapper mapper) {
/**
* Create a {@link ValueConverter} that can be used to convert row values for the given column into the Kafka Connect value
* object described by the {@link Field field definition}.
* <p>
* Subclasses can override this method to specialize the behavior. The subclass method should do custom checks and
* conversions, and then delegate to this method implementation to handle all other cases.
* <p>
* Alternatively, subclasses can leave this method as-is and instead override one of the lower-level type-specific methods
* that this method calls (e.g., {@link #convertBinary(Column, Field, Object)},
* {@link #convertTinyInt(Column, Field, Object)}, etc.).
*
* @param column the column describing the input values; never null
* @param fieldDefn the definition for the field in a Kafka Connect {@link Schema} describing the output of the function;
@@ -461,94 +475,38 @@ protected ValueConverter createValueConverterFor(Column column, Field fieldDefn)
case Types.NULL:
return (data) -> null;
case Types.BIT:
return (data) -> convertBit(column, fieldDefn, data);
case Types.BOOLEAN:
return (data) -> {
if (data instanceof Boolean) return (Boolean) data;
if (data instanceof Short) return ((Short) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Integer) return ((Integer) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Long) return ((Long) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
return handleUnknownData(column, fieldDefn, data);
};
return (data) -> convertBoolean(column, fieldDefn, data);
// Binary values ...
case Types.BLOB:
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
return (data) -> (byte[]) data;
return (data) -> convertBinary(column, fieldDefn, data);
// Numeric integers
case Types.TINYINT:
return (data) -> {
if (data instanceof Byte) return (Byte) data;
if (data instanceof Boolean) return ((Boolean) data).booleanValue() ? (byte) 1 : (byte) 0;
return handleUnknownData(column, fieldDefn, data);
};
return (data) -> convertTinyInt(column, fieldDefn, data);
case Types.SMALLINT:
return (data) -> {
if (data instanceof Short) return (Short) data;
if (data instanceof Integer) return new Short(((Integer) data).shortValue());
if (data instanceof Long) return new Short(((Long) data).shortValue());
return handleUnknownData(column, fieldDefn, data);
};
return (data) -> convertSmallInt(column, fieldDefn, data);
case Types.INTEGER:
return (data) -> {
if (data instanceof Integer) return (Integer) data;
if (data instanceof Short) return new Integer(((Short) data).intValue());
if (data instanceof Long) return new Integer(((Long) data).intValue());
return handleUnknownData(column, fieldDefn, data);
};
return (data) -> convertInteger(column, fieldDefn, data);
case Types.BIGINT:
return (data) -> {
if (data instanceof Long) return (Long) data;
if (data instanceof Integer) return new Long(((Integer) data).longValue());
if (data instanceof Short) return new Long(((Short) data).longValue());
return handleUnknownData(column, fieldDefn, data);
};
return (data) -> convertBigInt(column, fieldDefn, data);
// Numeric decimal numbers
case Types.FLOAT:
return (data) -> convertFloat(column, fieldDefn, data);
case Types.DOUBLE:
return (data) -> {
if (data instanceof Double) return (Double) data;
if (data instanceof Float) return new Double(((Float) data).doubleValue());
if (data instanceof Integer) return new Double(((Integer) data).doubleValue());
if (data instanceof Long) return new Double(((Long) data).doubleValue());
if (data instanceof Short) return new Double(((Short) data).doubleValue());
return handleUnknownData(column, fieldDefn, data);
};
return (data) -> convertDouble(column, fieldDefn, data);
case Types.REAL:
return (data) -> {
if (data instanceof Float) return (Float) data;
if (data instanceof Double) return new Float(((Double) data).floatValue());
if (data instanceof Integer) return new Float(((Integer) data).floatValue());
if (data instanceof Long) return new Float(((Long) data).floatValue());
if (data instanceof Short) return new Float(((Short) data).floatValue());
return handleUnknownData(column, fieldDefn, data);
};
return (data) -> convertReal(column, fieldDefn, data);
case Types.NUMERIC:
return (data) -> convertNumeric(column, fieldDefn, data);
case Types.DECIMAL:
return (data) -> {
BigDecimal decimal = null;
if (data instanceof BigDecimal)
decimal = (BigDecimal) data;
else if (data instanceof Boolean)
decimal = new BigDecimal(((Boolean) data).booleanValue() ? 1 : 0);
else if (data instanceof Short)
decimal = new BigDecimal(((Short) data).intValue());
else if (data instanceof Integer)
decimal = new BigDecimal(((Integer) data).intValue());
else if (data instanceof Long)
decimal = BigDecimal.valueOf(((Long) data).longValue());
else if (data instanceof Float)
decimal = BigDecimal.valueOf(((Float) data).doubleValue());
else if (data instanceof Double)
decimal = BigDecimal.valueOf(((Double) data).doubleValue());
else {
handleUnknownData(column, fieldDefn, data);
}
return decimal;
};
return (data) -> convertDecimal(column, fieldDefn, data);
// String values
case Types.CHAR: // variable-length
@@ -561,26 +519,23 @@ else if (data instanceof Double)
case Types.NCLOB: // fixed-length
case Types.DATALINK:
case Types.SQLXML:
return (data) -> data.toString();
return (data) -> convertString(column, fieldDefn, data);
// Date and time values
case Types.DATE:
return (data) -> convertDate(fieldDefn, data);
return (data) -> convertDate(column, fieldDefn, data);
case Types.TIME:
return (data) -> convertTime(fieldDefn, data);
return (data) -> convertTime(column, fieldDefn, data);
case Types.TIMESTAMP:
return (data) -> convertTimestamp(fieldDefn, data);
return (data) -> convertTimestamp(column, fieldDefn, data);
case Types.TIME_WITH_TIMEZONE:
return (data) -> convertTimeWithZone(fieldDefn, data);
return (data) -> convertTimeWithZone(column, fieldDefn, data);
case Types.TIMESTAMP_WITH_TIMEZONE:
return (data) -> convertTimestampWithZone(fieldDefn, data);
return (data) -> convertTimestampWithZone(column, fieldDefn, data);
// Other types ...
case Types.ROWID:
return (data) -> {
java.sql.RowId rowId = (java.sql.RowId) data;
return rowId.getBytes();
};
return (data) -> convertRowId(column, fieldDefn, data);
// Unhandled types
case Types.ARRAY:
@@ -620,11 +575,13 @@ protected Object handleUnknownData(Column column, Field fieldDefn, Object data)
* This method handles several types of objects, including {@link OffsetDateTime}, {@link java.sql.Timestamp},
* {@link java.util.Date}, {@link java.time.LocalTime}, and {@link java.time.LocalDateTime}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertTimestampWithZone(Field fieldDefn, Object data) {
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
OffsetDateTime dateTime = null;
if (data instanceof OffsetDateTime) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
@@ -679,11 +636,13 @@ protected OffsetDateTime unexpectedTimestampWithZone(Object value, Field fieldDe
* {@link java.time.LocalTime}, and {@link java.time.LocalDateTime}. If any of the types have date components, those date
* components are ignored.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertTimeWithZone(Field fieldDefn, Object data) {
protected Object convertTimeWithZone(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
OffsetTime time = null;
if (data instanceof OffsetTime) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
@@ -731,11 +690,13 @@ protected OffsetTime unexpectedTimeWithZone(Object value, Field fieldDefn) {
* but no time zone info. This method handles {@link java.sql.Date} objects plus any other standard date-related objects such
* as {@link java.util.Date}, {@link java.time.LocalTime}, and {@link java.time.LocalDateTime}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertTimestamp(Field fieldDefn, Object data) {
protected Object convertTimestamp(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
if (data instanceof java.sql.Timestamp) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
@@ -782,11 +743,13 @@ protected java.util.Date unexpectedTimestamp(Object value, Field fieldDefn) {
* {@link java.util.Date}, {@link java.time.LocalTime}, and {@link java.time.LocalDateTime}. If any of the types might
* have date components, those date components are ignored.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertTime(Field fieldDefn, Object data) {
protected Object convertTime(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
if (data instanceof java.sql.Time) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
@@ -834,11 +797,13 @@ protected java.util.Date unexpectedTime(Object value, Field fieldDefn) {
* {@link java.util.Date}, {@link java.time.LocalDate}, and {@link java.time.LocalDateTime}. If any of the types might
* have time components, those time components are ignored.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertDate(Field fieldDefn, Object data) {
protected Object convertDate(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
if (data instanceof java.sql.Date) {
// JDBC specification indicates that this will be the nominal object for this JDBC type.
@@ -882,4 +847,286 @@ protected java.util.Date unexpectedDate(Object value, Field fieldDefn) {
fieldDefn.schema(), value.getClass(), value);
return null;
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY},
* {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}.
* <p>
* This method handles {@code byte[]} and {@code char[]} values as well as {@code String} values, converting each of
* them into a {@link ByteBuffer}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertBinary(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof char[]) {
data = new String((char[]) data); // convert to string
}
if (data instanceof String) {
// This was encoded as a hexadecimal string, but we receive it as a normal string ...
data = ((String) data).getBytes();
}
if (data instanceof byte[]) {
return ByteBuffer.wrap((byte[])data);
}
// An unexpected value
return unexpectedBinary(data, fieldDefn);
}
/**
* Handle the unexpected value from a row with a column type of {@link Types#BLOB}, {@link Types#BINARY},
* {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}.
*
* @param value the binary value for which no conversion was found; never null
* @param fieldDefn the field definition in the Kafka Connect schema; never null
* @return the converted value, or null
* @see #convertBinary(Column, Field, Object)
*/
protected byte[] unexpectedBinary(Object value, Field fieldDefn) {
LOGGER.warn("Unexpected JDBC BINARY value for field {} with schema {}: class={}, value={}", fieldDefn.name(),
fieldDefn.schema(), value.getClass(), value);
return null;
}
/**
* Converts a value object for an expected JDBC type of {@link Types#TINYINT}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertTinyInt(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Byte) return data;
if (data instanceof Boolean) return ((Boolean) data).booleanValue() ? (byte) 1 : (byte) 0;
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#SMALLINT}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertSmallInt(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Short) return data;
if (data instanceof Integer) return new Short(((Integer) data).shortValue());
if (data instanceof Long) return new Short(((Long) data).shortValue());
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#INTEGER}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertInteger(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Integer) return data;
if (data instanceof Short) return new Integer(((Short) data).intValue());
if (data instanceof Long) return new Integer(((Long) data).intValue());
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BIGINT}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertBigInt(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Long) return data;
if (data instanceof Integer) return new Long(((Integer) data).longValue());
if (data instanceof Short) return new Long(((Short) data).longValue());
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#FLOAT}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertFloat(Column column, Field fieldDefn, Object data) {
return convertDouble(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#DOUBLE}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertDouble(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Double) return data;
if (data instanceof Float) return new Double(((Float) data).doubleValue());
if (data instanceof Integer) return new Double(((Integer) data).doubleValue());
if (data instanceof Long) return new Double(((Long) data).doubleValue());
if (data instanceof Short) return new Double(((Short) data).doubleValue());
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#REAL}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertReal(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Float) return data;
if (data instanceof Double) return new Float(((Double) data).floatValue());
if (data instanceof Integer) return new Float(((Integer) data).floatValue());
if (data instanceof Long) return new Float(((Long) data).floatValue());
if (data instanceof Short) return new Float(((Short) data).floatValue());
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#NUMERIC}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertNumeric(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
BigDecimal decimal = null;
if (data instanceof BigDecimal)
decimal = (BigDecimal) data;
else if (data instanceof Boolean)
decimal = new BigDecimal(((Boolean) data).booleanValue() ? 1 : 0);
else if (data instanceof Short)
decimal = new BigDecimal(((Short) data).intValue());
else if (data instanceof Integer)
decimal = new BigDecimal(((Integer) data).intValue());
else if (data instanceof Long)
decimal = BigDecimal.valueOf(((Long) data).longValue());
else if (data instanceof Float)
decimal = BigDecimal.valueOf(((Float) data).doubleValue());
else if (data instanceof Double)
decimal = BigDecimal.valueOf(((Double) data).doubleValue());
else {
return handleUnknownData(column, fieldDefn, data);
}
return decimal;
}
/**
* Converts a value object for an expected JDBC type of {@link Types#DECIMAL}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertDecimal(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
BigDecimal decimal = null;
if (data instanceof BigDecimal)
decimal = (BigDecimal) data;
else if (data instanceof Boolean)
decimal = new BigDecimal(((Boolean) data).booleanValue() ? 1 : 0);
else if (data instanceof Short)
decimal = new BigDecimal(((Short) data).intValue());
else if (data instanceof Integer)
decimal = new BigDecimal(((Integer) data).intValue());
else if (data instanceof Long)
decimal = BigDecimal.valueOf(((Long) data).longValue());
else if (data instanceof Float)
decimal = BigDecimal.valueOf(((Float) data).doubleValue());
else if (data instanceof Double)
decimal = BigDecimal.valueOf(((Double) data).doubleValue());
else {
return handleUnknownData(column, fieldDefn, data);
}
return decimal;
}
/**
* Converts a value object for an expected JDBC type of {@link Types#CHAR}, {@link Types#VARCHAR},
* {@link Types#LONGVARCHAR}, {@link Types#CLOB}, {@link Types#NCHAR}, {@link Types#NVARCHAR}, {@link Types#LONGNVARCHAR},
* {@link Types#NCLOB}, {@link Types#DATALINK}, and {@link Types#SQLXML}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertString(Column column, Field fieldDefn, Object data) {
return data == null ? null : data.toString();
}
/**
* Converts a value object for an expected JDBC type of {@link Types#ROWID}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertRowId(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof java.sql.RowId) {
java.sql.RowId row = (java.sql.RowId)data;
return ByteBuffer.wrap(row.getBytes());
}
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BIT}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertBit(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Boolean) return data;
if (data instanceof Short) return ((Short) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Integer) return ((Integer) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Long) return ((Long) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BOOLEAN}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into the Kafka Connect value for this field; may be null
* @return the converted value, or null if the conversion could not be made
*/
protected Object convertBoolean(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
if (data instanceof Boolean) return data;
if (data instanceof Short) return ((Short) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Integer) return ((Integer) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Long) return ((Long) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
return handleUnknownData(column, fieldDefn, data);
}
}

View File

@@ -7,8 +7,13 @@
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -47,18 +52,18 @@ public class VerifyRecord {
private static final MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
private static final AvroConverter avroKeyConverter = new AvroConverter(schemaRegistry);
private static final AvroConverter avroValueConverter = new AvroConverter(schemaRegistry);
static {
Map<String,Object> config = new HashMap<>();
config.put("schemas.enable",Boolean.TRUE.toString());
config.put("schemas.cache.size",100);
Map<String, Object> config = new HashMap<>();
config.put("schemas.enable", Boolean.TRUE.toString());
config.put("schemas.cache.size", 100);
keyJsonConverter.configure(config, true);
keyJsonDeserializer.configure(config, true);
valueJsonConverter.configure(config, false);
valueJsonDeserializer.configure(config, false);
config = new HashMap<>();
config.put("schema.registry.url","http://fake-url");
config.put("schema.registry.url", "http://fake-url");
avroKeyConverter.configure(config, false);
avroValueConverter.configure(config, false);
}
@@ -276,73 +281,97 @@ public static void isValid(SourceRecord record) {
SchemaAndValue valueWithSchema = null;
SchemaAndValue avroKeyWithSchema = null;
SchemaAndValue avroValueWithSchema = null;
String msg = null;
try {
// The key should never be null ...
msg = "checking key is not null";
assertThat(record.key()).isNotNull();
assertThat(record.keySchema()).isNotNull();
// If the value is not null there must be a schema; otherwise, the schema should also be null ...
if (record.value() == null) {
msg = "checking value schema is null";
assertThat(record.valueSchema()).isNull();
} else {
msg = "checking value schema is not null";
assertThat(record.valueSchema()).isNotNull();
}
// First serialize and deserialize the key ...
msg = "serializing key using JSON converter";
byte[] keyBytes = keyJsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
msg = "deserializing key using JSON deserializer";
keyJson = keyJsonDeserializer.deserialize(record.topic(), keyBytes);
msg = "deserializing key using JSON converter";
keyWithSchema = keyJsonConverter.toConnectData(record.topic(), keyBytes);
msg = "comparing key schema to that serialized/deserialized with JSON converter";
assertThat(keyWithSchema.schema()).isEqualTo(record.keySchema());
msg = "comparing key to that serialized/deserialized with JSON converter";
assertThat(keyWithSchema.value()).isEqualTo(record.key());
msg = "comparing key to its schema";
schemaMatchesStruct(keyWithSchema);
// then the value ...
msg = "serializing value using JSON converter";
byte[] valueBytes = valueJsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
msg = "deserializing value using JSON deserializer";
valueJson = valueJsonDeserializer.deserialize(record.topic(), valueBytes);
msg = "deserializing value using JSON converter";
valueWithSchema = valueJsonConverter.toConnectData(record.topic(), valueBytes);
assertThat(valueWithSchema.schema()).isEqualTo(record.valueSchema());
assertThat(valueWithSchema.value()).isEqualTo(record.value());
msg = "comparing value schema to that serialized/deserialized with JSON converter";
assertEquals(valueWithSchema.schema(),record.valueSchema());
msg = "comparing value to that serialized/deserialized with JSON converter";
assertEquals(valueWithSchema.value(),record.value());
msg = "comparing value to its schema";
schemaMatchesStruct(valueWithSchema);
// Serialize and deserialize the key using the Avro converter, and check that we got the same result ...
msg = "serializing key using Avro converter";
byte[] avroKeyBytes = avroValueConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
msg = "deserializing key using Avro converter";
avroKeyWithSchema = avroValueConverter.toConnectData(record.topic(), avroKeyBytes);
assertThat(keyWithSchema.schema()).isEqualTo(record.keySchema());
assertThat(keyWithSchema.value()).isEqualTo(record.key());
msg = "comparing key schema to that serialized/deserialized with Avro converter";
assertEquals(keyWithSchema.schema(),record.keySchema());
msg = "comparing key to that serialized/deserialized with Avro converter";
assertEquals(keyWithSchema.value(),record.key());
msg = "comparing key to its schema";
schemaMatchesStruct(keyWithSchema);
// Serialize and deserialize the value using the Avro converter, and check that we got the same result ...
msg = "serializing value using Avro converter";
byte[] avroValueBytes = avroValueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
msg = "deserializing value using Avro converter";
avroValueWithSchema = avroValueConverter.toConnectData(record.topic(), avroValueBytes);
assertThat(valueWithSchema.schema()).isEqualTo(record.valueSchema());
assertThat(valueWithSchema.value()).isEqualTo(record.value());
msg = "comparing value schema to that serialized/deserialized with Avro converter";
assertEquals(valueWithSchema.schema(),record.valueSchema());
msg = "comparing value to that serialized/deserialized with Avro converter";
assertEquals(valueWithSchema.value(),record.value());
msg = "comparing value to its schema";
schemaMatchesStruct(valueWithSchema);
} catch (Throwable t) {
Testing.Print.enable();
Testing.print("Problem with message on topic '" + record.topic() + "':");
Testing.printError(t);
if (keyJson == null ){
Testing.print("error deserializing key from JSON: " + SchemaUtil.asString(record.key()));
} else if (keyWithSchema == null ){
Testing.print("error using JSON converter on key: " + prettyJson(keyJson));
} else if (avroKeyWithSchema == null ){
Testing.print("error using Avro converter on key: " + prettyJson(keyJson));
} else {
Testing.print("valid key = " + prettyJson(keyJson));
Testing.print("error " + msg);
Testing.print(" key: " + SchemaUtil.asString(record.key()));
Testing.print(" key deserialized from JSON: " + prettyJson(keyJson));
if (keyWithSchema != null) {
Testing.print(" key to/from JSON: " + SchemaUtil.asString(keyWithSchema.value()));
}
if (valueJson == null ){
Testing.print("error deserializing value from JSON: " + SchemaUtil.asString(record.value()));
} else if (valueWithSchema == null ){
Testing.print("error using JSON converter on value: " + prettyJson(valueJson));
} else if (avroValueWithSchema == null ){
Testing.print("error using Avro converter on value: " + prettyJson(valueJson));
} else {
Testing.print("valid key = " + prettyJson(keyJson));
if (avroKeyWithSchema != null) {
Testing.print(" key to/from Avro: " + SchemaUtil.asString(avroKeyWithSchema.value()));
}
Testing.print(" value: " + SchemaUtil.asString(record.value()));
Testing.print(" value deserialized from JSON: " + prettyJson(valueJson));
if (valueWithSchema != null) {
Testing.print(" value to/from JSON: " + SchemaUtil.asString(valueWithSchema.value()));
}
if (avroValueWithSchema != null) {
Testing.print(" value to/from Avro: " + SchemaUtil.asString(avroValueWithSchema.value()));
}
if (t instanceof AssertionError) throw t;
fail(t.getMessage());
fail("error " + msg + ": " + t.getMessage());
}
}
@@ -390,5 +419,135 @@ protected static String prettyJson(JsonNode json) {
return null;
}
}
// The remaining methods are needed simply because of KAFKA-3803, so our comparisons cannot rely upon Struct.equals
protected static void assertEquals( Object o1, Object o2 ) {
// assertThat(o1).isEqualTo(o2);
if ( !equals(o1,o2) ) {
fail(SchemaUtil.asString(o1) + " was not equal to " + SchemaUtil.asString(o2));
}
}
@SuppressWarnings("unchecked")
protected static boolean equals( Object o1, Object o2 ) {
if ( o1 == o2 ) return true;
if (o1 == null) return o2 == null ? true : false;
if (o2 == null ) return false;
if ( o1 instanceof ByteBuffer ) {
o1 = ((ByteBuffer)o1).array();
}
if ( o2 instanceof ByteBuffer ) {
o2 = ((ByteBuffer)o2).array();
}
if ( o1 instanceof byte[] && o2 instanceof byte[] ) {
boolean result = Arrays.equals((byte[])o1,(byte[])o2);
return result;
}
if ( o1 instanceof Object[] && o2 instanceof Object[] ) {
boolean result = deepEquals((Object[])o1,(Object[])o2);
return result;
}
if ( o1 instanceof Map && o2 instanceof Map ) {
Map<String,Object> m1 = (Map<String,Object>)o1;
Map<String,Object> m2 = (Map<String,Object>)o2;
if ( !m1.keySet().equals(m2.keySet())) return false;
for ( Map.Entry<String, Object> entry : m1.entrySet()) {
Object v1 = entry.getValue();
Object v2 = m2.get(entry.getKey());
if ( !equals(v1,v2) ) return false;
}
return true;
}
if ( o1 instanceof Collection && o2 instanceof Collection ) {
Collection<Object> m1 = (Collection<Object>)o1;
Collection<Object> m2 = (Collection<Object>)o2;
if ( m1.size() != m2.size() ) return false;
Iterator<?> iter1 = m1.iterator();
Iterator<?> iter2 = m2.iterator();
while ( iter1.hasNext() && iter2.hasNext() ) {
if ( !equals(iter1.next(),iter2.next()) ) return false;
}
return true;
}
if ( o1 instanceof Struct && o2 instanceof Struct ) {
// Unfortunately, the Struct.equals() method has a bug in that it is not using Arrays.deepEquals(...) to
// compare values in two Struct objects. The result is that the equals only works if the values of the
// first level Struct are non-arrays; otherwise, the array values are compared using == and that obviously
// does not work for non-primitive values.
Struct struct1 = (Struct) o1;
Struct struct2 = (Struct) o2;
if (! Objects.equals(struct1.schema(),struct2.schema()) ) {
return false;
}
Object[] array1 = valuesFor(struct1);
Object[] array2 = valuesFor(struct2);
boolean result = deepEquals(array1, array2);
return result;
}
return Objects.equals(o1, o2);
}
private static Object[] valuesFor( Struct struct ) {
Object[] array = new Object[struct.schema().fields().size()];
int index = 0;
for ( Field field : struct.schema().fields() ) {
array[index] = struct.get(field);
++index;
}
return array;
}
private static boolean deepEquals(Object[] a1, Object[] a2) {
if (a1 == a2)
return true;
if (a1 == null || a2==null)
return false;
int length = a1.length;
if (a2.length != length)
return false;
for (int i = 0; i < length; i++) {
Object e1 = a1[i];
Object e2 = a2[i];
if (e1 == e2)
continue;
if (e1 == null)
return false;
// Figure out whether the two elements are equal
boolean eq = deepEquals0(e1, e2);
if (!eq)
return false;
}
return true;
}
private static boolean deepEquals0(Object e1, Object e2) {
assert e1 != null;
boolean eq;
if (e1 instanceof Object[] && e2 instanceof Object[])
eq = deepEquals ((Object[]) e1, (Object[]) e2);
else if (e1 instanceof byte[] && e2 instanceof byte[])
eq = Arrays.equals((byte[]) e1, (byte[]) e2);
else if (e1 instanceof short[] && e2 instanceof short[])
eq = Arrays.equals((short[]) e1, (short[]) e2);
else if (e1 instanceof int[] && e2 instanceof int[])
eq = Arrays.equals((int[]) e1, (int[]) e2);
else if (e1 instanceof long[] && e2 instanceof long[])
eq = Arrays.equals((long[]) e1, (long[]) e2);
else if (e1 instanceof char[] && e2 instanceof char[])
eq = Arrays.equals((char[]) e1, (char[]) e2);
else if (e1 instanceof float[] && e2 instanceof float[])
eq = Arrays.equals((float[]) e1, (float[]) e2);
else if (e1 instanceof double[] && e2 instanceof double[])
eq = Arrays.equals((double[]) e1, (double[]) e2);
else if (e1 instanceof boolean[] && e2 instanceof boolean[])
eq = Arrays.equals((boolean[]) e1, (boolean[]) e2);
else
eq = equals(e1,e2);
return eq;
}
}