DBZ-777 Fix pgoutput decoder

This commit is contained in:
Jiri Pechanec 2019-08-14 16:45:55 +02:00 committed by Gunnar Morling
parent de2345f87b
commit 5307a5822d
3 changed files with 56 additions and 51 deletions

View File

@@ -18,6 +18,7 @@
import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage; import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.data.Envelope.Operation; import io.debezium.data.Envelope.Operation;
import io.debezium.function.Predicates;
import io.debezium.pipeline.spi.ChangeRecordEmitter; import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column; import io.debezium.relational.Column;
@@ -111,7 +112,7 @@ public DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSch
final boolean metadataInMessage = message.hasTypeMetadata(); final boolean metadataInMessage = message.hasTypeMetadata();
final TableId tableId = (TableId) tableSchema.id(); final TableId tableId = (TableId) tableSchema.id();
final Table table = schema.tableFor(tableId); final Table table = schema.tableFor(tableId);
if (getOperation() == Operation.DELETE || message.shouldSchemaBeSynchronized()) { if (getOperation() == Operation.DELETE || !message.shouldSchemaBeSynchronized()) {
return tableSchema; return tableSchema;
} }
final List<ReplicationMessage.Column> columns = message.getNewTupleList(); final List<ReplicationMessage.Column> columns = message.getNewTupleList();
@@ -137,10 +138,12 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
// based on the schema columns, create the values on the same position as the columns // based on the schema columns, create the values on the same position as the columns
List<Column> schemaColumns = table.columns(); List<Column> schemaColumns = table.columns();
// based on the replication message without toasted columns for now
List<ReplicationMessage.Column> columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn)).collect(Collectors.toList());
// JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT // JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT
Object[] values = new Object[columns.size() < schemaColumns.size() ? schemaColumns.size() : columns.size()]; Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size() ? schemaColumns.size() : columnsWithoutToasted.size()];
for (ReplicationMessage.Column column: columns) { for (ReplicationMessage.Column column: columnsWithoutToasted) {
//DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names //DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names
final String columnName = Strings.unquoteIdentifierPart(column.getName()); final String columnName = Strings.unquoteIdentifierPart(column.getName());
final Column tableColumn = table.columnWithName(columnName); final Column tableColumn = table.columnWithName(columnName);

View File

@@ -96,6 +96,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
if (message == null) { if (message == null) {
LOGGER.trace("Received empty message"); LOGGER.trace("Received empty message");
lastCompletelyProcessedLsn = lsn; lastCompletelyProcessedLsn = lsn;
dispatcher.dispatchHeartbeatEvent(offsetContext);
return; return;
} }
if (message.isLastEventForLsn()) { if (message.isLastEventForLsn()) {

View File

@@ -336,19 +336,19 @@ private void decodeInsert(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
Optional<Table> resolvedTable = resolveRelation(relationId); Optional<Table> resolvedTable = resolveRelation(relationId);
if (!resolvedTable.isPresent()) { if (!resolvedTable.isPresent()) {
return; processor.process(null);
}
else {
Table table = resolvedTable.get();
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.INSERT,
table.id().toDoubleQuotedString(),
commitTimestamp,
transactionId,
null,
columns));
} }
Table table = resolvedTable.get();
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.INSERT,
table.id().toDoubleQuotedString(),
commitTimestamp,
transactionId,
null,
columns));
} }
/** /**
@@ -365,35 +365,35 @@ private void decodeUpdate(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
Optional<Table> resolvedTable = resolveRelation(relationId); Optional<Table> resolvedTable = resolveRelation(relationId);
if (!resolvedTable.isPresent()) { if (!resolvedTable.isPresent()) {
return; processor.process(null);
} }
else {
Table table = resolvedTable.get();
Table table = resolvedTable.get(); // When reading the tuple-type, we could get 3 different values, 'O', 'K', or 'N'.
// 'O' (Optional) - States the following tuple-data is the key, only for replica identity index configs.
// 'K' (Optional) - States the following tuple-data is the old tuple, only for replica identity full configs.
//
// 'N' (Not-Optional) - States the following tuple-data is the new tuple.
// This is always present.
List<Column> oldColumns = null;
char tupleType = (char) buffer.get();
if ('O' == tupleType || 'K' == tupleType) {
oldColumns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
// Read the 'N' tuple type
// This is necessary so the stream position is accurate for resolving the column tuple data
tupleType = (char) buffer.get();
}
// When reading the tuple-type, we could get 3 different values, 'O', 'K', or 'N'. List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
// 'O' (Optional) - States the following tuple-data is the key, only for replica identity index configs. processor.process(new PgOutputReplicationMessage(
// 'K' (Optional) - States the following tuple-data is the old tuple, only for replica identity full configs. Operation.UPDATE,
// table.id().toDoubleQuotedString(),
// 'N' (Not-Optional) - States the following tuple-data is the new tuple. commitTimestamp,
// This is always present. transactionId,
List<Column> oldColumns = null; oldColumns,
char tupleType = (char) buffer.get(); columns));
if ('O' == tupleType || 'K' == tupleType) {
oldColumns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
// Read the 'N' tuple type
// This is necessary so the stream position is accurate for resolving the column tuple data
tupleType = (char) buffer.get();
} }
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.UPDATE,
table.id().toDoubleQuotedString(),
commitTimestamp,
transactionId,
oldColumns,
columns));
} }
/** /**
@@ -412,18 +412,19 @@ private void decodeDelete(ByteBuffer buffer, TypeRegistry typeRegistry, Replicat
Optional<Table> resolvedTable = resolveRelation(relationId); Optional<Table> resolvedTable = resolveRelation(relationId);
if (!resolvedTable.isPresent()) { if (!resolvedTable.isPresent()) {
return; processor.process(null);
}
else {
Table table = resolvedTable.get();
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.DELETE,
table.id().toDoubleQuotedString(),
commitTimestamp,
transactionId,
columns,
null));
} }
Table table = resolvedTable.get();
List<Column> columns = resolveColumnsFromStreamTupleData(buffer, typeRegistry, table);
processor.process(new PgOutputReplicationMessage(
Operation.DELETE,
table.id().toDoubleQuotedString(),
commitTimestamp,
transactionId,
columns,
null));
} }
/** /**