diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java index b90eab991..a9471052c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java @@ -78,7 +78,16 @@ */ public class StreamingWal2JsonMessageDecoder implements MessageDecoder { - private static final Logger LOGGER = LoggerFactory.getLogger(StreamingWal2JsonMessageDecoder.class); + private static final Logger LOGGER = LoggerFactory.getLogger(StreamingWal2JsonMessageDecoder.class); + + private static final byte TAB = 9; + private static final byte CR = 13; + private static final byte SPACE = 32; + + private static final byte COMMA = 44; + private static final byte RIGHT_BRACKET = 93; + private static final byte LEFT_BRACE = 123; + private static final byte RIGHT_BRACE = 125; private final DateTimeFormat dateTime = DateTimeFormat.get(); private boolean containsMetadata = false; @@ -88,7 +97,7 @@ public class StreamingWal2JsonMessageDecoder implements MessageDecoder { * To identify if the last current chunk is the last one we can send the current one * for processing only after we read the next one or the end of message fragment. */ - private String currentChunk; + private byte[] currentChunk; private int txId; @@ -103,14 +112,22 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); } final byte[] source = buffer.array(); - String content = new String(Arrays.copyOfRange(source, buffer.arrayOffset(), source.length)).trim(); - LOGGER.debug("Chunk arrived from database {}", content); + // Extend the array by two as we might need to append two chars and set them to space by default + final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length + 2); + final int lastPos = content.length - 1; + content[lastPos - 1] = 32; + content[lastPos] = 32; + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Chunk arrived from database {}", new String(content)); + } if (!messageInProgress) { // We received the beginning of a transaction - if (!content.endsWith("}")) { + if (getLastNonWhiteChar(content) != RIGHT_BRACE) { // Chunks are enabled and we have an unfinished message, it is necessary to add a sequence of closing chars - content += "]}"; + content[lastPos - 1] = RIGHT_BRACKET; + content[lastPos] = RIGHT_BRACE; } final Document message = DocumentReader.defaultReader().read(content); txId = message.getInteger("xid"); @@ -120,17 +137,19 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces currentChunk = null; } else { + byte firstChar = getFirstNonWhiteChar(content); // We are receiving changes in chunks - if (content.startsWith("{")) { + if (firstChar == LEFT_BRACE) { // First change, this is a valid JSON currentChunk = content; } - else if (content.startsWith(",")) { + else if (firstChar == COMMA) { // following changes, they have an extra comma at the start of message doProcessMessage(processor, typeRegistry, currentChunk, false); - currentChunk = content.substring(1); + replaceFirstNonWhiteChar(content, SPACE); + currentChunk = content; } - else if (content.startsWith("]")) { + else if (firstChar == RIGHT_BRACKET) { // No more changes if (currentChunk != null) { doProcessMessage(processor, typeRegistry, currentChunk, true); @@ -147,7 +166,38 @@ else if (content.startsWith("]")) { } } - private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, String content, boolean lastMessage) + private byte getLastNonWhiteChar(byte[] array) throws IllegalArgumentException { + for (int i = array.length - 1; i >= 0; i--) { + if (!isWhitespace(array[i])) { + return array[i]; + } + } + throw new IllegalArgumentException("No non-white char"); + } + + private byte getFirstNonWhiteChar(byte[] array) throws IllegalArgumentException { + for (int i = 0; i < array.length; i++) { + if (!isWhitespace(array[i])) { + return array[i]; + } + } + throw new IllegalArgumentException("No non-white char"); + } + + private void replaceFirstNonWhiteChar(byte[] array, byte to) { + for (int i = 0; i < array.length; i++) { + if (!isWhitespace(array[i])) { + array[i] = to; + return; + } + } + } + + private boolean isWhitespace(byte c) { + return (c >= TAB && c <= CR) || c == SPACE; + } + + private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage) throws IOException, SQLException, InterruptedException { final Document change = DocumentReader.floatNumbersAsTextReader().read(content); LOGGER.debug("Change arrived for decoding {}", change);