Merge pull request #40 from rhauch/dbz-29b
DBZ-29 Refactored ColumnMappers
This commit is contained in:
commit
7ce096adaa
@ -41,12 +41,12 @@
|
||||
import io.debezium.annotation.NotThreadSafe;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.relational.ColumnId;
|
||||
import io.debezium.relational.ColumnMappers;
|
||||
import io.debezium.relational.Selectors;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.relational.ddl.DdlParser;
|
||||
import io.debezium.relational.history.DatabaseHistory;
|
||||
import io.debezium.relational.mapping.ColumnMappers;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.Collect;
|
||||
import io.debezium.util.Metronome;
|
||||
|
@ -33,7 +33,6 @@
|
||||
|
||||
import io.debezium.annotation.NotThreadSafe;
|
||||
import io.debezium.relational.ColumnId;
|
||||
import io.debezium.relational.ColumnMappers;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.TableSchema;
|
||||
@ -41,6 +40,7 @@
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.relational.history.DatabaseHistory;
|
||||
import io.debezium.relational.history.HistoryRecord;
|
||||
import io.debezium.relational.mapping.ColumnMappers;
|
||||
import io.debezium.text.ParsingException;
|
||||
import io.debezium.util.Collect;
|
||||
|
||||
|
@ -153,7 +153,7 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
|
||||
start(MySqlConnector.class, config);
|
||||
|
||||
// Wait for records to become available ...
|
||||
//Testing.Print.enable();
|
||||
Testing.Print.enable();
|
||||
waitForAvailableRecords(15, TimeUnit.SECONDS);
|
||||
|
||||
// Now consume the records ...
|
||||
@ -166,10 +166,12 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
|
||||
fail("The 'order_number' field was found but should not exist");
|
||||
} catch ( DataException e ) {
|
||||
// expected
|
||||
printJson(record);
|
||||
}
|
||||
} else if ( record.topic().endsWith(".customers")) {
|
||||
Struct value = (Struct) record.value();
|
||||
assertThat(value.getString("email")).isEqualTo("************");
|
||||
printJson(record);
|
||||
}
|
||||
});
|
||||
stopConnector();
|
||||
|
@ -1,24 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational;
|
||||
|
||||
/**
|
||||
* A factory for a function used to map values of a column.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface ColumnMapper {
|
||||
|
||||
/**
|
||||
* Create for the given column a function that maps values.
|
||||
*
|
||||
* @param column the column description; never null
|
||||
* @return the function that converts the value; may be null
|
||||
*/
|
||||
ValueConverter create(Column column);
|
||||
|
||||
}
|
@ -41,6 +41,8 @@
|
||||
import io.debezium.data.IsoTime;
|
||||
import io.debezium.data.IsoTimestamp;
|
||||
import io.debezium.jdbc.JdbcConnection;
|
||||
import io.debezium.relational.mapping.ColumnMapper;
|
||||
import io.debezium.relational.mapping.ColumnMappers;
|
||||
|
||||
/**
|
||||
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions.
|
||||
@ -84,7 +86,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException
|
||||
|
||||
// Create a schema that represents these columns ...
|
||||
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(name);
|
||||
columns.forEach(column -> addField(schemaBuilder, column));
|
||||
columns.forEach(column -> addField(schemaBuilder, column, null));
|
||||
Schema valueSchema = schemaBuilder.build();
|
||||
|
||||
// And a generator that can be used to create values from rows in the result set ...
|
||||
@ -134,12 +136,13 @@ public TableSchema create(Table table, Predicate<ColumnId> filter, ColumnMappers
|
||||
table.columns().forEach(column -> {
|
||||
if (table.isPrimaryKeyColumn(column.name())) {
|
||||
// The column is part of the primary key, so ALWAYS add it to the PK schema ...
|
||||
addField(keySchemaBuilder, column);
|
||||
addField(keySchemaBuilder, column, null);
|
||||
hasPrimaryKey.set(true);
|
||||
}
|
||||
if (filter == null || filter.test(new ColumnId(tableId, column.name()))) {
|
||||
// Add the column to the value schema only if the column has not been filtered ...
|
||||
addField(valSchemaBuilder, column);
|
||||
ColumnMapper mapper = mappers == null ? null : mappers.mapperFor(tableId, column);
|
||||
addField(valSchemaBuilder, column, mapper);
|
||||
}
|
||||
});
|
||||
Schema valSchema = valSchemaBuilder.build();
|
||||
@ -248,7 +251,7 @@ protected Field[] fieldsForColumns(Schema schema, List<Column> columns) {
|
||||
Field[] fields = new Field[columns.size()];
|
||||
AtomicInteger i = new AtomicInteger(0);
|
||||
columns.forEach(column -> {
|
||||
Field field = schema.field(column.name()); // may be null if the field is unused ...
|
||||
Field field = schema.field(column.name()); // may be null if the field is unused ...
|
||||
fields[i.getAndIncrement()] = field;
|
||||
});
|
||||
return fields;
|
||||
@ -277,7 +280,7 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId,
|
||||
ValueConverter valueConverter = createValueConverterFor(column, field);
|
||||
assert valueConverter != null;
|
||||
if (mappers != null) {
|
||||
ValueConverter mappingConverter = mappers.mapperFor(tableId, column);
|
||||
ValueConverter mappingConverter = mappers.mappingConverterFor(tableId, column);
|
||||
if (mappingConverter != null) {
|
||||
converter = (value) -> mappingConverter.convert(valueConverter.convert(value));
|
||||
}
|
||||
@ -295,9 +298,11 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId,
|
||||
*
|
||||
* @param builder the schema builder; never null
|
||||
* @param column the column definition
|
||||
* @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values
|
||||
*/
|
||||
protected void addField(SchemaBuilder builder, Column column) {
|
||||
addField(builder, column.name(), column.jdbcType(), column.typeName(), column.length(), column.scale(), column.isOptional());
|
||||
protected void addField(SchemaBuilder builder, Column column, ColumnMapper mapper) {
|
||||
addField(builder, column.name(), column.jdbcType(), column.typeName(), column.length(),
|
||||
column.scale(), column.isOptional(), mapper);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -307,16 +312,19 @@ protected void addField(SchemaBuilder builder, Column column) {
|
||||
* this method and delegate to this method before and/or after the custom logic. Similar behavior should be addressed
|
||||
* in a specialized {@link #createValueConverterFor(Column, Field)} as well.
|
||||
*
|
||||
* @param builder the schema builder; never null
|
||||
* @param parentBuilder the builder for the schema used to {@link SchemaBuilder#field(String, Schema) define} the new field;
|
||||
* never null
|
||||
* @param columnName the name of the column
|
||||
* @param jdbcType the column's {@link Types JDBC type}
|
||||
* @param typeName the column's DBMS-specific type name
|
||||
* @param columnLength the length of the column
|
||||
* @param columnScale the scale of the column values, or 0 if not a decimal value
|
||||
* @param optional {@code true} if the column is optional, or {@code false} if the column is known to always have a value
|
||||
* @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values
|
||||
*/
|
||||
protected void addField(SchemaBuilder builder, String columnName, int jdbcType, String typeName, int columnLength,
|
||||
int columnScale, boolean optional) {
|
||||
protected void addField(SchemaBuilder parentBuilder, String columnName, int jdbcType, String typeName, int columnLength,
|
||||
int columnScale, boolean optional, ColumnMapper mapper) {
|
||||
SchemaBuilder fieldBuilder = null;
|
||||
switch (jdbcType) {
|
||||
case Types.NULL:
|
||||
LOGGER.warn("Unexpected JDBC type: NULL");
|
||||
@ -325,106 +333,101 @@ protected void addField(SchemaBuilder builder, String columnName, int jdbcType,
|
||||
// Single- and multi-bit values ...
|
||||
case Types.BIT:
|
||||
if (columnLength > 1) {
|
||||
SchemaBuilder bitBuilder = Bits.builder();
|
||||
if (optional) bitBuilder.optional();
|
||||
builder.field(columnName, bitBuilder.build());
|
||||
fieldBuilder = Bits.builder();
|
||||
fieldBuilder.parameter("length", Integer.toString(columnLength));
|
||||
break;
|
||||
}
|
||||
// otherwise, it is just one bit so use a boolean ...
|
||||
case Types.BOOLEAN:
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_BOOLEAN_SCHEMA : Schema.BOOLEAN_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.bool();
|
||||
break;
|
||||
|
||||
// Binary values ...
|
||||
// Fixed-length binary values ...
|
||||
case Types.BLOB:
|
||||
case Types.BINARY:
|
||||
fieldBuilder = SchemaBuilder.bytes();
|
||||
break;
|
||||
|
||||
// Variable-length binary values ...
|
||||
case Types.VARBINARY:
|
||||
case Types.LONGVARBINARY:
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.bytes();
|
||||
break;
|
||||
|
||||
// Numeric integers
|
||||
case Types.TINYINT:
|
||||
// values are an 8-bit unsigned integer value between 0 and 255
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_INT8_SCHEMA : Schema.INT8_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.int8();
|
||||
break;
|
||||
case Types.SMALLINT:
|
||||
// values are a 16-bit signed integer value between -32768 and 32767
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_INT16_SCHEMA : Schema.INT16_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.int16();
|
||||
break;
|
||||
case Types.INTEGER:
|
||||
// values are a 32-bit signed integer value between - 2147483648 and 2147483647
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_INT32_SCHEMA : Schema.INT32_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.int32();
|
||||
break;
|
||||
case Types.BIGINT:
|
||||
// values are a 64-bit signed integer value between -9223372036854775808 and 9223372036854775807
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.int64();
|
||||
break;
|
||||
|
||||
// Numeric decimal numbers
|
||||
case Types.REAL:
|
||||
// values are single precision floating point number which supports 7 digits of mantissa.
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_FLOAT32_SCHEMA : Schema.FLOAT32_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.float32();
|
||||
break;
|
||||
case Types.FLOAT:
|
||||
case Types.DOUBLE:
|
||||
// values are double precision floating point number which supports 15 digits of mantissa.
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_FLOAT64_SCHEMA : Schema.OPTIONAL_FLOAT64_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.float64();
|
||||
break;
|
||||
case Types.NUMERIC:
|
||||
case Types.DECIMAL:
|
||||
// values are fixed-precision decimal values with exact precision.
|
||||
// Use Kafka Connect's arbitrary precision decimal type and use the column's specified scale ...
|
||||
SchemaBuilder decBuilder = Decimal.builder(columnScale);
|
||||
if (optional) decBuilder.optional();
|
||||
builder.field(columnName, decBuilder.build());
|
||||
fieldBuilder = Decimal.builder(columnScale);
|
||||
break;
|
||||
|
||||
// String values
|
||||
case Types.CHAR: // variable-length
|
||||
case Types.VARCHAR: // variable-length
|
||||
case Types.LONGVARCHAR: // variable-length
|
||||
case Types.CLOB: // variable-length
|
||||
case Types.NCHAR: // fixed-length
|
||||
case Types.NVARCHAR: // fixed-length
|
||||
case Types.LONGNVARCHAR: // fixed-length
|
||||
case Types.NCLOB: // fixed-length
|
||||
// Fixed-length string values
|
||||
case Types.CHAR:
|
||||
case Types.NCHAR:
|
||||
case Types.NVARCHAR:
|
||||
case Types.LONGNVARCHAR:
|
||||
case Types.NCLOB:
|
||||
fieldBuilder = SchemaBuilder.string();
|
||||
break;
|
||||
|
||||
// Variable-length string values
|
||||
case Types.VARCHAR:
|
||||
case Types.LONGVARCHAR:
|
||||
case Types.CLOB:
|
||||
case Types.DATALINK:
|
||||
case Types.SQLXML:
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.string();
|
||||
break;
|
||||
|
||||
// Date and time values
|
||||
case Types.DATE:
|
||||
SchemaBuilder dateBuilder = Date.builder();
|
||||
if (optional) dateBuilder.optional();
|
||||
builder.field(columnName, dateBuilder.build());
|
||||
fieldBuilder = Date.builder();
|
||||
break;
|
||||
case Types.TIME:
|
||||
SchemaBuilder timeBuilder = Time.builder();
|
||||
if (optional) timeBuilder.optional();
|
||||
builder.field(columnName, timeBuilder.build());
|
||||
fieldBuilder = Time.builder();
|
||||
break;
|
||||
case Types.TIMESTAMP:
|
||||
SchemaBuilder timestampBuilder = Timestamp.builder();
|
||||
if (optional) timestampBuilder.optional();
|
||||
builder.field(columnName, timestampBuilder.build());
|
||||
fieldBuilder = Timestamp.builder();
|
||||
break;
|
||||
case Types.TIME_WITH_TIMEZONE:
|
||||
SchemaBuilder offsetTimeBuilder = IsoTime.builder();
|
||||
if (optional) offsetTimeBuilder.optional();
|
||||
builder.field(columnName, offsetTimeBuilder.build());
|
||||
fieldBuilder = IsoTime.builder();
|
||||
break;
|
||||
case Types.TIMESTAMP_WITH_TIMEZONE:
|
||||
SchemaBuilder tsWithTzBuilder = IsoTimestamp.builder();
|
||||
if (optional) tsWithTzBuilder.optional();
|
||||
builder.field(columnName, tsWithTzBuilder.build());
|
||||
fieldBuilder = IsoTimestamp.builder();
|
||||
break;
|
||||
|
||||
// Other types ...
|
||||
case Types.ROWID:
|
||||
// often treated as a string, but we'll generalize and treat it as a byte array
|
||||
builder.field(columnName, optional ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA);
|
||||
fieldBuilder = SchemaBuilder.bytes();
|
||||
break;
|
||||
|
||||
// Unhandled types
|
||||
@ -436,29 +439,39 @@ protected void addField(SchemaBuilder builder, String columnName, int jdbcType,
|
||||
case Types.REF_CURSOR:
|
||||
case Types.STRUCT:
|
||||
default:
|
||||
addOtherField(builder, columnName, jdbcType, typeName, columnLength, columnScale, optional);
|
||||
fieldBuilder = addOtherField(columnName, jdbcType, typeName, columnLength, columnScale, optional, mapper);
|
||||
break;
|
||||
}
|
||||
if (fieldBuilder != null) {
|
||||
if (mapper != null) {
|
||||
// Let the mapper add properties to the schema ...
|
||||
mapper.alterFieldSchema(fieldBuilder);
|
||||
}
|
||||
if (optional) fieldBuilder.optional();
|
||||
parentBuilder.field(columnName, fieldBuilder.build());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add to the supplied {@link SchemaBuilder} a field for the column with the given information.
|
||||
* Return a {@link SchemaBuilder} for a field for the column with the given information.
|
||||
* <p>
|
||||
* Subclasses that wish to override or extend the mappings of JDBC/DBMS types to Kafka Connect value types can override
|
||||
* this method and delegate to this method before and/or after the custom logic. Similar behavior should be addressed
|
||||
* in a specialized {@link #addField(SchemaBuilder, String, int, String, int, int, boolean)} as well.
|
||||
* in a specialized {@link #addField(SchemaBuilder, String, int, String, int, int, boolean, ColumnMapper)} as well.
|
||||
*
|
||||
* @param builder the schema builder; never null
|
||||
* @param columnName the name of the column
|
||||
* @param jdbcType the column's {@link Types JDBC type}
|
||||
* @param typeName the column's DBMS-specific type name
|
||||
* @param columnLength the length of the column
|
||||
* @param columnScale the scale of the column values, or 0 if not a decimal value
|
||||
* @param optional {@code true} if the column is optional, or {@code false} if the column is known to always have a value
|
||||
* @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values
|
||||
* @return the {@link SchemaBuilder} for the new field, ready to be {@link SchemaBuilder#build() build}; may be null
|
||||
*/
|
||||
protected void addOtherField(SchemaBuilder builder, String columnName, int jdbcType, String typeName, int columnLength,
|
||||
int columnScale, boolean optional) {
|
||||
protected SchemaBuilder addOtherField(String columnName, int jdbcType, String typeName, int columnLength,
|
||||
int columnScale, boolean optional, ColumnMapper mapper) {
|
||||
LOGGER.warn("Unexpected JDBC type: {}", jdbcType);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -856,8 +869,13 @@ protected Object convertDate(Field fieldDefn, Object data) {
|
||||
java.util.Date date = null;
|
||||
if (data instanceof java.sql.Date) {
|
||||
// JDBC specification indicates that this will be the nominal object for this JDBC type.
|
||||
// Contains only date info, with all time values set to all zeros (e.g. midnight)
|
||||
date = (java.sql.Date) data;
|
||||
// Contains only date info, with all time values set to all zeros (e.g. midnight).
|
||||
// However, the java.sql.Date object *may* contain timezone information for some DBMS+Driver combinations.
|
||||
// Therefore, first convert it to a local LocalDate, then to a LocalDateTime at midnight, and then to an
|
||||
// instant in UTC ...
|
||||
java.sql.Date sqlDate = (java.sql.Date) data;
|
||||
LocalDate localDate = sqlDate.toLocalDate();
|
||||
date = java.util.Date.from(localDate.atStartOfDay().toInstant(ZoneOffset.UTC));
|
||||
} else if (data instanceof java.util.Date) {
|
||||
// Possible that some implementations might use this. We should be prepared to ignore any time,
|
||||
// information by truncating to days and creating a new java.util.Date ...
|
||||
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
|
||||
/**
|
||||
* A factory for a function used to map values of a column.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface ColumnMapper {
|
||||
|
||||
/**
|
||||
* Initialize the ColumnMapper instance based upon the connector's configuration.
|
||||
* @param config the connector's configuration
|
||||
*/
|
||||
default void initialize( Configuration config ) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Create for the given column a function that maps values.
|
||||
*
|
||||
* @param column the column description; never null
|
||||
* @return the function that converts the value; may be null
|
||||
*/
|
||||
ValueConverter create(Column column);
|
||||
|
||||
/**
|
||||
* Optionally annotate the schema with properties to better capture the mapping behavior.
|
||||
* @param schemaBuilder the builder for the {@link Field}'s schema; never null
|
||||
*/
|
||||
default void alterFieldSchema( SchemaBuilder schemaBuilder) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
@ -3,9 +3,8 @@
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational;
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -14,7 +13,14 @@
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
|
||||
import io.debezium.annotation.Immutable;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.function.Predicates;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ColumnId;
|
||||
import io.debezium.relational.Selectors;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
@ -68,7 +74,21 @@ public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) {
|
||||
* @return this object so that methods can be chained together; never null
|
||||
*/
|
||||
public Builder map(String fullyQualifiedColumnNames, Class<ColumnMapper> mapperClass) {
|
||||
return map(fullyQualifiedColumnNames, instantiateMapper(mapperClass));
|
||||
return map(fullyQualifiedColumnNames,mapperClass,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
|
||||
* expression patterns.
|
||||
*
|
||||
* @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names; may not be null
|
||||
* @param mapperClass the Java class that implements {@code BiFunction<Column, Object, Object>} and that
|
||||
* will be used to map actual values into values used in the output record; may not be null
|
||||
* @param config the configuration to pass to the {@link ColumnMapper} instance; may be null
|
||||
* @return this object so that methods can be chained together; never null
|
||||
*/
|
||||
public Builder map(String fullyQualifiedColumnNames, Class<ColumnMapper> mapperClass, Configuration config) {
|
||||
return map(fullyQualifiedColumnNames, instantiateMapper(mapperClass, config));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -80,15 +100,7 @@ public Builder map(String fullyQualifiedColumnNames, Class<ColumnMapper> mapperC
|
||||
* @return this object so that methods can be chained together; never null
|
||||
*/
|
||||
public Builder truncateStrings(String fullyQualifiedColumnNames, int maxLength) {
|
||||
return map(fullyQualifiedColumnNames, (column) -> {
|
||||
return (value) -> {
|
||||
if (value instanceof String) {
|
||||
String str = (String) value;
|
||||
if (str.length() > maxLength) return str.substring(0, maxLength);
|
||||
}
|
||||
return value;
|
||||
};
|
||||
});
|
||||
return map(fullyQualifiedColumnNames, new TruncateStrings(maxLength));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -125,22 +137,7 @@ public Builder maskStrings(String fullyQualifiedColumnNames, int numberOfChars,
|
||||
* @return this object so that methods can be chained together; never null
|
||||
*/
|
||||
public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) {
|
||||
return map(fullyQualifiedColumnNames, (column) -> {
|
||||
switch (column.jdbcType()) {
|
||||
case Types.CHAR: // variable-length
|
||||
case Types.VARCHAR: // variable-length
|
||||
case Types.LONGVARCHAR: // variable-length
|
||||
case Types.CLOB: // variable-length
|
||||
case Types.NCHAR: // fixed-length
|
||||
case Types.NVARCHAR: // fixed-length
|
||||
case Types.LONGNVARCHAR: // fixed-length
|
||||
case Types.NCLOB: // fixed-length
|
||||
case Types.DATALINK:
|
||||
return (input) -> maskValue;
|
||||
default:
|
||||
return (input) -> input;
|
||||
}
|
||||
});
|
||||
return map(fullyQualifiedColumnNames, new MaskStrings(maskValue));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -153,8 +150,23 @@ public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) {
|
||||
* an existing mapping function should be removed
|
||||
* @return this object so that methods can be chained together; never null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Builder map(String fullyQualifiedColumnNames, String mapperClassName) {
|
||||
return map(fullyQualifiedColumnNames,mapperClassName,null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
|
||||
* expression patterns.
|
||||
*
|
||||
* @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names; may not be null
|
||||
* @param mapperClassName the name of the Java class that implements {@code BiFunction<Column, Object, Object>} and that
|
||||
* will be used to map actual values into values used in the output record; null if
|
||||
* an existing mapping function should be removed
|
||||
* @param config the configuration to pass to the {@link ColumnMapper} instance; may be null
|
||||
* @return this object so that methods can be chained together; never null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Builder map(String fullyQualifiedColumnNames, String mapperClassName, Configuration config) {
|
||||
Class<ColumnMapper> mapperClass = null;
|
||||
if (mapperClassName != null) {
|
||||
try {
|
||||
@ -167,7 +179,7 @@ public Builder map(String fullyQualifiedColumnNames, String mapperClassName) {
|
||||
e);
|
||||
}
|
||||
}
|
||||
return map(fullyQualifiedColumnNames, mapperClass);
|
||||
return map(fullyQualifiedColumnNames, mapperClass, config);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -194,8 +206,8 @@ private ColumnMappers(List<MapperRule> rules) {
|
||||
* @param column the column; may not be null
|
||||
* @return the mapping function, or null if there is no mapping function
|
||||
*/
|
||||
public ValueConverter mapperFor(Table table, Column column) {
|
||||
return mapperFor(table.id(),column);
|
||||
public ValueConverter mappingConverterFor(Table table, Column column) {
|
||||
return mappingConverterFor(table.id(), column);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -205,11 +217,23 @@ public ValueConverter mapperFor(Table table, Column column) {
|
||||
* @param column the column; may not be null
|
||||
* @return the mapping function, or null if there is no mapping function
|
||||
*/
|
||||
public ValueConverter mapperFor(TableId tableId, Column column) {
|
||||
public ValueConverter mappingConverterFor(TableId tableId, Column column) {
|
||||
ColumnMapper mapper = mapperFor(tableId,column);
|
||||
return mapper != null ? mapper.create(column) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the value mapping function for the given column.
|
||||
*
|
||||
* @param tableId the identifier of the table to which the column belongs; may not be null
|
||||
* @param column the column; may not be null
|
||||
* @return the mapping function, or null if there is no mapping function
|
||||
*/
|
||||
public ColumnMapper mapperFor(TableId tableId, Column column) {
|
||||
ColumnId id = new ColumnId(tableId, column.name());
|
||||
Optional<MapperRule> matchingRule = rules.stream().filter(rule -> rule.matches(id)).findFirst();
|
||||
if (matchingRule.isPresent()) {
|
||||
return matchingRule.get().mapper.create(column);
|
||||
return matchingRule.get().mapper;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -229,13 +253,19 @@ protected boolean matches(ColumnId id) {
|
||||
}
|
||||
}
|
||||
|
||||
protected static <T> T instantiateMapper(Class<T> clazz) {
|
||||
protected static ColumnMapper instantiateMapper(Class<ColumnMapper> clazz, Configuration config) {
|
||||
try {
|
||||
return clazz.newInstance();
|
||||
ColumnMapper mapper = clazz.newInstance();
|
||||
if ( config != null ) {
|
||||
mapper.initialize(config);
|
||||
}
|
||||
return mapper;
|
||||
} catch (InstantiationException e) {
|
||||
throw new ConnectException("Unable to instantiate column mapper class " + clazz.getName() + ": " + e.getMessage(), e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new ConnectException("Unable to access column mapper class " + clazz.getName() + ": " + e.getMessage(), e);
|
||||
} catch (Throwable e) {
|
||||
throw new ConnectException("Unable to initialize the column mapper class " + clazz.getName() + ": " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import java.sql.Types;
|
||||
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
|
||||
import io.debezium.annotation.Immutable;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
|
||||
/**
|
||||
* A {@link ColumnMapper} implementation that ensures that string values are masked by a predefined value.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
public class MaskStrings implements ColumnMapper {
|
||||
|
||||
private final MaskingValueConverter converter;
|
||||
|
||||
/**
|
||||
* Create a {@link ColumnMapper} that masks string values with a predefined value.
|
||||
*
|
||||
* @param maskValue the value that should be used in place of the actual value; may not be null
|
||||
* @throws IllegalArgumentException if the {@code maxLength} is not positive
|
||||
*/
|
||||
public MaskStrings(String maskValue) {
|
||||
if (maskValue == null) throw new IllegalArgumentException("Mask value may not be null");
|
||||
this.converter = new MaskingValueConverter(maskValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueConverter create(Column column) {
|
||||
switch (column.jdbcType()) {
|
||||
case Types.CHAR: // variable-length
|
||||
case Types.VARCHAR: // variable-length
|
||||
case Types.LONGVARCHAR: // variable-length
|
||||
case Types.CLOB: // variable-length
|
||||
case Types.NCHAR: // fixed-length
|
||||
case Types.NVARCHAR: // fixed-length
|
||||
case Types.LONGNVARCHAR: // fixed-length
|
||||
case Types.NCLOB: // fixed-length
|
||||
case Types.DATALINK:
|
||||
return converter;
|
||||
default:
|
||||
return ValueConverter.passthrough();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void alterFieldSchema(SchemaBuilder schemaBuilder) {
|
||||
schemaBuilder.parameter("masked", "true");
|
||||
}
|
||||
|
||||
@Immutable
|
||||
protected static final class MaskingValueConverter implements ValueConverter {
|
||||
protected final String maskValue;
|
||||
|
||||
public MaskingValueConverter(String maskValue) {
|
||||
this.maskValue = maskValue;
|
||||
assert this.maskValue != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object convert(Object value) {
|
||||
return maskValue;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,77 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
|
||||
import io.debezium.annotation.Immutable;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
|
||||
/**
|
||||
* A {@link ColumnMapper} implementation that ensures that string values longer than a specified length will be truncated.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
public class TruncateStrings implements ColumnMapper {
|
||||
|
||||
private final TruncatingValueConverter converter;
|
||||
|
||||
/**
|
||||
* Create a {@link ColumnMapper} that truncates string values to a maximum length.
|
||||
*
|
||||
* @param maxLength the maximum number of characters allowed in values
|
||||
* @throws IllegalArgumentException if the {@code maxLength} is not positive
|
||||
*/
|
||||
public TruncateStrings(int maxLength) {
|
||||
if (maxLength <= 0) throw new IllegalArgumentException("Maximum length must be positive");
|
||||
this.converter = new TruncatingValueConverter(maxLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueConverter create(Column column) {
|
||||
return converter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void alterFieldSchema(SchemaBuilder schemaBuilder) {
|
||||
Configuration params = Configuration.from(schemaBuilder.parameters());
|
||||
Integer length = params.getInteger("length");
|
||||
if ( length != null && converter.maxLength < length ) {
|
||||
// Overwrite the parameter ...
|
||||
schemaBuilder.parameter("length",Integer.toString(converter.maxLength));
|
||||
}
|
||||
Integer maxLength = params.getInteger("maxLength");
|
||||
if ( maxLength != null && converter.maxLength < maxLength ) {
|
||||
// Overwrite the parameter ...
|
||||
schemaBuilder.parameter("maxLength",Integer.toString(converter.maxLength));
|
||||
}
|
||||
if ( maxLength == null && length == null ) {
|
||||
schemaBuilder.parameter("maxLength",Integer.toString(converter.maxLength));
|
||||
}
|
||||
}
|
||||
|
||||
@Immutable
|
||||
protected static final class TruncatingValueConverter implements ValueConverter {
|
||||
protected final int maxLength;
|
||||
|
||||
public TruncatingValueConverter(int maxLength) {
|
||||
this.maxLength = maxLength;
|
||||
assert this.maxLength > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object convert(Object value) {
|
||||
if (value instanceof String) {
|
||||
String str = (String) value;
|
||||
if (str.length() > maxLength) return str.substring(0, maxLength);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -3,7 +3,7 @@
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational;
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import java.sql.Types;
|
||||
|
||||
@ -12,6 +12,9 @@
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
@ -39,14 +42,14 @@ public void beforeEach() {
|
||||
@Test
|
||||
public void shouldNotFindMapperForUnmatchedColumn() {
|
||||
mappers = ColumnMappers.create().truncateStrings(fullyQualifiedNames, 10).build();
|
||||
converter = mappers.mapperFor(tableId, column2);
|
||||
converter = mappers.mappingConverterFor(tableId, column2);
|
||||
assertThat(converter).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldTruncateStrings() {
|
||||
mappers = ColumnMappers.create().truncateStrings(fullyQualifiedNames.toUpperCase(), 10).build(); // inexact case
|
||||
converter = mappers.mapperFor(tableId, column);
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890").toString()).isEqualTo("1234567890");
|
||||
assertThat(converter.convert("12345678901234567890").toString().length()).isEqualTo(10);
|
||||
@ -63,7 +66,7 @@ public void shouldTruncateStrings() {
|
||||
public void shouldMaskStringsToFixedLength() {
|
||||
String maskValue = "**********";
|
||||
mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue.length()).build(); // exact case
|
||||
converter = mappers.mapperFor(tableId, column);
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("12345678901")).isEqualTo(maskValue);
|
||||
@ -77,7 +80,7 @@ public void shouldMaskStringsToFixedNumberOfSpecifiedCharacters() {
|
||||
char maskChar = '=';
|
||||
String maskValue = Strings.createString(maskChar, 10);
|
||||
mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue.length(), maskChar).build();
|
||||
converter = mappers.mapperFor(tableId, column);
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("12345678901")).isEqualTo(maskValue);
|
||||
@ -90,7 +93,7 @@ public void shouldMaskStringsToFixedNumberOfSpecifiedCharacters() {
|
||||
public void shouldMaskStringsWithSpecificValue() {
|
||||
String maskValue = "*-*-*-*-*";
|
||||
mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue).build(); // exact case
|
||||
converter = mappers.mapperFor(tableId, column);
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("12345678901")).isEqualTo(maskValue);
|
||||
@ -103,7 +106,7 @@ public void shouldMaskStringsWithSpecificValue() {
|
||||
public void shouldMapValuesUsingColumnMapperInstance() {
|
||||
RepeatingColumnMapper mapper = new RepeatingColumnMapper();
|
||||
mappers = ColumnMappers.create().map(fullyQualifiedNames, mapper).build();
|
||||
converter = mappers.mapperFor(tableId, column);
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("1234")).isEqualTo("12341234");
|
||||
assertThat(converter.convert("a")).isEqualTo("aa");
|
||||
@ -113,7 +116,7 @@ public void shouldMapValuesUsingColumnMapperInstance() {
|
||||
@Test
|
||||
public void shouldMapValuesUsingFunctionByClassName() {
|
||||
mappers = ColumnMappers.create().map(fullyQualifiedNames, RepeatingColumnMapper.class.getName()).build();
|
||||
converter = mappers.mapperFor(tableId, column);
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("1234")).isEqualTo("12341234");
|
||||
assertThat(converter.convert("a")).isEqualTo("aa");
|
@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import java.sql.Types;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
|
||||
/**
|
||||
* @author Randall Hauch
|
||||
*
|
||||
*/
|
||||
public class MaskStringsTest {
|
||||
|
||||
private final Column column = Column.editor().name("col").jdbcType(Types.VARCHAR).create();
|
||||
private ValueConverter converter;
|
||||
|
||||
@Test
|
||||
public void shouldTruncateStrings() {
|
||||
String maskValue = "*****";
|
||||
converter = new MaskStrings(maskValue).create(column);
|
||||
assertThat(converter.convert("1234567890").toString()).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("123456").toString()).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("12345").toString()).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("1234").toString()).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("123").toString()).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("12").toString()).isEqualTo(maskValue);
|
||||
assertThat(converter.convert("1").toString()).isEqualTo(maskValue);
|
||||
assertThat(converter.convert(null).toString()).isEqualTo(maskValue);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
|
||||
/**
|
||||
* @author Randall Hauch
|
||||
*
|
||||
*/
|
||||
public class TruncateStringsTest {
|
||||
|
||||
private final Column column = Column.editor().name("col").create();
|
||||
private ValueConverter converter;
|
||||
|
||||
@Test
|
||||
public void shouldTruncateStrings() {
|
||||
converter = new TruncateStrings(5).create(column);
|
||||
assertThat(converter.convert("1234567890").toString()).isEqualTo("12345");
|
||||
assertThat(converter.convert("123456").toString()).isEqualTo("12345");
|
||||
assertThat(converter.convert("12345").toString()).isEqualTo("12345");
|
||||
assertThat(converter.convert("1234").toString()).isEqualTo("1234");
|
||||
assertThat(converter.convert("123").toString()).isEqualTo("123");
|
||||
assertThat(converter.convert("12").toString()).isEqualTo("12");
|
||||
assertThat(converter.convert("1").toString()).isEqualTo("1");
|
||||
assertThat(converter.convert(null)).isNull();
|
||||
}
|
||||
|
||||
}
|
@ -5,6 +5,8 @@
|
||||
*/
|
||||
package io.debezium.embedded;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@ -20,6 +22,8 @@
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.json.JsonConverter;
|
||||
import org.apache.kafka.connect.json.JsonDeserializer;
|
||||
import org.apache.kafka.connect.source.SourceConnector;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
|
||||
@ -28,6 +32,11 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
@ -56,9 +65,24 @@ public abstract class AbstractConnectorTest implements Testing {
|
||||
protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5);
|
||||
protected final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private CountDownLatch latch;
|
||||
private JsonConverter keyJsonConverter = new JsonConverter();
|
||||
private JsonConverter valueJsonConverter = new JsonConverter();
|
||||
private JsonDeserializer keyJsonDeserializer = new JsonDeserializer();
|
||||
private JsonDeserializer valueJsonDeserializer = new JsonDeserializer();
|
||||
|
||||
@Before
|
||||
public final void initializeConnectorTestFramework() {
|
||||
keyJsonConverter = new JsonConverter();
|
||||
valueJsonConverter = new JsonConverter();
|
||||
keyJsonDeserializer = new JsonDeserializer();
|
||||
valueJsonDeserializer = new JsonDeserializer();
|
||||
Configuration converterConfig = Configuration.create().build();
|
||||
Configuration deserializerConfig = Configuration.create().build();
|
||||
keyJsonConverter.configure(converterConfig.asMap(), true);
|
||||
valueJsonConverter.configure(converterConfig.asMap(), false);
|
||||
keyJsonDeserializer.configure(deserializerConfig.asMap(), true);
|
||||
valueJsonDeserializer.configure(deserializerConfig.asMap(), false);
|
||||
|
||||
resetBeforeEachTest();
|
||||
consumedLines = new ArrayBlockingQueue<>(getMaximumEnqueuedRecordCount());
|
||||
Testing.Files.delete(OFFSET_STORE_PATH);
|
||||
@ -308,6 +332,51 @@ protected void print(SourceRecord record) {
|
||||
Testing.print(sb.toString());
|
||||
}
|
||||
|
||||
protected void printJson(SourceRecord record) {
|
||||
JsonNode keyJson = null;
|
||||
JsonNode valueJson = null;
|
||||
try {
|
||||
// First serialize and deserialize the key ...
|
||||
byte[] keyBytes = keyJsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
|
||||
keyJson = keyJsonDeserializer.deserialize(record.topic(), keyBytes);
|
||||
// then the value ...
|
||||
byte[] valueBytes = valueJsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
|
||||
valueJson = valueJsonDeserializer.deserialize(record.topic(), valueBytes);
|
||||
// And finally get ready to print it ...
|
||||
JsonNodeFactory nodeFactory = new JsonNodeFactory(false);
|
||||
ObjectNode message = nodeFactory.objectNode();
|
||||
message.set("key", keyJson);
|
||||
message.set("value", valueJson);
|
||||
Testing.print("Message on topic '" + record.topic() + "':");
|
||||
Testing.print(prettyJson(message));
|
||||
} catch (Throwable t) {
|
||||
Testing.printError(t);
|
||||
Testing.print("Problem with message on topic '" + record.topic() + "':");
|
||||
if ( keyJson != null ) {
|
||||
Testing.print("valid key = " + prettyJson(keyJson));
|
||||
} else {
|
||||
Testing.print("invalid key");
|
||||
}
|
||||
if ( valueJson != null ) {
|
||||
Testing.print("valid value = " + prettyJson(valueJson));
|
||||
} else {
|
||||
Testing.print("invalid value");
|
||||
}
|
||||
fail(t.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
protected String prettyJson(JsonNode json) {
|
||||
try {
|
||||
return new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json);
|
||||
} catch (Throwable t) {
|
||||
Testing.printError(t);
|
||||
fail(t.getMessage());
|
||||
assert false : "Will not get here";
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected void append(Object obj, StringBuilder sb) {
|
||||
if (obj == null) {
|
||||
sb.append("null");
|
||||
|
Loading…
Reference in New Issue
Block a user