DBZ-2588 Introducing dedicated functional interface for key/value generators;

Also keys are always KC Struct instead of generic j.l.Object.
This commit is contained in:
Gunnar Morling 2020-09-24 10:43:34 +02:00 committed by Jiri Pechanec
parent bb0b3fd158
commit c1b057f792
5 changed files with 46 additions and 26 deletions

View File

@ -225,7 +225,7 @@ public boolean assign(long tableNumber, TableId id) {
public int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns, Instant ts,
BlockingConsumer<SourceRecord> consumer)
throws InterruptedException {
Object key = tableSchema.keyFromColumnData(row);
Struct key = tableSchema.keyFromColumnData(row);
Struct value = tableSchema.valueFromColumnData(row);
if (value != null || key != null) {
Schema keySchema = tableSchema.keySchema();
@ -246,7 +246,7 @@ public int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRo
BlockingConsumer<SourceRecord> consumer)
throws InterruptedException {
validateColumnCount(tableSchema, row);
Object key = tableSchema.keyFromColumnData(row);
Struct key = tableSchema.keyFromColumnData(row);
Struct value = tableSchema.valueFromColumnData(row);
if (value != null || key != null) {
Schema keySchema = tableSchema.keySchema();
@ -269,7 +269,7 @@ public int update(SourceInfo source, Object[] before, Object[] after, int rowNum
throws InterruptedException {
int count = 0;
validateColumnCount(tableSchema, after);
Object newkey = tableSchema.keyFromColumnData(after);
Struct newkey = tableSchema.keyFromColumnData(after);
Struct valueAfter = tableSchema.valueFromColumnData(after);
if (valueAfter != null || newkey != null) {
Object oldKey = tableSchema.keyFromColumnData(before);
@ -325,7 +325,7 @@ public int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRo
throws InterruptedException {
int count = 0;
validateColumnCount(tableSchema, row);
Object key = tableSchema.keyFromColumnData(row);
Struct key = tableSchema.keyFromColumnData(row);
Struct value = tableSchema.valueFromColumnData(row);
if (value != null || key != null) {
Schema keySchema = tableSchema.keySchema();

View File

@ -62,7 +62,7 @@ public void emitChangeRecords(DataCollectionSchema schema, Receiver receiver) th
protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Object newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
@ -78,7 +78,7 @@ protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema)
protected void emitReadRecord(Receiver receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Object newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
@ -91,8 +91,8 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();
Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Object newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
@ -126,7 +126,7 @@ protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema)
@Override
protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
@ -141,6 +141,7 @@ protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) thro
/**
* Returns the operation done by the represented change.
*/
@Override
protected abstract Operation getOperation();
/**

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
import org.apache.kafka.connect.data.Struct;
/**
* A function that converts one change event row (from a snapshot select, or
* from before/after state of a log event) into the corresponding Kafka Connect
* key or value {@link Struct}.
*/
@FunctionalInterface
public interface StructGenerator {
/**
* Converts the given tuple into a corresponding change event key or value struct.
*/
Struct generateValue(Object[] values);
}

View File

@ -7,7 +7,6 @@
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@ -52,14 +51,14 @@
@Immutable
public class TableSchema implements DataCollectionSchema {
private static final Logger logger = LoggerFactory.getLogger(TableSchema.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchema.class);
private final TableId id;
private final Schema keySchema;
private final Envelope envelopeSchema;
private final Schema valueSchema;
private final Function<Object[], Object> keyGenerator;
private final Function<Object[], Struct> valueGenerator;
private final StructGenerator keyGenerator;
private final StructGenerator valueGenerator;
/**
* Create an instance with the specified {@link Schema}s for the keys and values, and the functions that generate the
@ -73,8 +72,7 @@ public class TableSchema implements DataCollectionSchema {
* @param valueGenerator the function that converts a row into a single value object for Kafka Connect; may not be null but
* may return nulls
*/
public TableSchema(TableId id, Schema keySchema, Function<Object[], Object> keyGenerator,
Envelope envelopeSchema, Schema valueSchema, Function<Object[], Struct> valueGenerator) {
public TableSchema(TableId id, Schema keySchema, StructGenerator keyGenerator, Envelope envelopeSchema, Schema valueSchema, StructGenerator valueGenerator) {
this.id = id;
this.keySchema = keySchema;
this.envelopeSchema = envelopeSchema;
@ -125,11 +123,11 @@ public Envelope getEnvelopeSchema() {
* @param columnData the column values for the table
* @return the key, or null if the {@code columnData}
*/
public Object keyFromColumnData(Object[] columnData) {
if (logger.isTraceEnabled()) {
logger.trace("columnData from current stack: {}", Arrays.toString(columnData));
public Struct keyFromColumnData(Object[] columnData) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("columnData from current stack: {}", Arrays.toString(columnData));
}
return columnData == null ? null : keyGenerator.apply(columnData);
return columnData == null ? null : keyGenerator.generateValue(columnData);
}
/**
@ -140,7 +138,7 @@ public Object keyFromColumnData(Object[] columnData) {
* @return the value, or null if the {@code columnData}
*/
public Struct valueFromColumnData(Object[] columnData) {
return columnData == null ? null : valueGenerator.apply(columnData);
return columnData == null ? null : valueGenerator.generateValue(columnData);
}
@Override

View File

@ -9,7 +9,6 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Field;
@ -135,8 +134,8 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t
.build();
// Create the generators ...
Function<Object[], Object> keyGenerator = createKeyGenerator(keySchema, tableId, tableKey.keyColumns());
Function<Object[], Struct> valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers);
StructGenerator keyGenerator = createKeyGenerator(keySchema, tableId, tableKey.keyColumns());
StructGenerator valueGenerator = createValueGenerator(valSchema, tableId, table.columns(), filter, mappers);
// And the table schema ...
return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator);
@ -172,7 +171,7 @@ else if (Strings.isNullOrEmpty(tableId.schema())) {
* @param columns the column definitions for the table that defines the row; may not be null
* @return the key-generating function, or null if there is no key schema
*/
protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) {
protected StructGenerator createKeyGenerator(Schema schema, TableId columnSetName, List<Column> columns) {
if (schema != null) {
int[] recordIndexes = indexesForColumns(columns);
Field[] fields = fieldsForColumns(schema, columns);
@ -235,7 +234,7 @@ private void validateIncomingRowToInternalMetadata(int[] recordIndexes, Field[]
* @param mappers the mapping functions for columns; may be null if none of the columns are to be mapped to different values
* @return the value-generating function, or null if there is no value schema
*/
protected Function<Object[], Struct> createValueGenerator(Schema schema, TableId tableId, List<Column> columns,
protected StructGenerator createValueGenerator(Schema schema, TableId tableId, List<Column> columns,
ColumnNameFilter filter, ColumnMappers mappers) {
if (schema != null) {
List<Column> columnsThatShouldBeAdded = columns.stream()