From 987e94cfbb3cd278f2ed0cd0a01a51b4dbc1a5c1 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Tue, 19 Jun 2018 16:38:50 +0200 Subject: [PATCH] DBZ-578 Removing unnecessary "server.zone.offset" option; This reverts parts of the previous work done for DBZ-578. TIMESTAMP columns (logical date + time without time zone) are always interpreted using UTC again. --- .../postgresql/PostgresConnectorConfig.java | 19 ++----------------- .../connector/postgresql/PostgresSchema.java | 2 +- .../postgresql/PostgresTaskContext.java | 1 - .../postgresql/PostgresValueConverter.java | 19 +++++-------------- .../postgresql/connection/MessageDecoder.java | 4 +--- .../PostgresReplicationConnection.java | 17 +++-------------- .../connection/ReplicationConnection.java | 4 ---- .../pgproto/PgProtoMessageDecoder.java | 5 ++--- .../pgproto/PgProtoReplicationMessage.java | 8 ++------ .../NonStreamingWal2JsonMessageDecoder.java | 5 ++--- .../StreamingWal2JsonMessageDecoder.java | 19 +++++++++---------- .../wal2json/Wal2JsonReplicationMessage.java | 8 ++------ .../AbstractRecordsProducerTest.java | 12 ++++++------ .../postgresql/RecordsSnapshotProducerIT.java | 2 -- .../postgresql/RecordsStreamProducerIT.java | 1 - .../connector/postgresql/TestHelper.java | 6 ------ 16 files changed, 35 insertions(+), 97 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 6cea082a0..599d28032 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -7,7 +7,6 @@ package io.debezium.connector.postgresql; import java.math.BigDecimal; -import java.time.ZoneOffset; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -652,15 +651,6 @@ public String getPostgresPluginName() { + "'string' uses string to represent values (including the special ones like NaN or Infinity); " + "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers."); - public static final Field SERVER_ZONE_OFFSET = Field.create("server.zone.offset") - .withDisplayName("Timezone offset of database server") - .withType(Type.STRING) - .withWidth(Width.SHORT) - .withImportance(Importance.MEDIUM) - .withValidation(Field::isZoneOffset) - .withDescription("Specifies timezone offset of the timezone where database server is located. " - + "This value is used to create timestamps without timezones to the value as defined by server timezone."); - public static final Field STATUS_UPDATE_INTERVAL_MS = Field.create("status.update.interval.ms") .withDisplayName("Status update interval (ms)") .withType(Type.INT) // Postgres doesn't accept long for this value @@ -710,8 +700,7 @@ public String getPostgresPluginName() { SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY, SNAPSHOT_LOCK_TIMEOUT_MS, ROWS_FETCH_SIZE, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, INCLUDE_UNKNOWN_DATATYPES, - SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, CommonConnectorConfig.TOMBSTONES_ON_DELETE, - SERVER_ZONE_OFFSET); + SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, CommonConnectorConfig.TOMBSTONES_ON_DELETE); private final Configuration config; private final String serverName; @@ -763,10 +752,6 @@ protected Integer statusUpdateIntervalMillis() { return config.getInteger(STATUS_UPDATE_INTERVAL_MS, null); } - protected ZoneOffset serverZoneOffset() { - return ZoneOffset.of(config.getString(SERVER_ZONE_OFFSET, "Z")); - } - protected TemporalPrecisionMode temporalPrecisionMode() { return temporalPrecisionMode; } @@ -853,7 +838,7 @@ protected static ConfigDef configDef() { DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE); Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST, COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, - CommonConnectorConfig.TOMBSTONES_ON_DELETE, SERVER_ZONE_OFFSET); + CommonConnectorConfig.TOMBSTONES_ON_DELETE); Field.group(config, "Connector", TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE, SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, ROWS_FETCH_SIZE); return config; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java index c39a8cb15..6e9fc8379 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java @@ -65,7 +65,7 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist this.topicSelector = topicSelector; this.valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(), - ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry, config.serverZoneOffset()); + ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry); this.schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameAdjuster, SourceInfo.SCHEMA); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java index 2ccfd7787..a2e007b82 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresTaskContext.java @@ -60,7 +60,6 @@ protected ReplicationConnection createReplicationConnection() throws SQLExceptio .dropSlotOnClose(config.dropSlotOnStop()) .statusUpdateIntervalMillis(config.statusUpdateIntervalMillis()) .withTypeRegistry(schema.getTypeRegistry()) - .withServerTimezone(config.serverZoneOffset()) .build(); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index de193eb61..1008d740e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -16,7 +16,6 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.OffsetTime; -import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Collections; import java.util.Date; @@ -89,18 +88,12 @@ public class PostgresValueConverter extends JdbcValueConverters { */ private final boolean includeUnknownDatatypes; - /** - * Offset of timezone used by the database server - */ - private final ZoneOffset serverZoneOffset; - private final TypeRegistry typeRegistry; - protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry, ZoneOffset serverZoneOffset) { + protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry) { super(decimalMode, temporalPrecisionMode, defaultOffset, null, bigIntUnsignedMode); this.includeUnknownDatatypes = includeUnknownDatatypes; this.typeRegistry = typeRegistry; - this.serverZoneOffset = serverZoneOffset; } @Override @@ -221,7 +214,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.INTERVAL: return data -> convertInterval(column, fieldDefn, data); case PgOid.TIMESTAMP: - return ((ValueConverter)(data-> convertTimestampToUTC(column, fieldDefn, data))).and(super.converter(column, fieldDefn)); + return ((ValueConverter)(data-> convertTimestampToLocalDateTime(column, fieldDefn, data))).and(super.converter(column, fieldDefn)); case PgOid.TIMESTAMPTZ: return data -> convertTimestampWithZone(column, fieldDefn, data); case PgOid.TIMETZ: @@ -627,7 +620,7 @@ public static Optional toSpecialValue(String value) { return Optional.empty(); } - protected Object convertTimestampToUTC(Column column, Field fieldDefn, Object data) { + protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn, Object data) { if (data == null) { return null; } @@ -635,10 +628,8 @@ protected Object convertTimestampToUTC(Column column, Field fieldDefn, Object da return data; } final Timestamp timestamp = (Timestamp) data; - final LocalDateTime serverLocalTime = timestamp.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); - final LocalDateTime utcTime = LocalDateTime - .ofInstant(serverLocalTime.atOffset(serverZoneOffset).toInstant(), ZoneOffset.UTC); - return utcTime; + + return timestamp.toLocalDateTime(); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java index 5009551b2..76b206e23 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/MessageDecoder.java @@ -8,7 +8,6 @@ import java.nio.ByteBuffer; import java.sql.SQLException; -import java.time.ZoneOffset; import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; @@ -30,9 +29,8 @@ public interface MessageDecoder { * @param buffer - binary representation of replication message * @param processor - message processing on arrival * @param typeRegistry - registry with known types - * @param serverTimezone - a timezone in which the database server is located */ - void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException; + void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException; /** * Allows MessageDecoder to configure options with which the replication stream is started. diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index fc9bf54f8..f4b3d0eae 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -10,7 +10,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; -import java.time.ZoneOffset; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -46,7 +45,6 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep private final Integer statusUpdateIntervalMillis; private final MessageDecoder messageDecoder; private final TypeRegistry typeRegistry; - private final ZoneOffset serverTimezone; private long defaultStartingPos; @@ -66,8 +64,7 @@ private PostgresReplicationConnection(Configuration config, PostgresConnectorConfig.LogicalDecoder plugin, boolean dropSlotOnClose, Integer statusUpdateIntervalMillis, - TypeRegistry typeRegistry, - ZoneOffset serverTimezone) { + TypeRegistry typeRegistry) { super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings); this.originalConfig = config; @@ -77,7 +74,6 @@ private PostgresReplicationConnection(Configuration config, this.statusUpdateIntervalMillis = statusUpdateIntervalMillis; this.messageDecoder = plugin.messageDecoder(); this.typeRegistry = typeRegistry; - this.serverTimezone = serverTimezone; try { initReplicationSlot(); @@ -219,7 +215,7 @@ public void readPending(ReplicationMessageProcessor processor) throws SQLExcepti private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { lastReceivedLSN = stream.getLastReceiveLSN(); - messageDecoder.processMessage(buffer, processor, typeRegistry, serverTimezone); + messageDecoder.processMessage(buffer, processor, typeRegistry); } @Override @@ -328,7 +324,6 @@ protected static class ReplicationConnectionBuilder implements Builder { private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE; private Integer statusUpdateIntervalMillis; private TypeRegistry typeRegistry; - private ZoneOffset serverTimezone; protected ReplicationConnectionBuilder(Configuration config) { assert config != null; @@ -364,7 +359,7 @@ public ReplicationConnectionBuilder statusUpdateIntervalMillis(final Integer sta @Override public ReplicationConnection build() { assert plugin != null : "Decoding plugin name is not set"; - return new PostgresReplicationConnection(config, slotName, plugin, dropSlotOnClose, statusUpdateIntervalMillis, typeRegistry, serverTimezone); + return new PostgresReplicationConnection(config, slotName, plugin, dropSlotOnClose, statusUpdateIntervalMillis, typeRegistry); } @Override @@ -372,11 +367,5 @@ public Builder withTypeRegistry(TypeRegistry typeRegistry) { this.typeRegistry = typeRegistry; return this; } - - @Override - public Builder withServerTimezone(ZoneOffset serverTimezone) { - this.serverTimezone = serverTimezone; - return this; - } } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java index 6945f2c89..ec7b4143e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationConnection.java @@ -7,7 +7,6 @@ package io.debezium.connector.postgresql.connection; import java.sql.SQLException; -import java.time.ZoneOffset; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; @@ -130,13 +129,10 @@ interface Builder { Builder withTypeRegistry(TypeRegistry typeRegistry); - Builder withServerTimezone(ZoneOffset serverTimezone); - /** * Creates a new {@link ReplicationConnection} instance * @return a connection, never null */ ReplicationConnection build(); - } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java index 96a75f2c4..f2f52aae6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java @@ -7,7 +7,6 @@ import java.nio.ByteBuffer; import java.sql.SQLException; -import java.time.ZoneOffset; import java.util.Arrays; import org.apache.kafka.connect.errors.ConnectException; @@ -31,7 +30,7 @@ public class PgProtoMessageDecoder implements MessageDecoder { @Override - public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException { + public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { if (!buffer.hasArray()) { throw new IllegalStateException( @@ -46,7 +45,7 @@ public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor message.getNewTupleCount(), message.getNewTypeinfoCount())); } - processor.process(new PgProtoReplicationMessage(message, typeRegistry, serverTimezone)); + processor.process(new PgProtoReplicationMessage(message, typeRegistry)); } catch (InvalidProtocolBufferException e) { throw new ConnectException(e); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java index 2148a2f2c..23e483acb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java @@ -9,7 +9,6 @@ import java.math.BigDecimal; import java.nio.charset.Charset; import java.sql.SQLException; -import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Arrays; @@ -48,12 +47,10 @@ class PgProtoReplicationMessage implements ReplicationMessage { private final PgProto.RowMessage rawMessage; private final TypeRegistry typeRegistry; - private final ZoneOffset serverTimezone; - public PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry, ZoneOffset serverTimezone) { + public PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry) { this.rawMessage = rawMessage; this.typeRegistry = typeRegistry; - this.serverTimezone = serverTimezone; } @Override @@ -181,8 +178,7 @@ else if (datumMessage.hasDatumString()) { // these types are sent by the plugin as LONG - microseconds since Unix Epoch // but we'll convert them to nanos which is the smallest unit final LocalDateTime serverLocal = Conversions.toLocalDateTimeUTC(datumMessage.getDatumInt64()); - final Instant utc = serverLocal.atOffset(serverTimezone).toInstant(); - return Conversions.toEpochNanos(utc); + return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC)); case PgOid.TIMESTAMPTZ: case PgOid.TIME: if (!datumMessage.hasDatumInt64()) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java index e61b6a7a9..cca9de178 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java @@ -8,7 +8,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; -import java.time.ZoneOffset; import java.util.Arrays; import java.util.Iterator; @@ -43,7 +42,7 @@ public class NonStreamingWal2JsonMessageDecoder implements MessageDecoder { private boolean containsMetadata = false; @Override - public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException { + public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { if (!buffer.hasArray()) { throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); @@ -60,7 +59,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces Iterator it = changes.iterator(); while (it.hasNext()) { Value value = it.next().getValue(); - processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry, serverTimezone)); + processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry)); } } catch (final IOException e) { throw new ConnectException(e); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java index 9cce7f653..b13836513 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java @@ -8,7 +8,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; -import java.time.ZoneOffset; import java.util.Arrays; import org.apache.kafka.connect.errors.ConnectException; @@ -109,7 +108,7 @@ public class StreamingWal2JsonMessageDecoder implements MessageDecoder { private long commitTime; @Override - public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException { + public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { if (!buffer.hasArray()) { throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); @@ -129,7 +128,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces byte firstChar = getFirstNonWhiteChar(content); if (firstChar != LEFT_BRACE) { outOfOrderChunk(content); - nonInitialChunk(processor, typeRegistry, content, serverTimezone); + nonInitialChunk(processor, typeRegistry, content); } else { // We received the beginning of a transaction @@ -142,7 +141,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces if (message.has("kind")) { // This is not a preamble but out-of-order change chunk outOfOrderChunk(content); - nonInitialChunk(processor, typeRegistry, content, serverTimezone); + nonInitialChunk(processor, typeRegistry, content); } else { // Correct initial chunk @@ -155,7 +154,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces } } else { - nonInitialChunk(processor, typeRegistry, content, serverTimezone); + nonInitialChunk(processor, typeRegistry, content); } } catch (final IOException e) { @@ -164,7 +163,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces } protected void nonInitialChunk(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, - final byte[] content, ZoneOffset serverTimezone) throws IOException, SQLException, InterruptedException { + final byte[] content) throws IOException, SQLException, InterruptedException { byte firstChar = getFirstNonWhiteChar(content); // We are receiving changes in chunks if (firstChar == LEFT_BRACE) { @@ -174,7 +173,7 @@ protected void nonInitialChunk(ReplicationMessageProcessor processor, TypeRegist else if (firstChar == COMMA) { // following changes, they have an extra comma at the start of message if (currentChunk != null) { - doProcessMessage(processor, typeRegistry, currentChunk, serverTimezone, false); + doProcessMessage(processor, typeRegistry, currentChunk, false); } replaceFirstNonWhiteChar(content, SPACE); currentChunk = content; @@ -182,7 +181,7 @@ else if (firstChar == COMMA) { else if (firstChar == RIGHT_BRACKET) { // No more changes if (currentChunk != null) { - doProcessMessage(processor, typeRegistry, currentChunk, serverTimezone, true); + doProcessMessage(processor, typeRegistry, currentChunk, true); } messageInProgress = false; } @@ -235,12 +234,12 @@ private boolean isWhitespace(byte c) { return (c >= TAB && c <= CR) || c == SPACE; } - private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, ZoneOffset serverTimezone, boolean lastMessage) + private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage) throws IOException, SQLException, InterruptedException { final Document change = DocumentReader.floatNumbersAsTextReader().read(content); LOGGER.trace("Change arrived for decoding {}", change); - processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry, serverTimezone)); + processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry)); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java index 5dd55491b..92956a53c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java @@ -8,7 +8,6 @@ import java.math.BigDecimal; import java.sql.SQLException; -import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; @@ -59,16 +58,14 @@ class Wal2JsonReplicationMessage implements ReplicationMessage { private final boolean hasMetadata; private final boolean lastEventForLsn; private final TypeRegistry typeRegistry; - private final ZoneOffset serverTimezone; - public Wal2JsonReplicationMessage(long txId, long commitTime, Document rawMessage, boolean hasMetadata, boolean lastEventForLsn, TypeRegistry typeRegistry, ZoneOffset serverTimezone) { + public Wal2JsonReplicationMessage(long txId, long commitTime, Document rawMessage, boolean hasMetadata, boolean lastEventForLsn, TypeRegistry typeRegistry) { this.txId = txId; this.commitTime = commitTime; this.rawMessage = rawMessage; this.hasMetadata = hasMetadata; this.lastEventForLsn = lastEventForLsn; this.typeRegistry = typeRegistry; - this.serverTimezone = serverTimezone; } @Override @@ -260,8 +257,7 @@ else if (rawValue.isBigInteger()) { case "timestamp": case "timestamp without time zone": final LocalDateTime serverLocal = Conversions.fromNanosToLocalDateTimeUTC(DateTimeFormat.get().timestamp(rawValue.asString())); - final Instant utc = serverLocal.atOffset(serverTimezone).toInstant(); - return Conversions.toEpochNanos(utc); + return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC)); case "time": case "time without time zone": diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index b4e3382a3..c7de7794b 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -297,9 +297,9 @@ protected List schemaAndValuesForBinTypes() { } protected List schemaAndValuesForDateTimeTypes() { - long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC - long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC - long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-26T09:10:12.608"), null); // Need to compare it with value in UTC + long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null); + long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null); + long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null); String expectedTz = "2016-11-04T11:51:30Z"; //timestamp is stored with TZ, should be read back with UTC int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null); long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000; @@ -322,9 +322,9 @@ protected List schemaAndValuesForDateTimeTypes() { } protected List schemaAndValuesForDateTimeTypesAdaptiveTimeMicroseconds() { - long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC - long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC - long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-26T09:10:12.608"), null); // Need to compare it with value in UTC + long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null); + long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null); + long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null); String expectedTz = "2016-11-04T11:51:30Z"; //timestamp is stored with TZ, should be read back with UTC int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null); long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index dd8105d92..9b5f55fdc 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -46,7 +46,6 @@ public void before() throws Exception { TestHelper.executeDDL("postgis_create_tables.ddl"); PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone()) .build()); TopicSelector selector = TopicSelector.create(config); context = new PostgresTaskContext( @@ -182,7 +181,6 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr PostgresConnectorConfig config = new PostgresConnectorConfig( TestHelper.defaultConfig() .with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS) - .with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone()) .build()); TopicSelector selector = TopicSelector.create(config); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index acfad80c7..3b54b29c9 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -65,7 +65,6 @@ public void before() throws Exception { TestHelper.execute(statements); PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig() .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) - .with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone()) .build()); setupRecordsProducer(config); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index 58fbfa995..3205a02fb 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -13,7 +13,6 @@ import java.nio.file.Paths; import java.sql.Connection; import java.sql.SQLException; -import java.time.ZoneOffset; import java.util.Set; import java.util.stream.Collectors; @@ -53,7 +52,6 @@ public static ReplicationConnection createForReplication(String slotName, boolea .withSlot(slotName) .withTypeRegistry(getTypeRegistry()) .dropSlotOnClose(dropOnClose) - .withServerTimezone(databaseTimeZone()) .build(); } @@ -178,8 +176,4 @@ protected static boolean shouldSSLConnectionFail() { protected static int waitTimeForRecords() { return Integer.parseInt(System.getProperty(TEST_PROPERTY_PREFIX + "records.waittime", "2")); } - - protected static ZoneOffset databaseTimeZone() { - return ZoneOffset.of(System.getProperty(TEST_PROPERTY_PREFIX + "database.timeoffset", "-11:00")); - } }