From b1e6eb1028cb0023acf10b3c9fe28cf6dbf6ad82 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Thu, 12 May 2016 12:26:04 -0500 Subject: [PATCH] DBZ-29 Refactored ColumnMappers and enabled ColumnMapper impls to add parameters to the Kafka Connect Schema. --- .../connector/mysql/MySqlConnectorTask.java | 2 +- .../connector/mysql/TableConverters.java | 2 +- .../connector/mysql/MySqlConnectorIT.java | 4 +- .../io/debezium/relational/ColumnMapper.java | 24 ---- .../relational/TableSchemaBuilder.java | 136 ++++++++++-------- .../relational/mapping/ColumnMapper.java | 46 ++++++ .../{ => mapping}/ColumnMappers.java | 102 ++++++++----- .../relational/mapping/MaskStrings.java | 73 ++++++++++ .../relational/mapping/TruncateStrings.java | 77 ++++++++++ .../{ => mapping}/ColumnMappersTest.java | 19 +-- .../relational/mapping/MaskStringsTest.java | 40 ++++++ .../mapping/TruncateStringsTest.java | 37 +++++ .../embedded/AbstractConnectorTest.java | 69 +++++++++ 13 files changed, 501 insertions(+), 130 deletions(-) delete mode 100644 debezium-core/src/main/java/io/debezium/relational/ColumnMapper.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMapper.java rename debezium-core/src/main/java/io/debezium/relational/{ => mapping}/ColumnMappers.java (71%) create mode 100644 debezium-core/src/main/java/io/debezium/relational/mapping/MaskStrings.java create mode 100644 debezium-core/src/main/java/io/debezium/relational/mapping/TruncateStrings.java rename debezium-core/src/test/java/io/debezium/relational/{ => mapping}/ColumnMappersTest.java (89%) create mode 100644 debezium-core/src/test/java/io/debezium/relational/mapping/MaskStringsTest.java create mode 100644 debezium-core/src/test/java/io/debezium/relational/mapping/TruncateStringsTest.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 
e3a7c7194..470a9904b 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -41,12 +41,12 @@ import io.debezium.annotation.NotThreadSafe; import io.debezium.config.Configuration; import io.debezium.relational.ColumnId; -import io.debezium.relational.ColumnMappers; import io.debezium.relational.Selectors; import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.DatabaseHistory; +import io.debezium.relational.mapping.ColumnMappers; import io.debezium.util.Clock; import io.debezium.util.Collect; import io.debezium.util.Metronome; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java index c7da82315..cd0853f7f 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java @@ -33,7 +33,6 @@ import io.debezium.annotation.NotThreadSafe; import io.debezium.relational.ColumnId; -import io.debezium.relational.ColumnMappers; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.TableSchema; @@ -41,6 +40,7 @@ import io.debezium.relational.Tables; import io.debezium.relational.history.DatabaseHistory; import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.mapping.ColumnMappers; import io.debezium.text.ParsingException; import io.debezium.util.Collect; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index fd38af065..bc4ed2b91 100644 --- 
a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -153,7 +153,7 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep start(MySqlConnector.class, config); // Wait for records to become available ... - //Testing.Print.enable(); + Testing.Print.enable(); waitForAvailableRecords(15, TimeUnit.SECONDS); // Now consume the records ... @@ -166,10 +166,12 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep fail("The 'order_number' field was found but should not exist"); } catch ( DataException e ) { // expected + printJson(record); } } else if ( record.topic().endsWith(".customers")) { Struct value = (Struct) record.value(); assertThat(value.getString("email")).isEqualTo("************"); + printJson(record); } }); stopConnector(); diff --git a/debezium-core/src/main/java/io/debezium/relational/ColumnMapper.java b/debezium-core/src/main/java/io/debezium/relational/ColumnMapper.java deleted file mode 100644 index d75e168a9..000000000 --- a/debezium-core/src/main/java/io/debezium/relational/ColumnMapper.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.relational; - -/** - * A factory for a function used to map values of a column. - * - * @author Randall Hauch - */ -@FunctionalInterface -public interface ColumnMapper { - - /** - * Create for the given column a function that maps values. 
- * - * @param column the column description; never null - * @return the function that converts the value; may be null - */ - ValueConverter create(Column column); - -} diff --git a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java index 158bfbf50..6fd21961b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java +++ b/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java @@ -41,6 +41,8 @@ import io.debezium.data.IsoTime; import io.debezium.data.IsoTimestamp; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.mapping.ColumnMapper; +import io.debezium.relational.mapping.ColumnMappers; /** * Builder that constructs {@link TableSchema} instances for {@link Table} definitions. @@ -84,7 +86,7 @@ public TableSchema create(ResultSet resultSet, String name) throws SQLException // Create a schema that represents these columns ... SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(name); - columns.forEach(column -> addField(schemaBuilder, column)); + columns.forEach(column -> addField(schemaBuilder, column, null)); Schema valueSchema = schemaBuilder.build(); // And a generator that can be used to create values from rows in the result set ... @@ -134,12 +136,13 @@ public TableSchema create(Table table, Predicate filter, ColumnMappers table.columns().forEach(column -> { if (table.isPrimaryKeyColumn(column.name())) { // The column is part of the primary key, so ALWAYS add it to the PK schema ... - addField(keySchemaBuilder, column); + addField(keySchemaBuilder, column, null); hasPrimaryKey.set(true); } if (filter == null || filter.test(new ColumnId(tableId, column.name()))) { // Add the column to the value schema only if the column has not been filtered ... - addField(valSchemaBuilder, column); + ColumnMapper mapper = mappers == null ? 
null : mappers.mapperFor(tableId, column); + addField(valSchemaBuilder, column, mapper); } }); Schema valSchema = valSchemaBuilder.build(); @@ -248,7 +251,7 @@ protected Field[] fieldsForColumns(Schema schema, List columns) { Field[] fields = new Field[columns.size()]; AtomicInteger i = new AtomicInteger(0); columns.forEach(column -> { - Field field = schema.field(column.name()); // may be null if the field is unused ... + Field field = schema.field(column.name()); // may be null if the field is unused ... fields[i.getAndIncrement()] = field; }); return fields; @@ -277,7 +280,7 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId, ValueConverter valueConverter = createValueConverterFor(column, field); assert valueConverter != null; if (mappers != null) { - ValueConverter mappingConverter = mappers.mapperFor(tableId, column); + ValueConverter mappingConverter = mappers.mappingConverterFor(tableId, column); if (mappingConverter != null) { converter = (value) -> mappingConverter.convert(valueConverter.convert(value)); } @@ -295,9 +298,11 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId, * * @param builder the schema builder; never null * @param column the column definition + * @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values */ - protected void addField(SchemaBuilder builder, Column column) { - addField(builder, column.name(), column.jdbcType(), column.typeName(), column.length(), column.scale(), column.isOptional()); + protected void addField(SchemaBuilder builder, Column column, ColumnMapper mapper) { + addField(builder, column.name(), column.jdbcType(), column.typeName(), column.length(), + column.scale(), column.isOptional(), mapper); } /** @@ -307,16 +312,19 @@ protected void addField(SchemaBuilder builder, Column column) { * this method and delegate to this method before and/or after the custom logic. 
Similar behavior should be addressed * in a specialized {@link #createValueConverterFor(Column, Field)} as well. * - * @param builder the schema builder; never null + * @param parentBuilder the builder for the schema used to {@link SchemaBuilder#field(String, Schema) define} the new field; + * never null * @param columnName the name of the column * @param jdbcType the column's {@link Types JDBC type} * @param typeName the column's DBMS-specific type name * @param columnLength the length of the column * @param columnScale the scale of the column values, or 0 if not a decimal value * @param optional {@code true} if the column is optional, or {@code false} if the column is known to always have a value + * @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values */ - protected void addField(SchemaBuilder builder, String columnName, int jdbcType, String typeName, int columnLength, - int columnScale, boolean optional) { + protected void addField(SchemaBuilder parentBuilder, String columnName, int jdbcType, String typeName, int columnLength, + int columnScale, boolean optional, ColumnMapper mapper) { + SchemaBuilder fieldBuilder = null; switch (jdbcType) { case Types.NULL: LOGGER.warn("Unexpected JDBC type: NULL"); @@ -325,106 +333,101 @@ protected void addField(SchemaBuilder builder, String columnName, int jdbcType, // Single- and multi-bit values ... case Types.BIT: if (columnLength > 1) { - SchemaBuilder bitBuilder = Bits.builder(); - if (optional) bitBuilder.optional(); - builder.field(columnName, bitBuilder.build()); + fieldBuilder = Bits.builder(); + fieldBuilder.parameter("length", Integer.toString(columnLength)); break; } // otherwise, it is just one bit so use a boolean ... case Types.BOOLEAN: - builder.field(columnName, optional ? Schema.OPTIONAL_BOOLEAN_SCHEMA : Schema.BOOLEAN_SCHEMA); + fieldBuilder = SchemaBuilder.bool(); break; - // Binary values ... + // Fixed-length binary values ... 
case Types.BLOB: case Types.BINARY: + fieldBuilder = SchemaBuilder.bytes(); + break; + + // Variable-length binary values ... case Types.VARBINARY: case Types.LONGVARBINARY: - builder.field(columnName, optional ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA); + fieldBuilder = SchemaBuilder.bytes(); break; // Numeric integers case Types.TINYINT: // values are an 8-bit unsigned integer value between 0 and 255 - builder.field(columnName, optional ? Schema.OPTIONAL_INT8_SCHEMA : Schema.INT8_SCHEMA); + fieldBuilder = SchemaBuilder.int8(); break; case Types.SMALLINT: // values are a 16-bit signed integer value between -32768 and 32767 - builder.field(columnName, optional ? Schema.OPTIONAL_INT16_SCHEMA : Schema.INT16_SCHEMA); + fieldBuilder = SchemaBuilder.int16(); break; case Types.INTEGER: // values are a 32-bit signed integer value between - 2147483648 and 2147483647 - builder.field(columnName, optional ? Schema.OPTIONAL_INT32_SCHEMA : Schema.INT32_SCHEMA); + fieldBuilder = SchemaBuilder.int32(); break; case Types.BIGINT: // values are a 64-bit signed integer value between -9223372036854775808 and 9223372036854775807 - builder.field(columnName, optional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA); + fieldBuilder = SchemaBuilder.int64(); break; // Numeric decimal numbers case Types.REAL: // values are single precision floating point number which supports 7 digits of mantissa. - builder.field(columnName, optional ? Schema.OPTIONAL_FLOAT32_SCHEMA : Schema.FLOAT32_SCHEMA); + fieldBuilder = SchemaBuilder.float32(); break; case Types.FLOAT: case Types.DOUBLE: // values are double precision floating point number which supports 15 digits of mantissa. - builder.field(columnName, optional ? Schema.OPTIONAL_FLOAT64_SCHEMA : Schema.OPTIONAL_FLOAT64_SCHEMA); + fieldBuilder = SchemaBuilder.float64(); break; case Types.NUMERIC: case Types.DECIMAL: // values are fixed-precision decimal values with exact precision. 
// Use Kafka Connect's arbitrary precision decimal type and use the column's specified scale ... - SchemaBuilder decBuilder = Decimal.builder(columnScale); - if (optional) decBuilder.optional(); - builder.field(columnName, decBuilder.build()); + fieldBuilder = Decimal.builder(columnScale); break; - // String values - case Types.CHAR: // variable-length - case Types.VARCHAR: // variable-length - case Types.LONGVARCHAR: // variable-length - case Types.CLOB: // variable-length - case Types.NCHAR: // fixed-length - case Types.NVARCHAR: // fixed-length - case Types.LONGNVARCHAR: // fixed-length - case Types.NCLOB: // fixed-length + // Fixed-length string values + case Types.CHAR: + case Types.NCHAR: + case Types.NVARCHAR: + case Types.LONGNVARCHAR: + case Types.NCLOB: + fieldBuilder = SchemaBuilder.string(); + break; + + // Variable-length string values + case Types.VARCHAR: + case Types.LONGVARCHAR: + case Types.CLOB: case Types.DATALINK: case Types.SQLXML: - builder.field(columnName, optional ? 
Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA); + fieldBuilder = SchemaBuilder.string(); break; // Date and time values case Types.DATE: - SchemaBuilder dateBuilder = Date.builder(); - if (optional) dateBuilder.optional(); - builder.field(columnName, dateBuilder.build()); + fieldBuilder = Date.builder(); break; case Types.TIME: - SchemaBuilder timeBuilder = Time.builder(); - if (optional) timeBuilder.optional(); - builder.field(columnName, timeBuilder.build()); + fieldBuilder = Time.builder(); break; case Types.TIMESTAMP: - SchemaBuilder timestampBuilder = Timestamp.builder(); - if (optional) timestampBuilder.optional(); - builder.field(columnName, timestampBuilder.build()); + fieldBuilder = Timestamp.builder(); break; case Types.TIME_WITH_TIMEZONE: - SchemaBuilder offsetTimeBuilder = IsoTime.builder(); - if (optional) offsetTimeBuilder.optional(); - builder.field(columnName, offsetTimeBuilder.build()); + fieldBuilder = IsoTime.builder(); break; case Types.TIMESTAMP_WITH_TIMEZONE: - SchemaBuilder tsWithTzBuilder = IsoTimestamp.builder(); - if (optional) tsWithTzBuilder.optional(); - builder.field(columnName, tsWithTzBuilder.build()); + fieldBuilder = IsoTimestamp.builder(); break; // Other types ... case Types.ROWID: // often treated as a string, but we'll generalize and treat it as a byte array - builder.field(columnName, optional ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA); + fieldBuilder = SchemaBuilder.bytes(); break; // Unhandled types @@ -436,29 +439,39 @@ protected void addField(SchemaBuilder builder, String columnName, int jdbcType, case Types.REF_CURSOR: case Types.STRUCT: default: - addOtherField(builder, columnName, jdbcType, typeName, columnLength, columnScale, optional); + fieldBuilder = addOtherField(columnName, jdbcType, typeName, columnLength, columnScale, optional, mapper); break; } + if (fieldBuilder != null) { + if (mapper != null) { + // Let the mapper add properties to the schema ... 
+ mapper.alterFieldSchema(fieldBuilder); + } + if (optional) fieldBuilder.optional(); + parentBuilder.field(columnName, fieldBuilder.build()); + } } /** - * Add to the supplied {@link SchemaBuilder} a field for the column with the given information. + * Return a {@link SchemaBuilder} for a field for the column with the given information. *

* Subclasses that wish to override or extend the mappings of JDBC/DBMS types to Kafka Connect value types can override * this method and delegate to this method before and/or after the custom logic. Similar behavior should be addressed - * in a specialized {@link #addField(SchemaBuilder, String, int, String, int, int, boolean)} as well. + * in a specialized {@link #addField(SchemaBuilder, String, int, String, int, int, boolean, ColumnMapper)} as well. * - * @param builder the schema builder; never null * @param columnName the name of the column * @param jdbcType the column's {@link Types JDBC type} * @param typeName the column's DBMS-specific type name * @param columnLength the length of the column * @param columnScale the scale of the column values, or 0 if not a decimal value * @param optional {@code true} if the column is optional, or {@code false} if the column is known to always have a value + * @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values + * @return the {@link SchemaBuilder} for the new field, ready to be {@link SchemaBuilder#build() build}; may be null */ - protected void addOtherField(SchemaBuilder builder, String columnName, int jdbcType, String typeName, int columnLength, - int columnScale, boolean optional) { + protected SchemaBuilder addOtherField(String columnName, int jdbcType, String typeName, int columnLength, + int columnScale, boolean optional, ColumnMapper mapper) { LOGGER.warn("Unexpected JDBC type: {}", jdbcType); + return null; } /** @@ -856,8 +869,13 @@ protected Object convertDate(Field fieldDefn, Object data) { java.util.Date date = null; if (data instanceof java.sql.Date) { // JDBC specification indicates that this will be the nominal object for this JDBC type. - // Contains only date info, with all time values set to all zeros (e.g. midnight) - date = (java.sql.Date) data; + // Contains only date info, with all time values set to all zeros (e.g. midnight). 
+ // However, the java.sql.Date object *may* contain timezone information for some DBMS+Driver combinations. + // Therefore, first convert it to a local LocalDate, then to a LocalDateTime at midnight, and then to an + // instant in UTC ... + java.sql.Date sqlDate = (java.sql.Date) data; + LocalDate localDate = sqlDate.toLocalDate(); + date = java.util.Date.from(localDate.atStartOfDay().toInstant(ZoneOffset.UTC)); } else if (data instanceof java.util.Date) { // Possible that some implementations might use this. We should be prepared to ignore any time, // information by truncating to days and creating a new java.util.Date ... diff --git a/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMapper.java b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMapper.java new file mode 100644 index 000000000..b08e26e04 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMapper.java @@ -0,0 +1,46 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.mapping; + +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.SchemaBuilder; + +import io.debezium.config.Configuration; +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; + +/** + * A factory for a function used to map values of a column. + * + * @author Randall Hauch + */ +@FunctionalInterface +public interface ColumnMapper { + + /** + * Initialize the ColumnMapper instance based upon the connector's configuration. + * @param config the connector's configuration + */ + default void initialize( Configuration config ) { + // do nothing + } + + /** + * Create for the given column a function that maps values. 
+ * + * @param column the column description; never null + * @return the function that converts the value; may be null + */ + ValueConverter create(Column column); + + /** + * Optionally annotate the schema with properties to better capture the mapping behavior. + * @param schemaBuilder the builder for the {@link Field}'s schema; never null + */ + default void alterFieldSchema( SchemaBuilder schemaBuilder) { + // do nothing + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/ColumnMappers.java b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java similarity index 71% rename from debezium-core/src/main/java/io/debezium/relational/ColumnMappers.java rename to debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java index 93bf13b0b..c9d091f51 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ColumnMappers.java +++ b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java @@ -3,9 +3,8 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.relational; +package io.debezium.relational.mapping; -import java.sql.Types; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -14,7 +13,14 @@ import org.apache.kafka.connect.errors.ConnectException; import io.debezium.annotation.Immutable; +import io.debezium.config.Configuration; import io.debezium.function.Predicates; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnId; +import io.debezium.relational.Selectors; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.ValueConverter; import io.debezium.util.Strings; /** @@ -68,7 +74,21 @@ public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) { * @return this object so that methods can be chained together; never null */ public Builder map(String 
fullyQualifiedColumnNames, Class mapperClass) { - return map(fullyQualifiedColumnNames, instantiateMapper(mapperClass)); + return map(fullyQualifiedColumnNames,mapperClass,null); + } + + /** + * Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular + * expression patterns. + * + * @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names; may not be null + * @param mapperClass the Java class that implements {@code BiFunction} and that + * will be used to map actual values into values used in the output record; may not be null + * @param config the configuration to pass to the {@link ColumnMapper} instance; may be null + * @return this object so that methods can be chained together; never null + */ + public Builder map(String fullyQualifiedColumnNames, Class mapperClass, Configuration config) { + return map(fullyQualifiedColumnNames, instantiateMapper(mapperClass, config)); } /** @@ -80,15 +100,7 @@ public Builder map(String fullyQualifiedColumnNames, Class mapperC * @return this object so that methods can be chained together; never null */ public Builder truncateStrings(String fullyQualifiedColumnNames, int maxLength) { - return map(fullyQualifiedColumnNames, (column) -> { - return (value) -> { - if (value instanceof String) { - String str = (String) value; - if (str.length() > maxLength) return str.substring(0, maxLength); - } - return value; - }; - }); + return map(fullyQualifiedColumnNames, new TruncateStrings(maxLength)); } /** @@ -125,22 +137,7 @@ public Builder maskStrings(String fullyQualifiedColumnNames, int numberOfChars, * @return this object so that methods can be chained together; never null */ public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) { - return map(fullyQualifiedColumnNames, (column) -> { - switch (column.jdbcType()) { - case Types.CHAR: // variable-length - case Types.VARCHAR: // variable-length - case 
Types.LONGVARCHAR: // variable-length - case Types.CLOB: // variable-length - case Types.NCHAR: // fixed-length - case Types.NVARCHAR: // fixed-length - case Types.LONGNVARCHAR: // fixed-length - case Types.NCLOB: // fixed-length - case Types.DATALINK: - return (input) -> maskValue; - default: - return (input) -> input; - } - }); + return map(fullyQualifiedColumnNames, new MaskStrings(maskValue)); } /** @@ -153,8 +150,23 @@ public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) { * an existing mapping function should be removed * @return this object so that methods can be chained together; never null */ - @SuppressWarnings("unchecked") public Builder map(String fullyQualifiedColumnNames, String mapperClassName) { + return map(fullyQualifiedColumnNames,mapperClassName,null); + } + + /** + * Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular + * expression patterns. + * + * @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names; may not be null + * @param mapperClassName the name of the Java class that implements {@code BiFunction} and that + * will be used to map actual values into values used in the output record; null if + * an existing mapping function should be removed + * @param config the configuration to pass to the {@link ColumnMapper} instance; may be null + * @return this object so that methods can be chained together; never null + */ + @SuppressWarnings("unchecked") + public Builder map(String fullyQualifiedColumnNames, String mapperClassName, Configuration config) { Class mapperClass = null; if (mapperClassName != null) { try { @@ -167,7 +179,7 @@ public Builder map(String fullyQualifiedColumnNames, String mapperClassName) { e); } } - return map(fullyQualifiedColumnNames, mapperClass); + return map(fullyQualifiedColumnNames, mapperClass, config); } /** @@ -194,8 +206,8 @@ private ColumnMappers(List rules) { * @param column the 
column; may not be null * @return the mapping function, or null if there is no mapping function */ - public ValueConverter mapperFor(Table table, Column column) { - return mapperFor(table.id(),column); + public ValueConverter mappingConverterFor(Table table, Column column) { + return mappingConverterFor(table.id(), column); } /** @@ -205,11 +217,23 @@ public ValueConverter mapperFor(Table table, Column column) { * @param column the column; may not be null * @return the mapping function, or null if there is no mapping function */ - public ValueConverter mapperFor(TableId tableId, Column column) { + public ValueConverter mappingConverterFor(TableId tableId, Column column) { + ColumnMapper mapper = mapperFor(tableId,column); + return mapper != null ? mapper.create(column) : null; + } + + /** + * Get the value mapping function for the given column. + * + * @param tableId the identifier of the table to which the column belongs; may not be null + * @param column the column; may not be null + * @return the mapping function, or null if there is no mapping function + */ + public ColumnMapper mapperFor(TableId tableId, Column column) { ColumnId id = new ColumnId(tableId, column.name()); Optional matchingRule = rules.stream().filter(rule -> rule.matches(id)).findFirst(); if (matchingRule.isPresent()) { - return matchingRule.get().mapper.create(column); + return matchingRule.get().mapper; } return null; } @@ -229,13 +253,19 @@ protected boolean matches(ColumnId id) { } } - protected static T instantiateMapper(Class clazz) { + protected static ColumnMapper instantiateMapper(Class clazz, Configuration config) { try { - return clazz.newInstance(); + ColumnMapper mapper = clazz.newInstance(); + if ( config != null ) { + mapper.initialize(config); + } + return mapper; } catch (InstantiationException e) { throw new ConnectException("Unable to instantiate column mapper class " + clazz.getName() + ": " + e.getMessage(), e); } catch (IllegalAccessException e) { throw new 
ConnectException("Unable to access column mapper class " + clazz.getName() + ": " + e.getMessage(), e); + } catch (Throwable e) { + throw new ConnectException("Unable to initialize the column mapper class " + clazz.getName() + ": " + e.getMessage(), e); } } } diff --git a/debezium-core/src/main/java/io/debezium/relational/mapping/MaskStrings.java b/debezium-core/src/main/java/io/debezium/relational/mapping/MaskStrings.java new file mode 100644 index 000000000..32d33530a --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/mapping/MaskStrings.java @@ -0,0 +1,73 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.mapping; + +import java.sql.Types; + +import org.apache.kafka.connect.data.SchemaBuilder; + +import io.debezium.annotation.Immutable; +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; + +/** + * A {@link ColumnMapper} implementation that ensures that string values are masked by a predefined value. + * + * @author Randall Hauch + */ +public class MaskStrings implements ColumnMapper { + + private final MaskingValueConverter converter; + + /** + * Create a {@link ColumnMapper} that masks string values with a predefined value. 
+ * + * @param maskValue the value that should be used in place of the actual value; may not be null + * @throws IllegalArgumentException if the {@code maskValue} is null + */ + public MaskStrings(String maskValue) { + if (maskValue == null) throw new IllegalArgumentException("Mask value may not be null"); + this.converter = new MaskingValueConverter(maskValue); + } + + @Override + public ValueConverter create(Column column) { + switch (column.jdbcType()) { + case Types.CHAR: // fixed-length + case Types.VARCHAR: // variable-length + case Types.LONGVARCHAR: // variable-length + case Types.CLOB: // variable-length + case Types.NCHAR: // fixed-length + case Types.NVARCHAR: // variable-length + case Types.LONGNVARCHAR: // variable-length + case Types.NCLOB: // variable-length + case Types.DATALINK: + return converter; + default: + return ValueConverter.passthrough(); + } + } + + @Override + public void alterFieldSchema(SchemaBuilder schemaBuilder) { + schemaBuilder.parameter("masked", "true"); + } + + @Immutable + protected static final class MaskingValueConverter implements ValueConverter { + protected final String maskValue; + + public MaskingValueConverter(String maskValue) { + this.maskValue = maskValue; + assert this.maskValue != null; + } + + @Override + public Object convert(Object value) { + return maskValue; + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/mapping/TruncateStrings.java b/debezium-core/src/main/java/io/debezium/relational/mapping/TruncateStrings.java new file mode 100644 index 000000000..e7d551541 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/mapping/TruncateStrings.java @@ -0,0 +1,77 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.mapping; + +import org.apache.kafka.connect.data.SchemaBuilder; + +import io.debezium.annotation.Immutable; +import io.debezium.config.Configuration; +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; + +/** + * A {@link ColumnMapper} implementation that ensures that string values longer than a specified length will be truncated. + * + * @author Randall Hauch + */ +public class TruncateStrings implements ColumnMapper { + + private final TruncatingValueConverter converter; + + /** + * Create a {@link ColumnMapper} that truncates string values to a maximum length. + * + * @param maxLength the maximum number of characters allowed in values + * @throws IllegalArgumentException if the {@code maxLength} is not positive + */ + public TruncateStrings(int maxLength) { + if (maxLength <= 0) throw new IllegalArgumentException("Maximum length must be positive"); + this.converter = new TruncatingValueConverter(maxLength); + } + + @Override + public ValueConverter create(Column column) { + return converter; + } + + @Override + public void alterFieldSchema(SchemaBuilder schemaBuilder) { + Configuration params = Configuration.from(schemaBuilder.parameters()); + Integer length = params.getInteger("length"); + if ( length != null && converter.maxLength < length ) { + // Overwrite the parameter ... + schemaBuilder.parameter("length",Integer.toString(converter.maxLength)); + } + Integer maxLength = params.getInteger("maxLength"); + if ( maxLength != null && converter.maxLength < maxLength ) { + // Overwrite the parameter ... 
+ schemaBuilder.parameter("maxLength",Integer.toString(converter.maxLength)); + } + if ( maxLength == null && length == null ) { + schemaBuilder.parameter("maxLength",Integer.toString(converter.maxLength)); + } + } + + @Immutable + protected static final class TruncatingValueConverter implements ValueConverter { + protected final int maxLength; + + public TruncatingValueConverter(int maxLength) { + this.maxLength = maxLength; + assert this.maxLength > 0; + } + + @Override + public Object convert(Object value) { + if (value instanceof String) { + String str = (String) value; + if (str.length() > maxLength) return str.substring(0, maxLength); + } + return value; + } + } + +} diff --git a/debezium-core/src/test/java/io/debezium/relational/ColumnMappersTest.java b/debezium-core/src/test/java/io/debezium/relational/mapping/ColumnMappersTest.java similarity index 89% rename from debezium-core/src/test/java/io/debezium/relational/ColumnMappersTest.java rename to debezium-core/src/test/java/io/debezium/relational/mapping/ColumnMappersTest.java index 44e1064ae..3371489d6 100644 --- a/debezium-core/src/test/java/io/debezium/relational/ColumnMappersTest.java +++ b/debezium-core/src/test/java/io/debezium/relational/mapping/ColumnMappersTest.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.relational; +package io.debezium.relational.mapping; import java.sql.Types; @@ -12,6 +12,9 @@ import static org.fest.assertions.Assertions.assertThat; +import io.debezium.relational.Column; +import io.debezium.relational.TableId; +import io.debezium.relational.ValueConverter; import io.debezium.util.Strings; /** @@ -39,14 +42,14 @@ public void beforeEach() { @Test public void shouldNotFindMapperForUnmatchedColumn() { mappers = ColumnMappers.create().truncateStrings(fullyQualifiedNames, 10).build(); - converter = mappers.mapperFor(tableId, column2); + converter = 
mappers.mappingConverterFor(tableId, column2); assertThat(converter).isNull(); } @Test public void shouldTruncateStrings() { mappers = ColumnMappers.create().truncateStrings(fullyQualifiedNames.toUpperCase(), 10).build(); // inexact case - converter = mappers.mapperFor(tableId, column); + converter = mappers.mappingConverterFor(tableId, column); assertThat(converter).isNotNull(); assertThat(converter.convert("12345678901234567890").toString()).isEqualTo("1234567890"); assertThat(converter.convert("12345678901234567890").toString().length()).isEqualTo(10); @@ -63,7 +66,7 @@ public void shouldTruncateStrings() { public void shouldMaskStringsToFixedLength() { String maskValue = "**********"; mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue.length()).build(); // exact case - converter = mappers.mapperFor(tableId, column); + converter = mappers.mappingConverterFor(tableId, column); assertThat(converter).isNotNull(); assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue); assertThat(converter.convert("12345678901")).isEqualTo(maskValue); @@ -77,7 +80,7 @@ public void shouldMaskStringsToFixedNumberOfSpecifiedCharacters() { char maskChar = '='; String maskValue = Strings.createString(maskChar, 10); mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue.length(), maskChar).build(); - converter = mappers.mapperFor(tableId, column); + converter = mappers.mappingConverterFor(tableId, column); assertThat(converter).isNotNull(); assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue); assertThat(converter.convert("12345678901")).isEqualTo(maskValue); @@ -90,7 +93,7 @@ public void shouldMaskStringsToFixedNumberOfSpecifiedCharacters() { public void shouldMaskStringsWithSpecificValue() { String maskValue = "*-*-*-*-*"; mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue).build(); // exact case - converter = mappers.mapperFor(tableId, column); + converter = 
mappers.mappingConverterFor(tableId, column); assertThat(converter).isNotNull(); assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue); assertThat(converter.convert("12345678901")).isEqualTo(maskValue); @@ -103,7 +106,7 @@ public void shouldMaskStringsWithSpecificValue() { public void shouldMapValuesUsingColumnMapperInstance() { RepeatingColumnMapper mapper = new RepeatingColumnMapper(); mappers = ColumnMappers.create().map(fullyQualifiedNames, mapper).build(); - converter = mappers.mapperFor(tableId, column); + converter = mappers.mappingConverterFor(tableId, column); assertThat(converter).isNotNull(); assertThat(converter.convert("1234")).isEqualTo("12341234"); assertThat(converter.convert("a")).isEqualTo("aa"); @@ -113,7 +116,7 @@ public void shouldMapValuesUsingColumnMapperInstance() { @Test public void shouldMapValuesUsingFunctionByClassName() { mappers = ColumnMappers.create().map(fullyQualifiedNames, RepeatingColumnMapper.class.getName()).build(); - converter = mappers.mapperFor(tableId, column); + converter = mappers.mappingConverterFor(tableId, column); assertThat(converter).isNotNull(); assertThat(converter.convert("1234")).isEqualTo("12341234"); assertThat(converter.convert("a")).isEqualTo("aa"); diff --git a/debezium-core/src/test/java/io/debezium/relational/mapping/MaskStringsTest.java b/debezium-core/src/test/java/io/debezium/relational/mapping/MaskStringsTest.java new file mode 100644 index 000000000..1cfc9bd88 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/relational/mapping/MaskStringsTest.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.mapping; + +import java.sql.Types; + +import org.junit.Test; + +import static org.fest.assertions.Assertions.assertThat; + +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; + +/** + * @author Randall Hauch + * + */ +public class MaskStringsTest { + + private final Column column = Column.editor().name("col").jdbcType(Types.VARCHAR).create(); + private ValueConverter converter; + + @Test + public void shouldTruncateStrings() { + String maskValue = "*****"; + converter = new MaskStrings(maskValue).create(column); + assertThat(converter.convert("1234567890").toString()).isEqualTo(maskValue); + assertThat(converter.convert("123456").toString()).isEqualTo(maskValue); + assertThat(converter.convert("12345").toString()).isEqualTo(maskValue); + assertThat(converter.convert("1234").toString()).isEqualTo(maskValue); + assertThat(converter.convert("123").toString()).isEqualTo(maskValue); + assertThat(converter.convert("12").toString()).isEqualTo(maskValue); + assertThat(converter.convert("1").toString()).isEqualTo(maskValue); + assertThat(converter.convert(null).toString()).isEqualTo(maskValue); + } + +} diff --git a/debezium-core/src/test/java/io/debezium/relational/mapping/TruncateStringsTest.java b/debezium-core/src/test/java/io/debezium/relational/mapping/TruncateStringsTest.java new file mode 100644 index 000000000..bd1cb0cee --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/relational/mapping/TruncateStringsTest.java @@ -0,0 +1,37 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.mapping; + +import org.junit.Test; + +import static org.fest.assertions.Assertions.assertThat; + +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; + +/** + * @author Randall Hauch + * + */ +public class TruncateStringsTest { + + private final Column column = Column.editor().name("col").create(); + private ValueConverter converter; + + @Test + public void shouldTruncateStrings() { + converter = new TruncateStrings(5).create(column); + assertThat(converter.convert("1234567890").toString()).isEqualTo("12345"); + assertThat(converter.convert("123456").toString()).isEqualTo("12345"); + assertThat(converter.convert("12345").toString()).isEqualTo("12345"); + assertThat(converter.convert("1234").toString()).isEqualTo("1234"); + assertThat(converter.convert("123").toString()).isEqualTo("123"); + assertThat(converter.convert("12").toString()).isEqualTo("12"); + assertThat(converter.convert("1").toString()).isEqualTo("1"); + assertThat(converter.convert(null)).isNull(); + } + +} diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index caefc1443..e3f744108 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -5,6 +5,8 @@ */ package io.debezium.embedded; +import static org.junit.Assert.fail; + import java.nio.file.Path; import java.util.LinkedList; import java.util.List; @@ -20,6 +22,8 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonDeserializer; import 
org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.FileOffsetBackingStore; @@ -28,6 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + import static org.fest.assertions.Assertions.assertThat; import io.debezium.config.Configuration; @@ -56,9 +65,24 @@ public abstract class AbstractConnectorTest implements Testing { protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5); protected final Logger logger = LoggerFactory.getLogger(getClass()); private CountDownLatch latch; + private JsonConverter keyJsonConverter = new JsonConverter(); + private JsonConverter valueJsonConverter = new JsonConverter(); + private JsonDeserializer keyJsonDeserializer = new JsonDeserializer(); + private JsonDeserializer valueJsonDeserializer = new JsonDeserializer(); @Before public final void initializeConnectorTestFramework() { + keyJsonConverter = new JsonConverter(); + valueJsonConverter = new JsonConverter(); + keyJsonDeserializer = new JsonDeserializer(); + valueJsonDeserializer = new JsonDeserializer(); + Configuration converterConfig = Configuration.create().build(); + Configuration deserializerConfig = Configuration.create().build(); + keyJsonConverter.configure(converterConfig.asMap(), true); + valueJsonConverter.configure(converterConfig.asMap(), false); + keyJsonDeserializer.configure(deserializerConfig.asMap(), true); + valueJsonDeserializer.configure(deserializerConfig.asMap(), false); + resetBeforeEachTest(); consumedLines = new ArrayBlockingQueue<>(getMaximumEnqueuedRecordCount()); Testing.Files.delete(OFFSET_STORE_PATH); @@ -308,6 +332,51 @@ protected void print(SourceRecord record) { Testing.print(sb.toString()); } + protected void 
printJson(SourceRecord record) { + JsonNode keyJson = null; + JsonNode valueJson = null; + try { + // First serialize and deserialize the key ... + byte[] keyBytes = keyJsonConverter.fromConnectData(record.topic(), record.keySchema(), record.key()); + keyJson = keyJsonDeserializer.deserialize(record.topic(), keyBytes); + // then the value ... + byte[] valueBytes = valueJsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); + valueJson = valueJsonDeserializer.deserialize(record.topic(), valueBytes); + // And finally get ready to print it ... + JsonNodeFactory nodeFactory = new JsonNodeFactory(false); + ObjectNode message = nodeFactory.objectNode(); + message.set("key", keyJson); + message.set("value", valueJson); + Testing.print("Message on topic '" + record.topic() + "':"); + Testing.print(prettyJson(message)); + } catch (Throwable t) { + Testing.printError(t); + Testing.print("Problem with message on topic '" + record.topic() + "':"); + if ( keyJson != null ) { + Testing.print("valid key = " + prettyJson(keyJson)); + } else { + Testing.print("invalid key"); + } + if ( valueJson != null ) { + Testing.print("valid value = " + prettyJson(valueJson)); + } else { + Testing.print("invalid value"); + } + fail(t.getMessage()); + } + } + + protected String prettyJson(JsonNode json) { + try { + return new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(json); + } catch (Throwable t) { + Testing.printError(t); + fail(t.getMessage()); + assert false : "Will not get here"; + return null; + } + } + protected void append(Object obj, StringBuilder sb) { if (obj == null) { sb.append("null");