DBZ-626 Passing schema validator as AvroValidator instance instead of Function
This commit is contained in:
parent
40a5422998
commit
cddd5fcc8a
@ -106,7 +106,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
|
||||
BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode = BigIntUnsignedHandlingMode.parse(bigIntUnsignedHandlingModeStr);
|
||||
BigIntUnsignedMode bigIntUnsignedMode = bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
|
||||
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode);
|
||||
this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate, SourceInfo.SCHEMA);
|
||||
this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator, SourceInfo.SCHEMA);
|
||||
|
||||
// Set up the server name and schema prefix ...
|
||||
if (serverName != null) serverName = serverName.trim();
|
||||
|
@ -11,7 +11,6 @@
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
@ -48,7 +47,7 @@ public class PostgresSchema {
|
||||
private final TableSchemaBuilder schemaBuilder;
|
||||
private final String schemaPrefix;
|
||||
private final Tables tables;
|
||||
private final Function<String, String> schemaNameValidator;
|
||||
private final AvroValidator schemaNameValidator;
|
||||
private final PostgresValueConverter valueConverter;
|
||||
|
||||
private Map<String, Integer> typeInfo;
|
||||
@ -67,7 +66,7 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
|
||||
|
||||
this.valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(),
|
||||
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry);
|
||||
this.schemaNameValidator = AvroValidator.create(LOGGER)::validate;
|
||||
this.schemaNameValidator = AvroValidator.create(LOGGER);
|
||||
this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameValidator, SourceInfo.SCHEMA);
|
||||
|
||||
// Set up the server name and schema prefix ...
|
||||
@ -168,7 +167,7 @@ public Table tableFor(TableId id) {
|
||||
}
|
||||
|
||||
protected String validateSchemaName(String name) {
|
||||
return this.schemaNameValidator.apply(name);
|
||||
return this.schemaNameValidator.validate(name);
|
||||
}
|
||||
|
||||
protected TableSchema schemaFor(TableId id) {
|
||||
|
@ -26,6 +26,7 @@
|
||||
import io.debezium.data.SchemaUtil;
|
||||
import io.debezium.relational.mapping.ColumnMapper;
|
||||
import io.debezium.relational.mapping.ColumnMappers;
|
||||
import io.debezium.util.AvroValidator;
|
||||
|
||||
/**
|
||||
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions.
|
||||
@ -46,7 +47,7 @@ public class TableSchemaBuilder {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TableSchemaBuilder.class);
|
||||
|
||||
private final Function<String, String> schemaNameValidator;
|
||||
private final AvroValidator schemaNameValidator;
|
||||
private final ValueConverterProvider valueConverterProvider;
|
||||
private final Schema sourceInfoSchema;
|
||||
|
||||
@ -55,9 +56,9 @@ public class TableSchemaBuilder {
|
||||
*
|
||||
* @param valueConverterProvider the provider for obtaining {@link ValueConverter}s and {@link SchemaBuilder}s; may not be
|
||||
* null
|
||||
* @param schemaNameValidator the validation function for schema names; may not be null
|
||||
* @param schemaNameValidator the validator for schema names; may not be null
|
||||
*/
|
||||
public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, Function<String, String> schemaNameValidator, Schema sourceInfoSchema) {
|
||||
public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, AvroValidator schemaNameValidator, Schema sourceInfoSchema) {
|
||||
this.schemaNameValidator = schemaNameValidator;
|
||||
this.valueConverterProvider = valueConverterProvider;
|
||||
this.sourceInfoSchema = sourceInfoSchema;
|
||||
@ -87,8 +88,8 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t
|
||||
final String tableIdStr = tableId.toString();
|
||||
final String schemaNamePrefix = schemaPrefix + tableIdStr;
|
||||
LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix);
|
||||
SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Value"));
|
||||
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.apply(schemaNamePrefix + ".Key"));
|
||||
SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.validate(schemaNamePrefix + ".Value"));
|
||||
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameValidator.validate(schemaNamePrefix + ".Key"));
|
||||
AtomicBoolean hasPrimaryKey = new AtomicBoolean(false);
|
||||
table.columns().forEach(column -> {
|
||||
if (table.isPrimaryKeyColumn(column.name())) {
|
||||
@ -111,7 +112,7 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t
|
||||
}
|
||||
|
||||
Envelope envelope = Envelope.defineSchema()
|
||||
.withName(schemaNameValidator.apply(envelopSchemaName))
|
||||
.withName(schemaNameValidator.validate(envelopSchemaName))
|
||||
.withRecord(valSchema)
|
||||
.withSource(sourceInfoSchema)
|
||||
.build();
|
||||
|
@ -95,13 +95,13 @@ public void checkPreconditions() {
|
||||
|
||||
@Test(expected = NullPointerException.class)
|
||||
public void shouldFailToBuildTableSchemaFromNullTable() {
|
||||
new TableSchemaBuilder(new JdbcValueConverters(), validator::validate, SchemaBuilder.struct().build())
|
||||
new TableSchemaBuilder(new JdbcValueConverters(), validator, SchemaBuilder.struct().build())
|
||||
.create(prefix, "sometopic", null, null, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldBuildTableSchemaFromTable() {
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), validator::validate, SchemaBuilder.struct().build())
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), validator, SchemaBuilder.struct().build())
|
||||
.create(prefix, "sometopic", table, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
}
|
||||
@ -109,7 +109,7 @@ public void shouldBuildTableSchemaFromTable() {
|
||||
@Test
|
||||
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
|
||||
table = table.edit().setPrimaryKeyNames().create();
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), validator::validate, SchemaBuilder.struct().build())
|
||||
schema = new TableSchemaBuilder(new JdbcValueConverters(), validator, SchemaBuilder.struct().build())
|
||||
.create(prefix, "sometopic", table, null, null);
|
||||
assertThat(schema).isNotNull();
|
||||
// Check the keys ...
|
||||
|
Loading…
Reference in New Issue
Block a user