DBZ-1727 Move null check to AbstractMessageDecoder

This commit is contained in:
Igor Gabaydulin 2020-01-20 14:43:44 +03:00 committed by Jiri Pechanec
parent fcc7a05579
commit 9bea1e2638
5 changed files with 20 additions and 9 deletions

View File

@ -6,10 +6,14 @@
package io.debezium.connector.postgresql.connection;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
/**
* Abstract implementation of {@link MessageDecoder} that all decoders should inherit from.
*
@ -19,6 +23,18 @@ public abstract class AbstractMessageDecoder implements MessageDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageDecoder.class);
@Override
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
// if message is empty pass control right to ReplicationMessageProcessor to update WAL position info
if (buffer == null) {
processor.process(null);
} else {
processNotEmptyMessage(buffer, processor, typeRegistry);
}
}
protected abstract void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException;
@Override
public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, Long startLsn, boolean skipFirstFlushRecord) {
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice

View File

@ -157,7 +157,7 @@ public boolean shouldMessageBeSkipped(ByteBuffer buffer, Long lastReceivedLsn, L
}
@Override
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
if (LOGGER.isTraceEnabled()) {
if (!buffer.hasArray()) {
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");

View File

@ -40,12 +40,7 @@ public class PgProtoMessageDecoder extends AbstractMessageDecoder {
private boolean warnedOnUnkownOp = false;
@Override
public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
if (buffer == null) {
processor.process(null);
return;
}
public void processNotEmptyMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException(

View File

@ -45,7 +45,7 @@ public class NonStreamingWal2JsonMessageDecoder extends AbstractMessageDecoder {
private boolean containsMetadata = false;
@Override
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");

View File

@ -108,7 +108,7 @@ public class StreamingWal2JsonMessageDecoder extends AbstractMessageDecoder {
private Instant commitTime;
@Override
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
public void processNotEmptyMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");