DBZ-638 Chunk processing operates on byte array

This commit is contained in:
Jiri Pechanec 2018-03-23 13:57:37 +01:00 committed by Gunnar Morling
parent 4051a4a439
commit 363921a7b1

View File

@ -78,7 +78,16 @@
*/
public class StreamingWal2JsonMessageDecoder implements MessageDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingWal2JsonMessageDecoder.class);
private static final Logger LOGGER = LoggerFactory.getLogger(StreamingWal2JsonMessageDecoder.class);
private static final byte TAB = 9;
private static final byte CR = 13;
private static final byte SPACE = 32;
private static final byte COMMA = 44;
private static final byte RIGHT_BRACKET = 93;
private static final byte LEFT_BRACE = 123;
private static final byte RIGHT_BRACE = 125;
private final DateTimeFormat dateTime = DateTimeFormat.get();
private boolean containsMetadata = false;
@ -88,7 +97,7 @@ public class StreamingWal2JsonMessageDecoder implements MessageDecoder {
* To identify if the last current chunk is the last one we can send the current one
* for processing only after we read the next one or the end of message fragment.
*/
private String currentChunk;
private byte[] currentChunk;
private int txId;
@ -103,14 +112,22 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
}
final byte[] source = buffer.array();
String content = new String(Arrays.copyOfRange(source, buffer.arrayOffset(), source.length)).trim();
LOGGER.debug("Chunk arrived from database {}", content);
// Extend the array by two as we might need to append two chars and set them to space by default
final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length + 2);
final int lastPos = content.length - 1;
content[lastPos - 1] = 32;
content[lastPos] = 32;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Chunk arrived from database {}", new String(content));
}
if (!messageInProgress) {
// We received the beginning of a transaction
if (!content.endsWith("}")) {
if (getLastNonWhiteChar(content) != RIGHT_BRACE) {
// Chunks are enabled and we have an unfinished message, it is necessary to add a sequence of closing chars
content += "]}";
content[lastPos - 1] = RIGHT_BRACKET;
content[lastPos] = RIGHT_BRACE;
}
final Document message = DocumentReader.defaultReader().read(content);
txId = message.getInteger("xid");
@ -120,17 +137,19 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
currentChunk = null;
}
else {
byte firstChar = getFirstNonWhiteChar(content);
// We are receiving changes in chunks
if (content.startsWith("{")) {
if (firstChar == LEFT_BRACE) {
// First change, this is a valid JSON
currentChunk = content;
}
else if (content.startsWith(",")) {
else if (firstChar == COMMA) {
// following changes, they have an extra comma at the start of message
doProcessMessage(processor, typeRegistry, currentChunk, false);
currentChunk = content.substring(1);
replaceFirstNonWhiteChar(content, SPACE);
currentChunk = content;
}
else if (content.startsWith("]")) {
else if (firstChar == RIGHT_BRACKET) {
// No more changes
if (currentChunk != null) {
doProcessMessage(processor, typeRegistry, currentChunk, true);
@ -147,7 +166,38 @@ else if (content.startsWith("]")) {
}
}
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, String content, boolean lastMessage)
private byte getLastNonWhiteChar(byte[] array) throws IllegalArgumentException {
for (int i = array.length - 1; i >= 0; i--) {
if (!isWhitespace(array[i])) {
return array[i];
}
}
throw new IllegalArgumentException("No non-white char");
}
private byte getFirstNonWhiteChar(byte[] array) throws IllegalArgumentException {
for (int i = 0; i < array.length; i++) {
if (!isWhitespace(array[i])) {
return array[i];
}
}
throw new IllegalArgumentException("No non-white char");
}
private void replaceFirstNonWhiteChar(byte[] array, byte to) {
for (int i = 0; i < array.length; i++) {
if (!isWhitespace(array[i])) {
array[i] = to;
return;
}
}
}
private boolean isWhitespace(byte c) {
return (c >= TAB && c <= CR) || c == SPACE;
}
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage)
throws IOException, SQLException, InterruptedException {
final Document change = DocumentReader.floatNumbersAsTextReader().read(content);
LOGGER.debug("Change arrived for decoding {}", change);