DBZ-7933 Fix streaming TIME precision for micro/nano -seconds

This commit is contained in:
Chris Cranford 2024-06-07 15:20:04 -04:00 committed by Jiri Pechanec
parent 212fe3db38
commit 26af1c967a
4 changed files with 167 additions and 34 deletions

View File

@ -68,7 +68,7 @@ protected int getOperation(ResultSet resultSet) throws SQLException {
@Override
protected Object getColumnData(ResultSet resultSet, int columnIndex) throws SQLException {
if (resultSet.getMetaData().getColumnType(columnIndex) == Types.TIME) {
return resultSet.getTime(columnIndex);
return resultSet.getTimestamp(columnIndex);
}
return super.getColumnData(resultSet, columnIndex);
}

View File

@ -34,6 +34,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.time.Date;
import io.debezium.time.MicroTime;
import io.debezium.time.NanoTime;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Time;
import io.debezium.time.Timestamp;
@ -89,6 +90,7 @@ public abstract class AbstractSqlServerDatatypesTest extends AbstractConnectorTe
" val_date date, " +
" val_time_p2 time(2), " +
" val_time time(4), " +
" val_time_p7 time(7), " +
" val_datetime2 datetime2, " +
" val_datetimeoffset datetimeoffset, " +
" val_datetime datetime, " +
@ -128,6 +130,7 @@ public abstract class AbstractSqlServerDatatypesTest extends AbstractConnectorTe
private static final List<SchemaAndValueField> EXPECTED_DATE_TIME = Arrays.asList(
new SchemaAndValueField("val_date", Date.builder().optional().build(), 17_725),
new SchemaAndValueField("val_time_p2", Time.builder().optional().build(), 37_425_680),
new SchemaAndValueField("val_time_p7", NanoTime.builder().optional().build(), 37_425_678_901_200L),
new SchemaAndValueField("val_time", MicroTime.builder().optional().build(), 37_425_678_900L),
new SchemaAndValueField("val_datetime2", NanoTimestamp.builder().optional().build(), 1_531_481_025_340_000_000L),
new SchemaAndValueField("val_datetimeoffset", ZonedTimestamp.builder().optional().build(), "2018-07-13T12:23:45.456+11:00"),
@ -147,6 +150,10 @@ public abstract class AbstractSqlServerDatatypesTest extends AbstractConnectorTe
java.util.Date.from(LocalTime.of(10, 23, 45, 678_900_000).atDate(LocalDate.ofEpochDay(0))
.atOffset(ZoneOffset.UTC)
.toInstant())),
new SchemaAndValueField("val_time_p7", org.apache.kafka.connect.data.Time.builder().optional().build(),
java.util.Date.from(LocalTime.of(10, 23, 45, 678_900_000).atDate(LocalDate.ofEpochDay(0))
.atOffset(ZoneOffset.UTC)
.toInstant())),
new SchemaAndValueField("val_datetime2", org.apache.kafka.connect.data.Timestamp.builder().optional().build(),
java.util.Date.from(LocalDateTime.of(2018, 7, 13, 11, 23, 45, 340_000_000)
.atOffset(ZoneOffset.UTC)
@ -180,7 +187,7 @@ public abstract class AbstractSqlServerDatatypesTest extends AbstractConnectorTe
DDL_XML
};
private static final int EXPECTED_RECORD_COUNT = ALL_DDLS.length;
private boolean useSnapshot = true;
@AfterClass
public static void dropTables() throws SQLException {
@ -188,36 +195,25 @@ public static void dropTables() throws SQLException {
}
@BeforeClass
public static void createTables() throws SQLException {
public static void beforeClass() throws SQLException {
TestHelper.createTestDatabase();
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute(ALL_DDLS);
for (String table : ALL_TABLES) {
TestHelper.enableTableCdc(connection, table);
}
connection.execute(
"INSERT INTO type_int VALUES (0, 1, 22, 333, 4444, 55555)",
"INSERT INTO type_fp VALUES (0, 1.123, 2, 3.323, 4.323, 5.323, 6.323)",
"INSERT INTO type_string VALUES (0, 'c\u010d', 'vc\u010d', 't\u010d', N'c\u010d', N'vc\u010d', N't\u010d')",
"INSERT INTO type_time VALUES (0, '2018-07-13', '10:23:45.678', '10:23:45.6789', '2018-07-13 11:23:45.34', '2018-07-13 12:23:45.456+11:00', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45')",
"INSERT INTO type_xml VALUES (0, '<a>b</a>')");
// Make sure to wait for the CDC record for the last insert.
TestHelper.waitForCdcRecord(connection, "type_xml", rs -> rs.getInt("id") == 0);
}
}
@Test
public void intTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
if (!useSnapshot) {
insertIntTypes();
}
final SourceRecords records = consumeRecordsByTopic(getExpectedRecordCount());
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.testDB1.dbo.type_int");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidRead(testTableRecords.get(0));
validateRecord(testTableRecords.get(0));
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_INT);
}
@ -226,13 +222,17 @@ public void intTypes() throws Exception {
public void fpTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
if (!useSnapshot) {
insertFpTypes();
}
final SourceRecords records = consumeRecordsByTopic(getExpectedRecordCount());
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.testDB1.dbo.type_fp");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidRead(testTableRecords.get(0));
validateRecord(testTableRecords.get(0));
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_FP);
}
@ -241,13 +241,17 @@ public void fpTypes() throws Exception {
public void stringTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
if (!useSnapshot) {
insertStringTypes();
}
final SourceRecords records = consumeRecordsByTopic(getExpectedRecordCount());
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.testDB1.dbo.type_string");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidRead(testTableRecords.get(0));
validateRecord(testTableRecords.get(0));
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_STRING);
}
@ -256,13 +260,17 @@ public void stringTypes() throws Exception {
public void dateTimeTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
if (!useSnapshot) {
insertTimeTypes();
}
final SourceRecords records = consumeRecordsByTopic(getExpectedRecordCount());
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.testDB1.dbo.type_time");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidRead(testTableRecords.get(0));
validateRecord(testTableRecords.get(0));
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_DATE_TIME);
}
@ -270,17 +278,21 @@ public void dateTimeTypes() throws Exception {
@Test
public void dateTimeTypesAsConnect() throws Exception {
stopConnector();
init(TemporalPrecisionMode.CONNECT);
init(TemporalPrecisionMode.CONNECT, useSnapshot);
if (!useSnapshot) {
insertTimeTypes();
}
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
final SourceRecords records = consumeRecordsByTopic(getExpectedRecordCount());
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.testDB1.dbo.type_time");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidRead(testTableRecords.get(0));
validateRecord(testTableRecords.get(0));
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_DATE_TIME_AS_CONNECT);
}
@ -289,13 +301,17 @@ public void dateTimeTypesAsConnect() throws Exception {
public void otherTypes() throws Exception {
Testing.debug("Inserted");
final SourceRecords records = consumeRecordsByTopic(EXPECTED_RECORD_COUNT);
if (!useSnapshot) {
insertXmlTypes();
}
final SourceRecords records = consumeRecordsByTopic(getExpectedRecordCount());
List<SourceRecord> testTableRecords = records.recordsForTopic("server1.testDB1.dbo.type_xml");
assertThat(testTableRecords).hasSize(1);
// insert
VerifyRecord.isValidRead(testTableRecords.get(0));
validateRecord(testTableRecords.get(0));
Struct after = (Struct) ((Struct) testTableRecords.get(0).value()).get("after");
assertRecord(after, EXPECTED_XML);
}
@ -304,17 +320,92 @@ private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
public void init(TemporalPrecisionMode temporalPrecisionMode) throws Exception {
public void init(TemporalPrecisionMode temporalPrecisionMode, boolean useSnapshot) throws Exception {
this.useSnapshot = useSnapshot;
initializeConnectorTestFramework();
Testing.Debug.enable();
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, useSnapshot ? SnapshotMode.INITIAL : SnapshotMode.NO_DATA)
.with(RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE, temporalPrecisionMode)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
}
protected static void createTables() throws SQLException {
TestHelper.createTestDatabase();
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute(ALL_DDLS);
for (String table : ALL_TABLES) {
TestHelper.enableTableCdc(connection, table);
}
}
}
protected static void insertIntTypes() throws SQLException {
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute("INSERT INTO type_int VALUES (0, 1, 22, 333, 4444, 55555)");
TestHelper.waitForCdcRecord(connection, "type_int", rs -> rs.getInt("id") == 0);
}
}
protected static void insertFpTypes() throws SQLException {
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute("INSERT INTO type_fp VALUES (0, 1.123, 2, 3.323, 4.323, 5.323, 6.323)");
TestHelper.waitForCdcRecord(connection, "type_fp", rs -> rs.getInt("id") == 0);
}
}
protected static void insertStringTypes() throws SQLException {
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute("INSERT INTO type_string VALUES (0, 'c\u010d', 'vc\u010d', 't\u010d', N'c\u010d', N'vc\u010d', N't\u010d')");
TestHelper.waitForCdcRecord(connection, "type_string", rs -> rs.getInt("id") == 0);
}
}
protected static void insertTimeTypes() throws SQLException {
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute(
"INSERT INTO type_time VALUES (0, '2018-07-13', '10:23:45.678', '10:23:45.6789', '10:23:45.6789012', '2018-07-13 11:23:45.34', '2018-07-13 12:23:45.456+11:00', '2018-07-13 13:23:45.78', '2018-07-13 14:23:45')");
TestHelper.waitForCdcRecord(connection, "type_time", rs -> rs.getInt("id") == 0);
}
}
protected static void insertXmlTypes() throws SQLException {
try (SqlServerConnection connection = TestHelper.testConnection()) {
connection.execute("INSERT INTO type_xml VALUES (0, '<a>b</a>')");
TestHelper.waitForCdcRecord(connection, "type_xml", rs -> rs.getInt("id") == 0);
}
}
protected static void dropAllTables() throws SQLException {
try (SqlServerConnection connection = TestHelper.testConnection()) {
for (String tableName : ALL_TABLES) {
try {
connection.execute("DROP TABLE " + tableName);
}
catch (SQLException e) {
// ignored
}
}
}
}
protected void validateRecord(SourceRecord record) {
if (useSnapshot) {
VerifyRecord.isValidRead(record);
}
else {
VerifyRecord.isValidInsert(record, "id", 0);
}
}
protected int getExpectedRecordCount() {
return useSnapshot ? ALL_DDLS.length : 1;
}
}

View File

@ -22,11 +22,19 @@ public class DatatypesFromSnapshotIT extends AbstractSqlServerDatatypesTest {
@BeforeClass
public static void beforeClass() throws SQLException {
AbstractSqlServerDatatypesTest.beforeClass();
createTables();
insertIntTypes();
insertFpTypes();
insertStringTypes();
insertTimeTypes();
insertXmlTypes();
}
@Before
public void before() throws Exception {
init(TemporalPrecisionMode.ADAPTIVE);
init(TemporalPrecisionMode.ADAPTIVE, true);
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import org.junit.Before;
import org.junit.BeforeClass;
import io.debezium.jdbc.TemporalPrecisionMode;
/**
* Integration test to verify different SQL Server datatypes.
* The types are discovered during streaming phase.
*
* @author Chris Cranford
*/
public class DatatypesFromStreamingIT extends AbstractSqlServerDatatypesTest {
@BeforeClass
public static void beforeClass() throws SQLException {
AbstractSqlServerDatatypesTest.beforeClass();
}
@Before
public void before() throws Exception {
dropAllTables();
createTables();
init(TemporalPrecisionMode.ADAPTIVE, false);
}
}