DBZ-730 add MySqlDecimalIT integration test for all decimal.handling.mode/s

This commit is contained in:
rkerner 2020-07-07 18:33:34 +02:00 committed by Jiri Pechanec
parent f5ab12c346
commit 7b2e9be0d5
3 changed files with 175 additions and 2 deletions

View File

@ -0,0 +1,163 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing;
/**
* Verify correct DECIMAL handling with different types of io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode.
*
* @author René Kerner
*/
public class MySqlDecimalIT extends AbstractConnectorTest {
private static final String TABLE_NAME = "DBZ730";
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-decimal.txt")
.toAbsolutePath();
private final UniqueDatabase DATABASE = new UniqueDatabase("decimaldb", "decimal_test")
.withDbHistoryPath(DB_HISTORY_PATH);
private Configuration config;
@Before
public void beforeEach() {
stopConnector();
DATABASE.createAndInitialize();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
public void afterEach() {
try {
stopConnector();
}
finally {
Testing.Files.delete(DB_HISTORY_PATH);
}
}
@Test
@FixFor("DBZ-730")
public void testPreciseDecimalHandlingMode() throws SQLException, InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(TABLE_NAME))
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.PRECISE)
.build();
start(MySqlConnector.class, config);
assertBigDecimalChangeRecord(consumeInsert());
stopConnector();
}
@Test
@FixFor("DBZ-730")
public void testDoubleDecimalHandlingMode() throws SQLException, InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(TABLE_NAME))
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE)
.build();
start(MySqlConnector.class, config);
assertDoubleChangeRecord(consumeInsert());
stopConnector();
}
@Test
@FixFor("DBZ-730")
public void testStringDecimalHandlingMode() throws SQLException, InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(TABLE_NAME))
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.STRING)
.build();
start(MySqlConnector.class, config);
assertStringChangeRecord(consumeInsert());
stopConnector();
}
private SourceRecord consumeInsert() throws InterruptedException {
final int numDatabase = 2;
final int numTables = 4;
final int numOthers = 1;
SourceRecords records = consumeRecordsByTopic(numDatabase + numTables + numOthers);
assertThat(records).isNotNull();
List<SourceRecord> events = records.recordsForTopic(DATABASE.topicForTable(TABLE_NAME));
assertThat(events).hasSize(1);
return events.get(0);
}
private void assertBigDecimalChangeRecord(SourceRecord record) {
Assertions.assertThat(record).isNotNull();
final Struct change = ((Struct) record.value()).getStruct("after");
Assertions.assertThat(change.get("A")).isEqualTo(new BigDecimal("1.33"));
Assertions.assertThat(change.get("B")).isEqualTo(new BigDecimal("-2.111"));
Assertions.assertThat(change.get("C")).isEqualTo(new BigDecimal("3.44400"));
Assertions.assertThat(change.getWithoutDefault("D")).isNull();
Assertions.assertThat(record.valueSchema().field("after").schema().field("D").schema().defaultValue())
.isEqualTo(new BigDecimal("15.28000"));
}
private void assertDoubleChangeRecord(SourceRecord record) {
Assertions.assertThat(record).isNotNull();
final Struct change = ((Struct) record.value()).getStruct("after");
Assertions.assertThat(change.getFloat64("A")).isEqualTo(1.33);
Assertions.assertThat(change.getFloat64("B")).isEqualTo(-2.111);
Assertions.assertThat(change.getFloat64("C")).isEqualTo(3.44400);
Assertions.assertThat(change.getFloat64("D")).isNull();
Assertions.assertThat(record.valueSchema().field("after").schema().field("D").schema().defaultValue())
.isEqualTo(15.28000);
}
private void assertStringChangeRecord(SourceRecord record) {
Assertions.assertThat(record).isNotNull();
final Struct change = ((Struct) record.value()).getStruct("after");
Assertions.assertThat(change.getString("A").trim()).isEqualTo("1.33");
Assertions.assertThat(change.getString("B").trim()).isEqualTo("-2.111");
Assertions.assertThat(change.getString("C").trim()).isEqualTo("3.44400");
Assertions.assertThat(change.getString("D")).isNull();
Assertions.assertThat(record.valueSchema().field("after").schema().field("D").schema().defaultValue())
.isEqualTo("15.28000");
}
}

View File

@ -41,6 +41,7 @@
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.junit.SkipWhenKafkaVersion.KafkaVersion;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.Timestamp;
@ -525,7 +526,7 @@ public void realTest() throws InterruptedException {
public void numericAndDecimalToDoubleTest() throws InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, JdbcValueConverters.DecimalMode.DOUBLE)
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE)
.build();
start(MySqlConnector.class, config);
@ -549,7 +550,7 @@ public void numericAndDecimalToDoubleTest() throws InterruptedException {
public void numericAndDecimalToDecimalTest() throws InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, JdbcValueConverters.DecimalMode.PRECISE)
.with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.PRECISE)
.build();
start(MySqlConnector.class, config);

View File

@ -0,0 +1,9 @@
CREATE TABLE `DBZ730` (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
A NUMERIC(3, 2) NOT NULL DEFAULT 1.23,
B DECIMAL(4, 3) NOT NULL DEFAULT 2.321,
C NUMERIC(7, 5) NULL DEFAULT '12.678',
D NUMERIC(7, 5) NULL DEFAULT '15.28'
) ENGINE=InnoDB AUTO_INCREMENT=15851 DEFAULT CHARSET=utf8;
INSERT INTO `DBZ730`(A, B, C, D)
VALUES (1.33, -2.111 , 3.444, NULL);