From 9bea1e2638565a7a664aecfe1d718cb26bab4073 Mon Sep 17 00:00:00 2001 From: Igor Gabaydulin Date: Mon, 20 Jan 2020 14:43:44 +0300 Subject: [PATCH] DBZ-1727 Move null check to AbstractMessageDecoder --- .../connection/AbstractMessageDecoder.java | 16 ++++++++++++++++ .../pgoutput/PgOutputMessageDecoder.java | 2 +- .../pgproto/PgProtoMessageDecoder.java | 7 +------ .../NonStreamingWal2JsonMessageDecoder.java | 2 +- .../StreamingWal2JsonMessageDecoder.java | 2 +- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java index 210e92805..8a4bfab9c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractMessageDecoder.java @@ -6,10 +6,14 @@ package io.debezium.connector.postgresql.connection; import java.nio.ByteBuffer; +import java.sql.SQLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor; + /** * Abstract implementation of {@link MessageDecoder} that all decoders should inherit from. * @@ -19,6 +23,18 @@ public abstract class AbstractMessageDecoder implements MessageDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageDecoder.class); + @Override + public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { + // if message is empty pass control right to ReplicationMessageProcessor to update WAL position info + if (buffer == null) { + processor.process(null); + } else { + processNotEmptyMessage(buffer, processor, typeRegistry); + } + } + + protected abstract void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException; + @Override public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, Long startLsn, boolean skipFirstFlushRecord) { // the lsn we started from is inclusive, so we need to avoid sending back the same message twice diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java index cfa82c788..b6aa657dc 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java @@ -157,7 +157,7 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, L } @Override - public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { + public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { if (LOGGER.isTraceEnabled()) { if (!buffer.hasArray()) { throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java index 2de8ee82b..c3069d291 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoMessageDecoder.java @@ -40,12 +40,7 @@ public class PgProtoMessageDecoder extends AbstractMessageDecoder { private boolean warnedOnUnkownOp = false; @Override - public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { - if (buffer == null) { - processor.process(null); - return; - } - + public void processNotEmptyMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { if (!buffer.hasArray()) { throw new IllegalStateException( diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java index 11c7a7aef..cbf08eb2f 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java @@ -45,7 +45,7 @@ public class NonStreamingWal2JsonMessageDecoder extends AbstractMessageDecoder { private boolean containsMetadata = false; @Override - public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { + public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { if (!buffer.hasArray()) { throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java index 5b5556cf9..4f4c3b7cc 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java @@ -108,7 +108,7 @@ public class StreamingWal2JsonMessageDecoder extends AbstractMessageDecoder { private Instant commitTime; @Override - public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { + public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException { try { if (!buffer.hasArray()) { throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");