DBZ-6143 Unify Oracle TIMESTAMP WITH TIME ZONE format

When taking a snapshot, the Oracle connector was converting the TIMESTAMP
WITH TIME ZONE value to GMT and per the documentation, the value should
be emitted in the time zone of the data.

The snapshot emitted value in GMT is temporally accurate, so there is no
data inconsistency, but the emitted format itself was inconsistent when
looking at how the column data was emitted during a snapshot versus in a
streaming event.
This commit is contained in:
Chris Cranford 2023-02-22 16:13:11 -05:00 committed by Jiri Pechanec
parent 76a8f8dbe1
commit e320abc834
2 changed files with 73 additions and 1 deletions

View File

@ -574,7 +574,7 @@ else if (data instanceof DATE) {
}
else if (data instanceof TIMESTAMPTZ) {
final TIMESTAMPTZ ts = (TIMESTAMPTZ) data;
data = ZonedDateTime.ofInstant(ts.timestampValue(connection.connection()).toInstant(), ts.getTimeZone().toZoneId());
data = ts.toZonedDateTime();
}
else if (data instanceof TIMESTAMPLTZ) {
final TIMESTAMPLTZ ts = (TIMESTAMPLTZ) data;

View File

@ -4942,6 +4942,78 @@ public void testCapturingChangesForTableWithSpecialCharactersInName() throws Exc
}
}
@Test
@FixFor("DBZ-6143")
public void testTimestampWithTimeZoneFormatConsistentUsingDriverEnabledTimestampTzInGmt() throws Exception {
TestHelper.dropTable(connection, "tz_test");
try {
connection.execute("CREATE TABLE tz_test (a timestamp with time zone)");
connection.execute("INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TZ_TEST")
.with("driver.oracle.jdbc.timestampTzInGmt", "true") // driver default
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.TZ_TEST");
assertThat(tableRecords).hasSize(1);
assertThat(getAfter(tableRecords.get(0)).get("A")).isEqualTo("2010-12-01T23:12:56.788000-12:44");
// Streaming
connection.execute("INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic("server1.DEBEZIUM.TZ_TEST");
assertThat(tableRecords).hasSize(1);
assertThat(getAfter(tableRecords.get(0)).get("A")).isEqualTo("2010-12-01T23:12:56.788000-12:44");
}
finally {
TestHelper.dropTable(connection, "tz_test");
}
}
@Test
@FixFor("DBZ-6143")
public void testTimestampWithTimeZoneFormatConsistentUsingDriverDisabledTimestampTzInGmt() throws Exception {
TestHelper.dropTable(connection, "tz_test");
try {
connection.execute("CREATE TABLE tz_test (a timestamp with time zone)");
connection.execute("INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TZ_TEST")
.with("driver.oracle.jdbc.timestampTzInGmt", "false")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Snapshot
SourceRecords records = consumeRecordsByTopic(1);
List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.TZ_TEST");
assertThat(tableRecords).hasSize(1);
assertThat(getAfter(tableRecords.get(0)).get("A")).isEqualTo("2010-12-01T23:12:56.788000-12:44");
// Streaming
connection.execute("INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))");
records = consumeRecordsByTopic(1);
tableRecords = records.recordsForTopic("server1.DEBEZIUM.TZ_TEST");
assertThat(tableRecords).hasSize(1);
assertThat(getAfter(tableRecords.get(0)).get("A")).isEqualTo("2010-12-01T23:12:56.788000-12:44");
}
finally {
TestHelper.dropTable(connection, "tz_test");
}
}
private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
try (OracleConnection admin = TestHelper.adminConnection(true)) {
final Scn scn = admin.getCurrentScn();