Merge pull request #107 from rhauch/dbz-123

DBZ-123 Corrected MySQL Connector's support for BIT(n) columns
This commit is contained in:
Randall Hauch 2016-09-21 15:22:00 -05:00 committed by GitHub
commit 730603976d
5 changed files with 253 additions and 26 deletions

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
@ -70,6 +71,11 @@ public MySqlValueConverters(boolean adaptiveTimePrecision) {
public MySqlValueConverters(boolean adaptiveTimePrecision, ZoneOffset defaultOffset) {
super(adaptiveTimePrecision, defaultOffset);
}
@Override
protected ByteOrder byteOrderOfBitType() {
return ByteOrder.BIG_ENDIAN;
}
@Override
public SchemaBuilder schemaBuilder(Column column) {

View File

@ -259,4 +259,15 @@ CREATE TABLE dbz_114_zerovaluetest (
c4 TIMESTAMP(2)
);
INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0000-00-00', '00:00:00.000', '0000-00-00 00:00:00.000', '0000-00-00 00:00:00.000');
INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0001-00-00', '00:01:00.000', '0001-00-00 00:00:00.000', '0001-00-00 00:00:00.000');
INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0001-00-00', '00:01:00.000', '0001-00-00 00:00:00.000', '0001-00-00 00:00:00.000');
-- DBZ-123 handle bit values, including bit field literals
CREATE TABLE dbz_123_bitvaluetest (
c1 BIT,
c2 BIT(2),
c3 BIT(8) NOT NULL,
c4 BIT(64)
);
INSERT INTO dbz_123_bitvaluetest VALUES (1,2,64,23989979);
INSERT INTO dbz_123_bitvaluetest VALUES (b'1',b'10',b'01000000',b'1011011100000111011011011');

View File

@ -86,19 +86,23 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(1 + 6 + 9); // 1 create database, 6 create table, 9 inserts
int numCreateDatabase = 1;
int numCreateTables = 7;
int numDataRecords = 11;
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(7);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3);
assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_114_zerovaluetest").size()).isEqualTo(2);
assertThat(records.topics().size()).isEqualTo(7);
assertThat(records.recordsForTopic("regression.regression_test.dbz_123_bitvaluetest").size()).isEqualTo(2);
assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(7);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);
@ -189,8 +193,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// c3 DATETIME(2),
// c4 TIMESTAMP(2)
//
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0000-00-00', '00:00:00.000', '0000-00-00 00:00:00.000', '0000-00-00 00:00:00.000');
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0001-00-00', '00:01:00.000', '0001-00-00 00:00:00.000', '0001-00-00 00:00:00.000');
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0000-00-00', '00:00:00.000', '0000-00-00 00:00:00.000',
// '0000-00-00 00:00:00.000');
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0001-00-00', '00:01:00.000', '0001-00-00 00:00:00.000',
// '0001-00-00 00:00:00.000');
//
// results in:
//
@ -231,6 +237,35 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// We're running the connector in the same timezone as the server, so the timezone in the timestamp
// should match our current offset ...
assertThat(c4DateTime.getOffset()).isEqualTo(OffsetDateTime.now().getOffset());
} else if (record.topic().endsWith("dbz_123_bitvaluetest")) {
// All row events should have the same values ...
Struct after = value.getStruct(Envelope.FieldName.AFTER);
// c1 BIT, // 1 bit
// c2 BIT(2), // 2 bits
// c3 BIT(8) // 8 bits
// c4 BIT(64) // 64 bits
Boolean c1 = after.getBoolean("c1");
assertThat(c1).isEqualTo(Boolean.TRUE);
byte[] c2 = after.getBytes("c2");
assertThat(c2.length).isEqualTo(1);
assertThat(c2[0]).isEqualTo((byte)2);
byte[] c3 = after.getBytes("c3");
assertThat(c3.length).isEqualTo(1);
assertThat(c3[0]).isEqualTo((byte)64);
// 1011011100000111011011011 = 23989979
byte[] c4 = after.getBytes("c4");
assertThat(c4.length).isEqualTo(8); // bytes, little endian
assertThat(c4[0]).isEqualTo((byte)219); // 11011011
assertThat(c4[1]).isEqualTo((byte)14); // 00001110
assertThat(c4[2]).isEqualTo((byte)110); // 01101110
assertThat(c4[3]).isEqualTo((byte)1); // 1
assertThat(c4[4]).isEqualTo((byte)0);
assertThat(c4[5]).isEqualTo((byte)0);
assertThat(c4[6]).isEqualTo((byte)0);
assertThat(c4[7]).isEqualTo((byte)0);
}
});
}
@ -262,19 +297,23 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(1 + 6 + 9); // 1 create database, 6 create table, 9 inserts
int numCreateDatabase = 1;
int numCreateTables = 7;
int numDataRecords = 11;
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(7);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3);
assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_114_zerovaluetest").size()).isEqualTo(2);
assertThat(records.topics().size()).isEqualTo(7);
assertThat(records.recordsForTopic("regression.regression_test.dbz_123_bitvaluetest").size()).isEqualTo(2);
assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(7);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(numCreateDatabase + numCreateTables);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);
@ -364,8 +403,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
// c3 DATETIME(2),
// c4 TIMESTAMP(2)
//
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0000-00-00', '00:00:00.000', '0000-00-00 00:00:00.000', '0000-00-00 00:00:00.000');
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0001-00-00', '00:01:00.000', '0001-00-00 00:00:00.000', '0001-00-00 00:00:00.000');
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0000-00-00', '00:00:00.000', '0000-00-00 00:00:00.000',
// '0000-00-00 00:00:00.000');
// INSERT IGNORE INTO dbz_114_zerovaluetest VALUES ('0001-00-00', '00:01:00.000', '0001-00-00 00:00:00.000',
// '0001-00-00 00:00:00.000');
//
// results in:
//
@ -408,6 +449,35 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
// We're running the connector in the same timezone as the server, so the timezone in the timestamp
// should match our current offset ...
assertThat(c4DateTime.getOffset()).isEqualTo(OffsetDateTime.now().getOffset());
} else if (record.topic().endsWith("dbz_123_bitvaluetest")) {
// All row events should have the same values ...
Struct after = value.getStruct(Envelope.FieldName.AFTER);
// c1 BIT, // 1 bit
// c2 BIT(2), // 2 bits
// c3 BIT(8) // 8 bits
// c4 BIT(64) // 64 bits
Boolean c1 = after.getBoolean("c1");
assertThat(c1).isEqualTo(Boolean.TRUE);
byte[] c2 = after.getBytes("c2");
assertThat(c2.length).isEqualTo(1);
assertThat(c2[0]).isEqualTo((byte)2);
byte[] c3 = after.getBytes("c3");
assertThat(c3.length).isEqualTo(1);
assertThat(c3[0]).isEqualTo((byte)64);
// 1011011100000111011011011 = 23989979
byte[] c4 = after.getBytes("c4");
assertThat(c4.length).isEqualTo(8); // bytes, little endian
assertThat(c4[0]).isEqualTo((byte)219); // 11011011
assertThat(c4[1]).isEqualTo((byte)14); // 00001110
assertThat(c4[2]).isEqualTo((byte)110); // 01101110
assertThat(c4[3]).isEqualTo((byte)1); // 1
assertThat(c4[4]).isEqualTo((byte)0);
assertThat(c4[5]).isEqualTo((byte)0);
assertThat(c4[6]).isEqualTo((byte)0);
assertThat(c4[7]).isEqualTo((byte)0);
}
});
}
@ -437,27 +507,32 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
//Testing.Debug.enable();
// We expect a total of 16 schema change records:
int numTables = 7;
int numDataRecords = 11;
// We expect a total of:
// 1 set variables
// 6 drop tables
// 7 drop tables
// 1 drop database
// 1 create database
// 1 use database
// 6 create tables
SourceRecords records = consumeRecordsByTopic(16 + 9); // plus 9 data records ...
// 7 create tables
int numDdlRecords = numTables * 2 + 3; // 1 create, 1 drop, 1 use
int numSetVariables = 1;
SourceRecords records = consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords);
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(16);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(18);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3);
assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_114_zerovaluetest").size()).isEqualTo(2);
assertThat(records.topics().size()).isEqualTo(7);
assertThat(records.recordsForTopic("regression.regression_test.dbz_123_bitvaluetest").size()).isEqualTo(2);
assertThat(records.topics().size()).isEqualTo(numTables + 1);
assertThat(records.databaseNames().size()).isEqualTo(2);
assertThat(records.databaseNames()).containsOnly("regression_test", "");
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(15);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(numDdlRecords);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); // SET statement
@ -543,6 +618,35 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// We're running the connector in the same timezone as the server, so the timezone in the timestamp
// should match our current offset ...
assertThat(c4DateTime.getOffset()).isEqualTo(OffsetDateTime.now().getOffset());
} else if (record.topic().endsWith("dbz_123_bitvaluetest")) {
// All row events should have the same values ...
Struct after = value.getStruct(Envelope.FieldName.AFTER);
// c1 BIT, // 1 bit
// c2 BIT(2), // 2 bits
// c3 BIT(8) // 8 bits
// c4 BIT(64) // 64 bits
Boolean c1 = after.getBoolean("c1");
assertThat(c1).isEqualTo(Boolean.TRUE);
byte[] c2 = after.getBytes("c2");
assertThat(c2.length).isEqualTo(1);
assertThat(c2[0]).isEqualTo((byte)2);
byte[] c3 = after.getBytes("c3");
assertThat(c3.length).isEqualTo(1);
assertThat(c3[0]).isEqualTo((byte)64);
// 1011011100000111011011011 = 23989979
byte[] c4 = after.getBytes("c4");
assertThat(c4.length).isEqualTo(8); // bytes, little endian
assertThat(c4[0]).isEqualTo((byte)219); // 11011011
assertThat(c4[1]).isEqualTo((byte)14); // 00001110
assertThat(c4[2]).isEqualTo((byte)110); // 01101110
assertThat(c4[3]).isEqualTo((byte)1); // 1
assertThat(c4[4]).isEqualTo((byte)0);
assertThat(c4[5]).isEqualTo((byte)0);
assertThat(c4[6]).isEqualTo((byte)0);
assertThat(c4[7]).isEqualTo((byte)0);
}
});
}

View File

@ -52,11 +52,11 @@ public static Schema schema(int length) {
* @param value the logical value
* @return the encoded value
*/
public static byte[] fromLogical(Schema schema, BitSet value) {
public static byte[] fromBitSet(Schema schema, BitSet value) {
return value.toByteArray();
}
public static BitSet toLogical(Schema schema, byte[] value) {
public static BitSet toBitSet(Schema schema, byte[] value) {
return BitSet.valueOf(value);
}
}

View File

@ -7,12 +7,14 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.util.BitSet;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Decimal;
@ -165,7 +167,7 @@ public SchemaBuilder schemaBuilder(Column column) {
// Date and time values
case Types.DATE:
if (adaptiveTimePrecision) {
return Date.builder();
return Date.builder();
}
return org.apache.kafka.connect.data.Date.builder();
case Types.TIME:
@ -213,6 +215,11 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case Types.NULL:
return (data) -> null;
case Types.BIT:
if (column.length() > 1) {
int numBits = column.length();
int numBytes = numBits / Byte.SIZE + (numBits % Byte.SIZE == 0 ? 0 : 1);
return (data) -> convertBits(column, fieldDefn, data, numBytes);
}
return (data) -> convertBit(column, fieldDefn, data);
case Types.BOOLEAN:
return (data) -> convertBoolean(column, fieldDefn, data);
@ -262,7 +269,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
// Date and time values
case Types.DATE:
if (adaptiveTimePrecision) {
return (data) -> convertDateToEpochDays(column, fieldDefn, data);
return (data) -> convertDateToEpochDays(column, fieldDefn, data);
}
return (data) -> convertDateToEpochDaysAsDate(column, fieldDefn, data);
case Types.TIME:
@ -322,7 +329,7 @@ protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object
}
if (data == null) {
if (column.isOptional()) return null;
data = OffsetDateTime.of(LocalDate.ofEpochDay(0),LocalTime.MIDNIGHT, defaultOffset); // return epoch
data = OffsetDateTime.of(LocalDate.ofEpochDay(0), LocalTime.MIDNIGHT, defaultOffset); // return epoch
}
try {
return ZonedTimestamp.toIsoString(data, defaultOffset);
@ -1025,9 +1032,108 @@ protected Object convertBit(Column column, Field fieldDefn, Object data) {
if (data instanceof Short) return ((Short) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Integer) return ((Integer) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof Long) return ((Long) data).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
if (data instanceof BitSet) {
BitSet value = (BitSet) data;
return value.get(0);
}
return handleUnknownData(column, fieldDefn, data);
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BIT} of length 2+.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null
* @param numBytes the number of bytes that should be included in the resulting byte[]
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertBits(Column column, Field fieldDefn, Object data, int numBytes) {
if (data == null) {
data = fieldDefn.schema().defaultValue();
}
if (data == null) {
if (column.isOptional()) return null;
return false;
}
if (data instanceof Boolean) {
Boolean value = (Boolean) data;
return new byte[] { value.booleanValue() ? (byte) 1 : (byte) 0 };
}
if (data instanceof Short) {
Short value = (Short) data;
ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putShort(value.shortValue());
return buffer.array();
}
if (data instanceof Integer) {
Integer value = (Integer) data;
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putInt(value.intValue());
return buffer.array();
}
if (data instanceof Long) {
Long value = (Long) data;
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putLong(value.longValue());
return buffer.array();
}
if (data instanceof byte[]) {
byte[] bytes = (byte[]) data;
if (bytes.length == 1) {
return bytes;
}
if (byteOrderOfBitType() == ByteOrder.BIG_ENDIAN) {
// Reverse it to little endian ...
int i = 0;
int j = bytes.length - 1;
byte tmp;
while (j > i) {
tmp = bytes[j];
bytes[j] = bytes[i];
bytes[i] = tmp;
++i;
--j;
}
}
return padLittleEndian(numBytes, bytes);
}
if (data instanceof BitSet) {
byte[] bytes = ((BitSet) data).toByteArray();
return padLittleEndian(numBytes, bytes);
}
return handleUnknownData(column, fieldDefn, data);
}
protected byte[] padLittleEndian(int numBytes, byte[] data) {
if (data.length < numBytes) {
byte[] padded = new byte[numBytes];
System.arraycopy(data, 0, padded, 0, data.length);
for (int i = data.length; i != numBytes; ++i) {
padded[i] = 0;
}
return padded;
}
return data;
}
/**
* Determine whether the {@code byte[]} values for columns of type {@code BIT(n)} are {@link ByteOrder#BIG_ENDIAN big-endian}
* or {@link ByteOrder#LITTLE_ENDIAN little-endian}. All values for {@code BIT(n)} columns are to be returned in
* {@link ByteOrder#LITTLE_ENDIAN little-endian}.
* <p>
* By default, this method returns {@link ByteOrder#LITTLE_ENDIAN}.
*
* @return little endian or big endian; never null
*/
protected ByteOrder byteOrderOfBitType() {
return ByteOrder.LITTLE_ENDIAN;
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BOOLEAN}.
*
@ -1068,6 +1174,6 @@ protected Object handleUnknownData(Column column, Field fieldDefn, Object data)
return null;
}
throw new IllegalArgumentException("Unexpected value for JDBC type " + column.jdbcType() + " and column " + column +
": class=" + data.getClass()); // don't include value in case its sensitive
": class=" + data.getClass()); // don't include value in case its sensitive
}
}