From c1b057f79276c2b460951860339225fba7ad2b22 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Thu, 24 Sep 2020 10:43:34 +0200 Subject: [PATCH] DBZ-2588 Introducing dedicated functional interface for key/value generators; Also keys are always KC Struct instead of generic j.l.Object. --- .../connector/mysql/RecordMakers.java | 8 +++---- .../RelationalChangeRecordEmitter.java | 11 +++++----- .../debezium/relational/StructGenerator.java | 22 +++++++++++++++++++ .../io/debezium/relational/TableSchema.java | 20 ++++++++--------- .../relational/TableSchemaBuilder.java | 11 +++++----- 5 files changed, 46 insertions(+), 26 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/relational/StructGenerator.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java index b227ff17c..0d7e4ba20 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java @@ -225,7 +225,7 @@ public boolean assign(long tableNumber, TableId id) { public int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts, BlockingConsumer consumer) throws InterruptedException { - Object key = tableSchema.keyFromColumnData(row); + Struct key = tableSchema.keyFromColumnData(row); Struct value = tableSchema.valueFromColumnData(row); if (value != null || key != null) { Schema keySchema = tableSchema.keySchema(); @@ -246,7 +246,7 @@ public int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRo BlockingConsumer consumer) throws InterruptedException { validateColumnCount(tableSchema, row); - Object key = tableSchema.keyFromColumnData(row); + Struct key = tableSchema.keyFromColumnData(row); Struct value = tableSchema.valueFromColumnData(row); if (value != null || key != null) { Schema keySchema = tableSchema.keySchema(); @@ -269,7 +269,7 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum throws InterruptedException { int count = 0; validateColumnCount(tableSchema, after); - Object newkey = tableSchema.keyFromColumnData(after); + Struct newkey = tableSchema.keyFromColumnData(after); Struct valueAfter = tableSchema.valueFromColumnData(after); if (valueAfter != null || newkey != null) { Object oldKey = tableSchema.keyFromColumnData(before); @@ -325,7 +325,7 @@ public int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRo throws InterruptedException { int count = 0; validateColumnCount(tableSchema, row); - Object key = tableSchema.keyFromColumnData(row); + Struct key = tableSchema.keyFromColumnData(row); Struct value = tableSchema.valueFromColumnData(row); if (value != null || key != null) { Schema keySchema = tableSchema.keySchema(); diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java b/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java index 18f329f11..11eb1b790 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalChangeRecordEmitter.java @@ -62,7 +62,7 @@ public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) th protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); - Object newKey = tableSchema.keyFromColumnData(newColumnValues); + Struct newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); @@ -78,7 +78,7 @@ protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) protected void emitReadRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); - Object newKey = tableSchema.keyFromColumnData(newColumnValues); + Struct newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); @@ -91,8 +91,8 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema) Object[] oldColumnValues = getOldColumnValues(); Object[] newColumnValues = getNewColumnValues(); - Object oldKey = tableSchema.keyFromColumnData(oldColumnValues); - Object newKey = tableSchema.keyFromColumnData(newColumnValues); + Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues); + Struct newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); @@ -126,7 +126,7 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema) @Override protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); - Object oldKey = tableSchema.keyFromColumnData(oldColumnValues); + Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) { @@ -141,6 +141,7 @@ protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) thro /** * Returns the operation done by the represented change. */ + @Override protected abstract Operation getOperation(); /** diff --git a/debezium-core/src/main/java/io/debezium/relational/StructGenerator.java b/debezium-core/src/main/java/io/debezium/relational/StructGenerator.java new file mode 100644 index 000000000..cb2a33f80 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/StructGenerator.java @@ -0,0 +1,22 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational; + +import org.apache.kafka.connect.data.Struct; + +/** + * A function that converts one change event row (from a snapshot select, or + * from before/after state of a log event) into the corresponding Kafka Connect + * key or value {@link Struct}. + */ +@FunctionalInterface +public interface StructGenerator { + + /** + * Converts the given tuple into a corresponding change event key or value struct. + */ + Struct generateValue(Object[] values); +} diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java index bcbd015bd..be6cedb07 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchema.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchema.java @@ -7,7 +7,6 @@ import java.util.Arrays; import java.util.Objects; -import java.util.function.Function; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -52,14 +51,14 @@ @Immutable public class TableSchema implements DataCollectionSchema { - private static final Logger logger = LoggerFactory.getLogger(TableSchema.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TableSchema.class); private final TableId id; private final Schema keySchema; private final Envelope envelopeSchema; private final Schema valueSchema; - private final Function keyGenerator; - private final Function valueGenerator; + private final StructGenerator keyGenerator; + private final StructGenerator valueGenerator; /** * Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the @@ -73,8 +72,7 @@ public class TableSchema implements DataCollectionSchema { * @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but * may return nulls */ - public TableSchema(TableId id, Schema keySchema, Function keyGenerator, - Envelope envelopeSchema, Schema valueSchema, Function valueGenerator) { + public TableSchema(TableId id, Schema keySchema, StructGenerator keyGenerator, Envelope envelopeSchema, Schema valueSchema, StructGenerator valueGenerator) { this.id = id; this.keySchema = keySchema; this.envelopeSchema = envelopeSchema; @@ -125,11 +123,11 @@ public Envelope getEnvelopeSchema() { * @param columnData the column values for the table * @return the key, or null if the {@code columnData} */ - public Object keyFromColumnData(Object[] columnData) { - if (logger.isTraceEnabled()) { - logger.trace("columnData from current stack: {}", Arrays.toString(columnData)); + public Struct keyFromColumnData(Object[] columnData) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("columnData from current stack: {}", Arrays.toString(columnData)); } - return columnData == null ? null : keyGenerator.apply(columnData); + return columnData == null ? null : keyGenerator.generateValue(columnData); } /** @@ -140,7 +138,7 @@ public Object keyFromColumnData(Object[] columnData) { * @return the value, or null if the {@code columnData} */ public Struct valueFromColumnData(Object[] columnData) { - return columnData == null ? null : valueGenerator.apply(columnData); + return columnData == null ? null : valueGenerator.generateValue(columnData); } @Override diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index 886676641..58d76ff63 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -9,7 +9,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Field; @@ -135,8 +134,8 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t .build(); // Create the generators ... - Function keyGenerator = createKeyGenerator(keySchema, tableId, tableKey.keyColumns()); - Function valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers); + StructGenerator keyGenerator = createKeyGenerator(keySchema, tableId, tableKey.keyColumns()); + StructGenerator valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers); // And the table schema ... return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator); @@ -172,7 +171,7 @@ else if (Strings.isNullOrEmpty(tableId.schema())) { * @param columns the column definitions for the table that defines the row; may not be null * @return the key-generating function, or null if there is no key schema */ - protected Function createKeyGenerator(Schema schema, TableId columnSetName, List columns) { + protected StructGenerator createKeyGenerator(Schema schema, TableId columnSetName, List columns) { if (schema != null) { int[] recordIndexes = indexesForColumns(columns); Field[] fields = fieldsForColumns(schema, columns); @@ -235,8 +234,8 @@ private void validateIncomingRowToInternalMetadata(int[] recordIndexes, Field[] * @param mappers the mapping functions for columns; may be null if none of the columns are to be mapped to different values * @return the value-generating function, or null if there is no value schema */ - protected Function createValueGenerator(Schema schema, TableId tableId, List columns, - ColumnNameFilter filter, ColumnMappers mappers) { + protected StructGenerator createValueGenerator(Schema schema, TableId tableId, List columns, + ColumnNameFilter filter, ColumnMappers mappers) { if (schema != null) { List columnsThatShouldBeAdded = columns.stream() .filter(column -> filter == null || filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name()))