From b90274cc63bcb5c27632b2580577fca781b1b8cd Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 28 Mar 2019 14:38:28 +0100 Subject: [PATCH] DBZ-1194 Convert nulls even for key --- .../connector/mysql/MySqlDateTimeInKeyIT.java | 101 ++++++++++++++++++ .../test/resources/ddl/datetime_key_test.sql | 14 +++ .../relational/TableSchemaBuilder.java | 6 +- 3 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDateTimeInKeyIT.java create mode 100644 debezium-connector-mysql/src/test/resources/ddl/datetime_key_test.sql diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDateTimeInKeyIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDateTimeInKeyIT.java new file mode 100644 index 000000000..41d0574fd --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDateTimeInKeyIT.java @@ -0,0 +1,101 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import static org.fest.assertions.Assertions.assertThat; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.doc.FixFor; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.util.Testing; + +public class MySqlDateTimeInKeyIT extends AbstractConnectorTest { + + private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-decimal-column.txt") + .toAbsolutePath(); + private final UniqueDatabase DATABASE = new UniqueDatabase("pkdb", "datetime_key_test") + .withDbHistoryPath(DB_HISTORY_PATH); + + private Configuration config; + + @Before + public void beforeEach() { + stopConnector(); + DATABASE.createAndInitialize(); + initializeConnectorTestFramework(); + Testing.Files.delete(DB_HISTORY_PATH); + } + + @After + public void afterEach() { + try { + stopConnector(); + } finally { + Testing.Files.delete(DB_HISTORY_PATH); + } + } + + @Test + @FixFor("DBZ-1194") + public void shouldAcceptAllZeroDatetimeInPrimaryKey() throws SQLException, InterruptedException { + // Use the DB configuration to define the connector's configuration ... + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL) + .build(); + + // Start the connector ... + start(MySqlConnector.class, config); + + // Testing.Debug.enable(); + final int numDatabase = 3; + final int numTables = 2; + final int numInserts = 1; + final int numOthers = 1; // SET + SourceRecords records = consumeRecordsByTopic(numDatabase + numTables + numInserts + numOthers); + + assertThat(records).isNotNull(); + records.forEach(this::validate); + + List changes = records.recordsForTopic(DATABASE.topicForTable("dbz_1194_datetime_key_test")); + assertThat(changes).hasSize(1); + + assertKey(changes); + + try (final Connection conn = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) { + conn.createStatement().execute("SET sql_mode='';"); + conn.createStatement().execute("INSERT INTO dbz_1194_datetime_key_test VALUES (default, '0000-00-00 00:00:00', '0000-00-00', '00:00:00')"); + } + records = consumeRecordsByTopic(1); + + assertThat(records).isNotNull(); + records.forEach(this::validate); + + changes = records.recordsForTopic(DATABASE.topicForTable("dbz_1194_datetime_key_test")); + assertThat(changes).hasSize(1); + + assertKey(changes); + + stopConnector(); + } + + private void assertKey(List changes) { + Struct key = (Struct) changes.get(0).key(); + assertThat(key.getInt64("dtval")).isZero(); + assertThat(key.getInt64("tval")).isZero(); + assertThat(key.getInt32("dval")).isZero(); + } +} diff --git a/debezium-connector-mysql/src/test/resources/ddl/datetime_key_test.sql b/debezium-connector-mysql/src/test/resources/ddl/datetime_key_test.sql new file mode 100644 index 000000000..08740721b --- /dev/null +++ b/debezium-connector-mysql/src/test/resources/ddl/datetime_key_test.sql @@ -0,0 +1,14 @@ +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: datetime_key_test +-- ---------------------------------------------------------------------------------------------------------------- + +SET sql_mode=''; +CREATE TABLE dbz_1194_datetime_key_test ( + id INT AUTO_INCREMENT NOT NULL, + dtval DATETIME NOT NULL, + dval DATE NOT NULL, + tval TIME NOT NULL, + PRIMARY KEY (id, dtval, dval, tval) +) DEFAULT CHARSET=utf8; + +INSERT INTO dbz_1194_datetime_key_test VALUES (default, '0000-00-00 00:00:00', '0000-00-00', '00:00:00'); diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index 3e38eb56a..7a2e8baca 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -171,7 +171,11 @@ protected Function createKeyGenerator(Schema schema, TableId c Object value = row[recordIndexes[i]]; ValueConverter converter = converters[i]; if (converter != null) { - value = value == null ? value : converter.convert(value); + // A component of primary key must be not-null. + // It is possible for some databases and values (MySQL and all-zero datetime) + // to be reported as null by JDBC or streaming reader. + // It thus makes sense to convert them to a sensible default replacement value. + value = converter.convert(value); try { result.put(fields[i], value); } catch (DataException e) {