DBZ-7920 Add support for timestamp infinity values for Db2
This commit is contained in:
parent
4fd96e86af
commit
e406ffd034
@ -128,6 +128,16 @@ public String getAlterTableColumnDelimiter() {
|
|||||||
return " ";
|
return " ";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTimestampPositiveInfinityValue() {
|
||||||
|
return "9999-12-31T23:59:59+00:00";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getTimestampNegativeInfinityValue() {
|
||||||
|
return "0001-01-01T00:00:00+00:00";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
|
public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
|
||||||
final SqlStatementBuilder builder = new SqlStatementBuilder();
|
final SqlStatementBuilder builder = new SqlStatementBuilder();
|
||||||
|
@ -9,12 +9,7 @@
|
|||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.kafka.connect.data.Schema;
|
|
||||||
import org.apache.kafka.connect.errors.ConnectException;
|
|
||||||
|
|
||||||
import io.debezium.connector.jdbc.ValueBindDescriptor;
|
import io.debezium.connector.jdbc.ValueBindDescriptor;
|
||||||
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
|
|
||||||
import io.debezium.connector.jdbc.type.AbstractTimestampType;
|
|
||||||
import io.debezium.connector.jdbc.type.Type;
|
import io.debezium.connector.jdbc.type.Type;
|
||||||
import io.debezium.time.ZonedTimestamp;
|
import io.debezium.time.ZonedTimestamp;
|
||||||
|
|
||||||
@ -23,34 +18,28 @@
|
|||||||
*
|
*
|
||||||
* @author Chris Cranford
|
* @author Chris Cranford
|
||||||
*/
|
*/
|
||||||
public class ZonedTimestampType extends AbstractTimestampType {
|
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {
|
||||||
|
|
||||||
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
|
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
|
||||||
|
|
||||||
@Override
|
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
|
||||||
public String[] getRegistrationKeys() {
|
final ZonedDateTime zdt;
|
||||||
return new String[]{ ZonedTimestamp.SCHEMA_NAME };
|
|
||||||
|
if (POSITIVE_INFINITY.equals(value)) {
|
||||||
|
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER);
|
||||||
|
}
|
||||||
|
|
||||||
|
return List.of(new ValueBindDescriptor(index, Timestamp.valueOf((zdt.toLocalDateTime()))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) {
|
protected List<ValueBindDescriptor> normalTimestampValue(int index, Object value) {
|
||||||
return dialect.getFormattedTimestampWithTimeZone((String) value);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
|
||||||
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
|
|
||||||
|
|
||||||
if (value == null) {
|
return List.of(new ValueBindDescriptor(index, Timestamp.from(zdt.toInstant())));
|
||||||
return List.of(new ValueBindDescriptor(index, null));
|
|
||||||
}
|
|
||||||
if (value instanceof String) {
|
|
||||||
|
|
||||||
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
|
|
||||||
|
|
||||||
return List.of(new ValueBindDescriptor(index, Timestamp.from(zdt.toInstant())));
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
|
|
||||||
value, value.getClass().getName()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2593,7 +2593,7 @@ public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Si
|
|||||||
private static @NotNull List<ZonedDateTime> getExpectedZonedDateTimes(Sink sink) {
|
private static @NotNull List<ZonedDateTime> getExpectedZonedDateTimes(Sink sink) {
|
||||||
|
|
||||||
List<ZonedDateTime> expectedValues = List.of();
|
List<ZonedDateTime> expectedValues = List.of();
|
||||||
if (sink.getType().is(SinkType.SQLSERVER)) {
|
if (sink.getType().is(SinkType.SQLSERVER) && sink.getType().is(SinkType.DB2)) {
|
||||||
|
|
||||||
expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
|
expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
|
||||||
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
|
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
|
||||||
|
@ -28,4 +28,12 @@
|
|||||||
<appender-ref ref="CONSOLE" />
|
<appender-ref ref="CONSOLE" />
|
||||||
</logger>
|
</logger>
|
||||||
|
|
||||||
|
<!-- To enable Hibernate loggin set lebel to debug -->
|
||||||
|
<logger name="org.hibernate.SQL" level="off">
|
||||||
|
<appender-ref ref="CONSOLE" />
|
||||||
|
</logger>
|
||||||
|
<!-- To enable Hibernate loggin set lebel to trace -->
|
||||||
|
<logger name="org.hibernate.type" level="off">
|
||||||
|
<appender-ref ref="CONSOLE" />
|
||||||
|
</logger>
|
||||||
</configuration>
|
</configuration>
|
||||||
|
Loading…
Reference in New Issue
Block a user