DBZ-768 Fix formatting. Behaviour not dependent on a timezone
This commit is contained in:
parent
741debe18e
commit
b4b1f6976b
@ -9,6 +9,7 @@
|
|||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
import java.sql.Types;
|
import java.sql.Types;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.LocalTime;
|
import java.time.LocalTime;
|
||||||
@ -94,8 +95,12 @@ public Object convert(Column column, String value) {
|
|||||||
* @return the converted value;
|
* @return the converted value;
|
||||||
*/
|
*/
|
||||||
private Object convertToLocalDate(Column column, String value) {
|
private Object convertToLocalDate(Column column, String value) {
|
||||||
if (ALL_ZERO_DATE.equals(value) && column.isOptional()) return null;
|
if (ALL_ZERO_DATE.equals(value) && column.isOptional()) {
|
||||||
if (ALL_ZERO_DATE.equals(value)) value = EPOCH_DATE;
|
return null;
|
||||||
|
}
|
||||||
|
if (ALL_ZERO_DATE.equals(value)) {
|
||||||
|
value = EPOCH_DATE;
|
||||||
|
}
|
||||||
return LocalDate.from(DateTimeFormatter.ISO_LOCAL_DATE.parse(value));
|
return LocalDate.from(DateTimeFormatter.ISO_LOCAL_DATE.parse(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -110,9 +115,12 @@ private Object convertToLocalDate(Column column, String value) {
|
|||||||
*/
|
*/
|
||||||
private Object convertToLocalDateTime(Column column, String value) {
|
private Object convertToLocalDateTime(Column column, String value) {
|
||||||
final boolean matches = ALL_ZERO_TIMESTAMP.matcher(value).matches();
|
final boolean matches = ALL_ZERO_TIMESTAMP.matcher(value).matches();
|
||||||
if (matches && column.isOptional()) return null;
|
if (matches && column.isOptional()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
if (matches) {
|
if (matches) {
|
||||||
value = EPOCH_TIMESTAMP;
|
value = EPOCH_TIMESTAMP;
|
||||||
|
// Align fraction zeros according to the database schema
|
||||||
if (column.length() > 0) {
|
if (column.length() > 0) {
|
||||||
final StringBuilder sb = new StringBuilder(EPOCH_TIMESTAMP).append('.');
|
final StringBuilder sb = new StringBuilder(EPOCH_TIMESTAMP).append('.');
|
||||||
for (int i = 0; i < column.length(); i++) {
|
for (int i = 0; i < column.length(); i++) {
|
||||||
@ -135,9 +143,13 @@ private Object convertToLocalDateTime(Column column, String value) {
|
|||||||
* @return the converted value;
|
* @return the converted value;
|
||||||
*/
|
*/
|
||||||
private Object convertToTimestamp(Column column, String value) {
|
private Object convertToTimestamp(Column column, String value) {
|
||||||
final boolean matches = ALL_ZERO_TIMESTAMP.matcher(value).matches();
|
final boolean matches = ALL_ZERO_TIMESTAMP.matcher(value).matches() || EPOCH_TIMESTAMP.equals(value);
|
||||||
if (matches && column.isOptional()) return null;
|
if (matches && column.isOptional()) {
|
||||||
if (matches) value = EPOCH_TIMESTAMP;
|
return null;
|
||||||
|
}
|
||||||
|
if (matches) {
|
||||||
|
return Timestamp.from(Instant.EPOCH);
|
||||||
|
}
|
||||||
return Timestamp.valueOf(value).toInstant().atZone(ZoneId.systemDefault());
|
return Timestamp.valueOf(value).toInstant().atZone(ZoneId.systemDefault());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,7 +265,9 @@ private Object convertToBoolean(String value) {
|
|||||||
|
|
||||||
private String timeFormat(int length) {
|
private String timeFormat(int length) {
|
||||||
String defaultFormat = "HH:mm:ss";
|
String defaultFormat = "HH:mm:ss";
|
||||||
if (length <= 0) return defaultFormat;
|
if (length <= 0) {
|
||||||
|
return defaultFormat;
|
||||||
|
}
|
||||||
StringBuilder format = new StringBuilder(defaultFormat);
|
StringBuilder format = new StringBuilder(defaultFormat);
|
||||||
format.append(".");
|
format.append(".");
|
||||||
for (int i = 0; i < length; i++) {
|
for (int i = 0; i < length; i++) {
|
||||||
@ -264,7 +278,9 @@ private String timeFormat(int length) {
|
|||||||
|
|
||||||
private String timestampFormat(int length) {
|
private String timestampFormat(int length) {
|
||||||
String defaultFormat = "yyyy-MM-dd HH:mm:ss";
|
String defaultFormat = "yyyy-MM-dd HH:mm:ss";
|
||||||
if (length <= 0) return defaultFormat;
|
if (length <= 0) {
|
||||||
|
return defaultFormat;
|
||||||
|
}
|
||||||
StringBuilder format = new StringBuilder(defaultFormat);
|
StringBuilder format = new StringBuilder(defaultFormat);
|
||||||
format.append(".");
|
format.append(".");
|
||||||
for (int i = 0; i < length; i++) {
|
for (int i = 0; i < length; i++) {
|
||||||
|
@ -9,6 +9,8 @@
|
|||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
import java.time.ZoneOffset;
|
import java.time.ZoneOffset;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
@ -23,6 +25,7 @@
|
|||||||
import io.debezium.relational.TableId;
|
import io.debezium.relational.TableId;
|
||||||
import io.debezium.relational.Tables;
|
import io.debezium.relational.Tables;
|
||||||
import io.debezium.relational.ddl.AbstractDdlParser;
|
import io.debezium.relational.ddl.AbstractDdlParser;
|
||||||
|
import io.debezium.time.ZonedTimestamp;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author laomei
|
* @author laomei
|
||||||
@ -323,15 +326,15 @@ public void parseNumericAndDecimalToDecimalDefaultValue() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void parseTimeDefaultValue() {
|
public void parseTimeDefaultValue() {
|
||||||
String sql = "CREATE TABLE TIME_TABLE (\n" +
|
String sql = "CREATE TABLE TIME_TABLE (" +
|
||||||
" A timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" +
|
" A timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," +
|
||||||
" B timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',\n" +
|
" B timestamp NOT NULL DEFAULT '0000-00-00 00:00:00'," +
|
||||||
" C timestamp NOT NULL DEFAULT '0000-00-00 00:00:00.000'," +
|
" C timestamp NOT NULL DEFAULT '0000-00-00 00:00:00.000'," +
|
||||||
" D timestamp NOT NULL DEFAULT '2018-06-26 12:34:56'," +
|
" D timestamp NOT NULL DEFAULT '2018-06-26 12:34:56'," +
|
||||||
" E timestamp NOT NULL DEFAULT '2018-06-26 12:34:56.000'," +
|
" E timestamp NOT NULL DEFAULT '2018-06-26 12:34:56.000'," +
|
||||||
" F timestamp NOT NULL DEFAULT '2018-06-26 12:34:56.78'," +
|
" F timestamp NOT NULL DEFAULT '2018-06-26 12:34:56.78'," +
|
||||||
" G datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" +
|
" G datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," +
|
||||||
" H datetime NOT NULL DEFAULT '0000-00-00 00:00:00',\n" +
|
" H datetime NOT NULL DEFAULT '0000-00-00 00:00:00'," +
|
||||||
" I datetime(3) NOT NULL DEFAULT '0000-00-00 00:00:00.000'," +
|
" I datetime(3) NOT NULL DEFAULT '0000-00-00 00:00:00.000'," +
|
||||||
" J datetime NOT NULL DEFAULT '2018-06-26 12:34:56'," +
|
" J datetime NOT NULL DEFAULT '2018-06-26 12:34:56'," +
|
||||||
" K datetime(3) NOT NULL DEFAULT '2018-06-26 12:34:56.000'," +
|
" K datetime(3) NOT NULL DEFAULT '2018-06-26 12:34:56.000'," +
|
||||||
@ -339,12 +342,12 @@ public void parseTimeDefaultValue() {
|
|||||||
");";
|
");";
|
||||||
parser.parse(sql, tables);
|
parser.parse(sql, tables);
|
||||||
Table table = tables.forTable(new TableId(null, null, "TIME_TABLE"));
|
Table table = tables.forTable(new TableId(null, null, "TIME_TABLE"));
|
||||||
assertThat(table.columnWithName("A").defaultValue()).isEqualTo("1970-01-01T00:00:00+01:00");
|
assertThat(table.columnWithName("A").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
|
||||||
assertThat(table.columnWithName("B").defaultValue()).isEqualTo("1970-01-01T00:00:00+01:00");
|
assertThat(table.columnWithName("B").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
|
||||||
assertThat(table.columnWithName("C").defaultValue()).isEqualTo("1970-01-01T00:00:00+01:00");
|
assertThat(table.columnWithName("C").defaultValue()).isEqualTo("1970-01-01T00:00:00Z");
|
||||||
assertThat(table.columnWithName("D").defaultValue()).isEqualTo("2018-06-26T12:34:56+02:00");
|
assertThat(table.columnWithName("D").defaultValue()).isEqualTo(ZonedTimestamp.toIsoString(LocalDateTime.of(2018, 6, 26, 12, 34, 56, 0).atZone(ZoneId.systemDefault()), null));
|
||||||
assertThat(table.columnWithName("E").defaultValue()).isEqualTo("2018-06-26T12:34:56+02:00");
|
assertThat(table.columnWithName("E").defaultValue()).isEqualTo(ZonedTimestamp.toIsoString(LocalDateTime.of(2018, 6, 26, 12, 34, 56, 0).atZone(ZoneId.systemDefault()), null));
|
||||||
assertThat(table.columnWithName("F").defaultValue()).isEqualTo("2018-06-26T12:34:56.78+02:00");
|
assertThat(table.columnWithName("F").defaultValue()).isEqualTo(ZonedTimestamp.toIsoString(LocalDateTime.of(2018, 6, 26, 12, 34, 56, 780_000_000).atZone(ZoneId.systemDefault()), null));
|
||||||
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(Date.from(Instant.ofEpochMilli(0)));
|
assertThat(table.columnWithName("G").defaultValue()).isEqualTo(Date.from(Instant.ofEpochMilli(0)));
|
||||||
assertThat(table.columnWithName("H").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
|
assertThat(table.columnWithName("H").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
|
||||||
assertThat(table.columnWithName("I").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
|
assertThat(table.columnWithName("I").defaultValue()).isEqualTo((Date.from(Instant.ofEpochMilli(0))));
|
||||||
|
@ -5,25 +5,27 @@
|
|||||||
*/
|
*/
|
||||||
package io.debezium.connector.mysql;
|
package io.debezium.connector.mysql;
|
||||||
|
|
||||||
import io.debezium.config.Configuration;
|
import static org.fest.assertions.Assertions.assertThat;
|
||||||
import io.debezium.embedded.AbstractConnectorTest;
|
|
||||||
import io.debezium.time.Timestamp;
|
import java.nio.file.Path;
|
||||||
import io.debezium.time.ZonedTimestamp;
|
import java.time.Instant;
|
||||||
import io.debezium.util.Testing;
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
|
import java.time.ZonedDateTime;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
import org.apache.kafka.connect.data.Schema;
|
import org.apache.kafka.connect.data.Schema;
|
||||||
import org.apache.kafka.connect.source.SourceRecord;
|
import org.apache.kafka.connect.source.SourceRecord;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.nio.file.Path;
|
import io.debezium.config.Configuration;
|
||||||
import java.time.LocalDateTime;
|
import io.debezium.embedded.AbstractConnectorTest;
|
||||||
import java.time.ZoneId;
|
import io.debezium.time.Timestamp;
|
||||||
import java.time.ZonedDateTime;
|
import io.debezium.time.ZonedTimestamp;
|
||||||
import java.time.format.DateTimeFormatter;
|
import io.debezium.util.Testing;
|
||||||
import java.util.Collections;
|
|
||||||
|
|
||||||
import static org.fest.assertions.Assertions.assertThat;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author luobo on 2018/6/8 14:16
|
* @author luobo on 2018/6/8 14:16
|
||||||
@ -74,9 +76,8 @@ public void allZeroDateAndTimeTypeTest() throws InterruptedException {
|
|||||||
Schema schemaF = record.valueSchema().fields().get(1).schema().fields().get(5).schema();
|
Schema schemaF = record.valueSchema().fields().get(1).schema().fields().get(5).schema();
|
||||||
|
|
||||||
//column A, 0000-00-00 00:00:00 => 1970-01-01 00:00:00
|
//column A, 0000-00-00 00:00:00 => 1970-01-01 00:00:00
|
||||||
String valueA = "1970-01-01 00:00:00";
|
ZonedDateTime a = ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC);
|
||||||
ZonedDateTime a = java.sql.Timestamp.valueOf(valueA).toInstant().atZone(ZoneId.systemDefault());
|
String isoStringA = ZonedTimestamp.toIsoString(a, ZoneOffset.UTC, MySqlValueConverters::adjustTemporal);
|
||||||
String isoStringA = ZonedTimestamp.toIsoString(a, ZoneId.systemDefault(), MySqlValueConverters::adjustTemporal);
|
|
||||||
assertThat(schemaA.defaultValue()).isEqualTo(isoStringA);
|
assertThat(schemaA.defaultValue()).isEqualTo(isoStringA);
|
||||||
|
|
||||||
//column B allows null, default value should be null
|
//column B allows null, default value should be null
|
||||||
|
@ -509,9 +509,8 @@ public void dateAndTimeTest() throws InterruptedException {
|
|||||||
assertThat(schemaI.defaultValue()).isEqualTo(82800123456L);
|
assertThat(schemaI.defaultValue()).isEqualTo(82800123456L);
|
||||||
|
|
||||||
//current timestamp will be replaced with epoch timestamp
|
//current timestamp will be replaced with epoch timestamp
|
||||||
String value5 = "1970-01-01 00:00:00";
|
ZonedDateTime t5 = ZonedDateTime.ofInstant(Instant.EPOCH, ZoneOffset.UTC);
|
||||||
ZonedDateTime t5 = java.sql.Timestamp.valueOf(value5).toInstant().atZone(ZoneId.systemDefault());
|
String isoString5 = ZonedTimestamp.toIsoString(t5, ZoneOffset.UTC, MySqlValueConverters::adjustTemporal);
|
||||||
String isoString5 = ZonedTimestamp.toIsoString(t5, ZoneId.systemDefault(), MySqlValueConverters::adjustTemporal);
|
|
||||||
assertThat(schemaJ.defaultValue()).isEqualTo(isoString5);
|
assertThat(schemaJ.defaultValue()).isEqualTo(isoString5);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user