DBZ-777 Fix wal2json decoder

This commit is contained in:
Jiri Pechanec 2019-08-14 14:17:22 +02:00 committed by Gunnar Morling
parent 037d581844
commit de2345f87b
2 changed files with 5 additions and 2 deletions

View File

@ -111,7 +111,10 @@ public DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSch
final boolean metadataInMessage = message.hasTypeMetadata();
final TableId tableId = (TableId) tableSchema.id();
final Table table = schema.tableFor(tableId);
final List<ReplicationMessage.Column> columns = getOperation() == Operation.DELETE ? message.getOldTupleList() : message.getNewTupleList();
if (getOperation() == Operation.DELETE || message.shouldSchemaBeSynchronized()) {
return tableSchema;
}
final List<ReplicationMessage.Column> columns = message.getNewTupleList();
// check if we need to refresh our local schema due to DB schema changes for this table
if (schemaChanged(columns, table, metadataInMessage)) {
// Refresh the schema so we get information about primary keys

View File

@ -51,8 +51,8 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
}
final byte[] source = buffer.array();
final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
LOGGER.trace("Message arrived for decoding {}", new String(content));
final Document message = DocumentReader.floatNumbersAsTextReader().read(content);
LOGGER.debug("Message arrived for decoding {}", message);
final long txId = message.getLong("xid");
final String timestamp = message.getString("timestamp");
final Instant commitTime = Conversions.toInstant(dateTime.systemTimestamp(timestamp));