DBZ-578 Removing unnecessary "server.zone.offset" option;
This reverts parts of the previous work done for DBZ-578. TIMESTAMP columns (logical date + time without time zone) are always interpreted using UTC again.
This commit is contained in:
parent
f5629d9f0b
commit
987e94cfbb
@ -7,7 +7,6 @@
|
||||
package io.debezium.connector.postgresql;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -652,15 +651,6 @@ public String getPostgresPluginName() {
|
||||
+ "'string' uses string to represent values (including the special ones like NaN or Infinity); "
|
||||
+ "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers.");
|
||||
|
||||
public static final Field SERVER_ZONE_OFFSET = Field.create("server.zone.offset")
|
||||
.withDisplayName("Timezone offset of database server")
|
||||
.withType(Type.STRING)
|
||||
.withWidth(Width.SHORT)
|
||||
.withImportance(Importance.MEDIUM)
|
||||
.withValidation(Field::isZoneOffset)
|
||||
.withDescription("Specifies timezone offset of the timezone where database server is located. "
|
||||
+ "This value is used to create timestamps without timezones to the value as defined by server timezone.");
|
||||
|
||||
public static final Field STATUS_UPDATE_INTERVAL_MS = Field.create("status.update.interval.ms")
|
||||
.withDisplayName("Status update interval (ms)")
|
||||
.withType(Type.INT) // Postgres doesn't accept long for this value
|
||||
@ -710,8 +700,7 @@ public String getPostgresPluginName() {
|
||||
SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD,
|
||||
SSL_ROOT_CERT, SSL_CLIENT_KEY, SNAPSHOT_LOCK_TIMEOUT_MS, ROWS_FETCH_SIZE, SSL_SOCKET_FACTORY,
|
||||
STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, INCLUDE_UNKNOWN_DATATYPES,
|
||||
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, CommonConnectorConfig.TOMBSTONES_ON_DELETE,
|
||||
SERVER_ZONE_OFFSET);
|
||||
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, CommonConnectorConfig.TOMBSTONES_ON_DELETE);
|
||||
|
||||
private final Configuration config;
|
||||
private final String serverName;
|
||||
@ -763,10 +752,6 @@ protected Integer statusUpdateIntervalMillis() {
|
||||
return config.getInteger(STATUS_UPDATE_INTERVAL_MS, null);
|
||||
}
|
||||
|
||||
protected ZoneOffset serverZoneOffset() {
|
||||
return ZoneOffset.of(config.getString(SERVER_ZONE_OFFSET, "Z"));
|
||||
}
|
||||
|
||||
protected TemporalPrecisionMode temporalPrecisionMode() {
|
||||
return temporalPrecisionMode;
|
||||
}
|
||||
@ -853,7 +838,7 @@ protected static ConfigDef configDef() {
|
||||
DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
|
||||
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
|
||||
COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
|
||||
CommonConnectorConfig.TOMBSTONES_ON_DELETE, SERVER_ZONE_OFFSET);
|
||||
CommonConnectorConfig.TOMBSTONES_ON_DELETE);
|
||||
Field.group(config, "Connector", TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE,
|
||||
SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, ROWS_FETCH_SIZE);
|
||||
return config;
|
||||
|
@ -65,7 +65,7 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
|
||||
this.topicSelector = topicSelector;
|
||||
|
||||
this.valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(),
|
||||
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry, config.serverZoneOffset());
|
||||
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry);
|
||||
this.schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
|
||||
this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameAdjuster, SourceInfo.SCHEMA);
|
||||
|
||||
|
@ -60,7 +60,6 @@ protected ReplicationConnection createReplicationConnection() throws SQLExceptio
|
||||
.dropSlotOnClose(config.dropSlotOnStop())
|
||||
.statusUpdateIntervalMillis(config.statusUpdateIntervalMillis())
|
||||
.withTypeRegistry(schema.getTypeRegistry())
|
||||
.withServerTimezone(config.serverZoneOffset())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,6 @@
|
||||
import java.time.LocalTime;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.OffsetTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
@ -89,18 +88,12 @@ public class PostgresValueConverter extends JdbcValueConverters {
|
||||
*/
|
||||
private final boolean includeUnknownDatatypes;
|
||||
|
||||
/**
|
||||
* Offset of timezone used by the database server
|
||||
*/
|
||||
private final ZoneOffset serverZoneOffset;
|
||||
|
||||
private final TypeRegistry typeRegistry;
|
||||
|
||||
protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry, ZoneOffset serverZoneOffset) {
|
||||
protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry) {
|
||||
super(decimalMode, temporalPrecisionMode, defaultOffset, null, bigIntUnsignedMode);
|
||||
this.includeUnknownDatatypes = includeUnknownDatatypes;
|
||||
this.typeRegistry = typeRegistry;
|
||||
this.serverZoneOffset = serverZoneOffset;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -221,7 +214,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
|
||||
case PgOid.INTERVAL:
|
||||
return data -> convertInterval(column, fieldDefn, data);
|
||||
case PgOid.TIMESTAMP:
|
||||
return ((ValueConverter)(data-> convertTimestampToUTC(column, fieldDefn, data))).and(super.converter(column, fieldDefn));
|
||||
return ((ValueConverter)(data-> convertTimestampToLocalDateTime(column, fieldDefn, data))).and(super.converter(column, fieldDefn));
|
||||
case PgOid.TIMESTAMPTZ:
|
||||
return data -> convertTimestampWithZone(column, fieldDefn, data);
|
||||
case PgOid.TIMETZ:
|
||||
@ -627,7 +620,7 @@ public static Optional<SpecialValueDecimal> toSpecialValue(String value) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
protected Object convertTimestampToUTC(Column column, Field fieldDefn, Object data) {
|
||||
protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) {
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
@ -635,10 +628,8 @@ protected Object convertTimestampToUTC(Column column, Field fieldDefn, Object da
|
||||
return data;
|
||||
}
|
||||
final Timestamp timestamp = (Timestamp) data;
|
||||
final LocalDateTime serverLocalTime = timestamp.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
|
||||
final LocalDateTime utcTime = LocalDateTime
|
||||
.ofInstant(serverLocalTime.atOffset(serverZoneOffset).toInstant(), ZoneOffset.UTC);
|
||||
return utcTime;
|
||||
|
||||
return timestamp.toLocalDateTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -8,7 +8,6 @@
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.SQLException;
|
||||
import java.time.ZoneOffset;
|
||||
|
||||
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
|
||||
|
||||
@ -30,9 +29,8 @@ public interface MessageDecoder {
|
||||
* @param buffer - binary representation of replication message
|
||||
* @param processor - message processing on arrival
|
||||
* @param typeRegistry - registry with known types
|
||||
* @param serverTimezone - a timezone in which the database server is located
|
||||
*/
|
||||
void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException;
|
||||
void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException;
|
||||
|
||||
/**
|
||||
* Allows MessageDecoder to configure options with which the replication stream is started.
|
||||
|
@ -10,7 +10,6 @@
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLWarning;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
@ -46,7 +45,6 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
|
||||
private final Integer statusUpdateIntervalMillis;
|
||||
private final MessageDecoder messageDecoder;
|
||||
private final TypeRegistry typeRegistry;
|
||||
private final ZoneOffset serverTimezone;
|
||||
|
||||
private long defaultStartingPos;
|
||||
|
||||
@ -66,8 +64,7 @@ private PostgresReplicationConnection(Configuration config,
|
||||
PostgresConnectorConfig.LogicalDecoder plugin,
|
||||
boolean dropSlotOnClose,
|
||||
Integer statusUpdateIntervalMillis,
|
||||
TypeRegistry typeRegistry,
|
||||
ZoneOffset serverTimezone) {
|
||||
TypeRegistry typeRegistry) {
|
||||
super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings);
|
||||
|
||||
this.originalConfig = config;
|
||||
@ -77,7 +74,6 @@ private PostgresReplicationConnection(Configuration config,
|
||||
this.statusUpdateIntervalMillis = statusUpdateIntervalMillis;
|
||||
this.messageDecoder = plugin.messageDecoder();
|
||||
this.typeRegistry = typeRegistry;
|
||||
this.serverTimezone = serverTimezone;
|
||||
|
||||
try {
|
||||
initReplicationSlot();
|
||||
@ -219,7 +215,7 @@ public void readPending(ReplicationMessageProcessor processor) throws SQLExcepti
|
||||
|
||||
private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
|
||||
lastReceivedLSN = stream.getLastReceiveLSN();
|
||||
messageDecoder.processMessage(buffer, processor, typeRegistry, serverTimezone);
|
||||
messageDecoder.processMessage(buffer, processor, typeRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -328,7 +324,6 @@ protected static class ReplicationConnectionBuilder implements Builder {
|
||||
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
|
||||
private Integer statusUpdateIntervalMillis;
|
||||
private TypeRegistry typeRegistry;
|
||||
private ZoneOffset serverTimezone;
|
||||
|
||||
protected ReplicationConnectionBuilder(Configuration config) {
|
||||
assert config != null;
|
||||
@ -364,7 +359,7 @@ public ReplicationConnectionBuilder statusUpdateIntervalMillis(final Integer sta
|
||||
@Override
|
||||
public ReplicationConnection build() {
|
||||
assert plugin != null : "Decoding plugin name is not set";
|
||||
return new PostgresReplicationConnection(config, slotName, plugin, dropSlotOnClose, statusUpdateIntervalMillis, typeRegistry, serverTimezone);
|
||||
return new PostgresReplicationConnection(config, slotName, plugin, dropSlotOnClose, statusUpdateIntervalMillis, typeRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -372,11 +367,5 @@ public Builder withTypeRegistry(TypeRegistry typeRegistry) {
|
||||
this.typeRegistry = typeRegistry;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Builder withServerTimezone(ZoneOffset serverTimezone) {
|
||||
this.serverTimezone = serverTimezone;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,6 @@
|
||||
package io.debezium.connector.postgresql.connection;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.ZoneOffset;
|
||||
|
||||
import org.postgresql.replication.LogSequenceNumber;
|
||||
import org.postgresql.replication.PGReplicationStream;
|
||||
@ -130,13 +129,10 @@ interface Builder {
|
||||
|
||||
Builder withTypeRegistry(TypeRegistry typeRegistry);
|
||||
|
||||
Builder withServerTimezone(ZoneOffset serverTimezone);
|
||||
|
||||
/**
|
||||
* Creates a new {@link ReplicationConnection} instance
|
||||
* @return a connection, never null
|
||||
*/
|
||||
ReplicationConnection build();
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,6 @@
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.SQLException;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
@ -31,7 +30,7 @@
|
||||
public class PgProtoMessageDecoder implements MessageDecoder {
|
||||
|
||||
@Override
|
||||
public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException {
|
||||
public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
|
||||
try {
|
||||
if (!buffer.hasArray()) {
|
||||
throw new IllegalStateException(
|
||||
@ -46,7 +45,7 @@ public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor
|
||||
message.getNewTupleCount(),
|
||||
message.getNewTypeinfoCount()));
|
||||
}
|
||||
processor.process(new PgProtoReplicationMessage(message, typeRegistry, serverTimezone));
|
||||
processor.process(new PgProtoReplicationMessage(message, typeRegistry));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
|
@ -9,7 +9,6 @@
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.charset.Charset;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Arrays;
|
||||
@ -48,12 +47,10 @@ class PgProtoReplicationMessage implements ReplicationMessage {
|
||||
|
||||
private final PgProto.RowMessage rawMessage;
|
||||
private final TypeRegistry typeRegistry;
|
||||
private final ZoneOffset serverTimezone;
|
||||
|
||||
public PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry, ZoneOffset serverTimezone) {
|
||||
public PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry) {
|
||||
this.rawMessage = rawMessage;
|
||||
this.typeRegistry = typeRegistry;
|
||||
this.serverTimezone = serverTimezone;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -181,8 +178,7 @@ else if (datumMessage.hasDatumString()) {
|
||||
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
|
||||
// but we'll convert them to nanos which is the smallest unit
|
||||
final LocalDateTime serverLocal = Conversions.toLocalDateTimeUTC(datumMessage.getDatumInt64());
|
||||
final Instant utc = serverLocal.atOffset(serverTimezone).toInstant();
|
||||
return Conversions.toEpochNanos(utc);
|
||||
return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC));
|
||||
case PgOid.TIMESTAMPTZ:
|
||||
case PgOid.TIME:
|
||||
if (!datumMessage.hasDatumInt64()) {
|
||||
|
@ -8,7 +8,6 @@
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.SQLException;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
|
||||
@ -43,7 +42,7 @@ public class NonStreamingWal2JsonMessageDecoder implements MessageDecoder {
|
||||
private boolean containsMetadata = false;
|
||||
|
||||
@Override
|
||||
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException {
|
||||
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
|
||||
try {
|
||||
if (!buffer.hasArray()) {
|
||||
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
|
||||
@ -60,7 +59,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
||||
Iterator<Entry> it = changes.iterator();
|
||||
while (it.hasNext()) {
|
||||
Value value = it.next().getValue();
|
||||
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry, serverTimezone));
|
||||
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry));
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new ConnectException(e);
|
||||
|
@ -8,7 +8,6 @@
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.sql.SQLException;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
@ -109,7 +108,7 @@ public class StreamingWal2JsonMessageDecoder implements MessageDecoder {
|
||||
private long commitTime;
|
||||
|
||||
@Override
|
||||
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException {
|
||||
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
|
||||
try {
|
||||
if (!buffer.hasArray()) {
|
||||
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
|
||||
@ -129,7 +128,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
||||
byte firstChar = getFirstNonWhiteChar(content);
|
||||
if (firstChar != LEFT_BRACE) {
|
||||
outOfOrderChunk(content);
|
||||
nonInitialChunk(processor, typeRegistry, content, serverTimezone);
|
||||
nonInitialChunk(processor, typeRegistry, content);
|
||||
}
|
||||
else {
|
||||
// We received the beginning of a transaction
|
||||
@ -142,7 +141,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
||||
if (message.has("kind")) {
|
||||
// This is not a preamble but out-of-order change chunk
|
||||
outOfOrderChunk(content);
|
||||
nonInitialChunk(processor, typeRegistry, content, serverTimezone);
|
||||
nonInitialChunk(processor, typeRegistry, content);
|
||||
}
|
||||
else {
|
||||
// Correct initial chunk
|
||||
@ -155,7 +154,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
||||
}
|
||||
}
|
||||
else {
|
||||
nonInitialChunk(processor, typeRegistry, content, serverTimezone);
|
||||
nonInitialChunk(processor, typeRegistry, content);
|
||||
}
|
||||
}
|
||||
catch (final IOException e) {
|
||||
@ -164,7 +163,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
||||
}
|
||||
|
||||
protected void nonInitialChunk(ReplicationMessageProcessor processor, TypeRegistry typeRegistry,
|
||||
final byte[] content, ZoneOffset serverTimezone) throws IOException, SQLException, InterruptedException {
|
||||
final byte[] content) throws IOException, SQLException, InterruptedException {
|
||||
byte firstChar = getFirstNonWhiteChar(content);
|
||||
// We are receiving changes in chunks
|
||||
if (firstChar == LEFT_BRACE) {
|
||||
@ -174,7 +173,7 @@ protected void nonInitialChunk(ReplicationMessageProcessor processor, TypeRegist
|
||||
else if (firstChar == COMMA) {
|
||||
// following changes, they have an extra comma at the start of message
|
||||
if (currentChunk != null) {
|
||||
doProcessMessage(processor, typeRegistry, currentChunk, serverTimezone, false);
|
||||
doProcessMessage(processor, typeRegistry, currentChunk, false);
|
||||
}
|
||||
replaceFirstNonWhiteChar(content, SPACE);
|
||||
currentChunk = content;
|
||||
@ -182,7 +181,7 @@ else if (firstChar == COMMA) {
|
||||
else if (firstChar == RIGHT_BRACKET) {
|
||||
// No more changes
|
||||
if (currentChunk != null) {
|
||||
doProcessMessage(processor, typeRegistry, currentChunk, serverTimezone, true);
|
||||
doProcessMessage(processor, typeRegistry, currentChunk, true);
|
||||
}
|
||||
messageInProgress = false;
|
||||
}
|
||||
@ -235,12 +234,12 @@ private boolean isWhitespace(byte c) {
|
||||
return (c >= TAB && c <= CR) || c == SPACE;
|
||||
}
|
||||
|
||||
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, ZoneOffset serverTimezone, boolean lastMessage)
|
||||
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage)
|
||||
throws IOException, SQLException, InterruptedException {
|
||||
final Document change = DocumentReader.floatNumbersAsTextReader().read(content);
|
||||
|
||||
LOGGER.trace("Change arrived for decoding {}", change);
|
||||
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry, serverTimezone));
|
||||
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -8,7 +8,6 @@
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.ArrayList;
|
||||
@ -59,16 +58,14 @@ class Wal2JsonReplicationMessage implements ReplicationMessage {
|
||||
private final boolean hasMetadata;
|
||||
private final boolean lastEventForLsn;
|
||||
private final TypeRegistry typeRegistry;
|
||||
private final ZoneOffset serverTimezone;
|
||||
|
||||
public Wal2JsonReplicationMessage(long txId, long commitTime, Document rawMessage, boolean hasMetadata, boolean lastEventForLsn, TypeRegistry typeRegistry, ZoneOffset serverTimezone) {
|
||||
public Wal2JsonReplicationMessage(long txId, long commitTime, Document rawMessage, boolean hasMetadata, boolean lastEventForLsn, TypeRegistry typeRegistry) {
|
||||
this.txId = txId;
|
||||
this.commitTime = commitTime;
|
||||
this.rawMessage = rawMessage;
|
||||
this.hasMetadata = hasMetadata;
|
||||
this.lastEventForLsn = lastEventForLsn;
|
||||
this.typeRegistry = typeRegistry;
|
||||
this.serverTimezone = serverTimezone;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -260,8 +257,7 @@ else if (rawValue.isBigInteger()) {
|
||||
case "timestamp":
|
||||
case "timestamp without time zone":
|
||||
final LocalDateTime serverLocal = Conversions.fromNanosToLocalDateTimeUTC(DateTimeFormat.get().timestamp(rawValue.asString()));
|
||||
final Instant utc = serverLocal.atOffset(serverTimezone).toInstant();
|
||||
return Conversions.toEpochNanos(utc);
|
||||
return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC));
|
||||
|
||||
case "time":
|
||||
case "time without time zone":
|
||||
|
@ -297,9 +297,9 @@ protected List<SchemaAndValueField> schemaAndValuesForBinTypes() {
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
|
||||
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
|
||||
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
|
||||
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-26T09:10:12.608"), null); // Need to compare it with value in UTC
|
||||
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
|
||||
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
|
||||
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null);
|
||||
String expectedTz = "2016-11-04T11:51:30Z"; //timestamp is stored with TZ, should be read back with UTC
|
||||
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
|
||||
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;
|
||||
@ -322,9 +322,9 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
|
||||
}
|
||||
|
||||
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypesAdaptiveTimeMicroseconds() {
|
||||
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
|
||||
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
|
||||
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-26T09:10:12.608"), null); // Need to compare it with value in UTC
|
||||
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
|
||||
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
|
||||
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null);
|
||||
String expectedTz = "2016-11-04T11:51:30Z"; //timestamp is stored with TZ, should be read back with UTC
|
||||
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
|
||||
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;
|
||||
|
@ -46,7 +46,6 @@ public void before() throws Exception {
|
||||
TestHelper.executeDDL("postgis_create_tables.ddl");
|
||||
|
||||
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone())
|
||||
.build());
|
||||
TopicSelector selector = TopicSelector.create(config);
|
||||
context = new PostgresTaskContext(
|
||||
@ -182,7 +181,6 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
|
||||
PostgresConnectorConfig config = new PostgresConnectorConfig(
|
||||
TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
|
||||
.with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone())
|
||||
.build());
|
||||
|
||||
TopicSelector selector = TopicSelector.create(config);
|
||||
|
@ -65,7 +65,6 @@ public void before() throws Exception {
|
||||
TestHelper.execute(statements);
|
||||
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone())
|
||||
.build());
|
||||
setupRecordsProducer(config);
|
||||
}
|
||||
|
@ -13,7 +13,6 @@
|
||||
import java.nio.file.Paths;
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -53,7 +52,6 @@ public static ReplicationConnection createForReplication(String slotName, boolea
|
||||
.withSlot(slotName)
|
||||
.withTypeRegistry(getTypeRegistry())
|
||||
.dropSlotOnClose(dropOnClose)
|
||||
.withServerTimezone(databaseTimeZone())
|
||||
.build();
|
||||
}
|
||||
|
||||
@ -178,8 +176,4 @@ protected static boolean shouldSSLConnectionFail() {
|
||||
protected static int waitTimeForRecords() {
|
||||
return Integer.parseInt(System.getProperty(TEST_PROPERTY_PREFIX + "records.waittime", "2"));
|
||||
}
|
||||
|
||||
protected static ZoneOffset databaseTimeZone() {
|
||||
return ZoneOffset.of(System.getProperty(TEST_PROPERTY_PREFIX + "database.timeoffset", "-11:00"));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user