DBZ-3535: Introduce schema.name.adjustment.mode

Sergei Morozov 2022-03-15 19:22:21 -07:00 committed by Gunnar Morling
parent 3a03ff0186
commit b258674782
40 changed files with 371 additions and 61 deletions

View File

@ -21,7 +21,7 @@ public class LegacyV1MongoDbSourceInfoStructMaker extends LegacyV1AbstractSource
public LegacyV1MongoDbSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name(SchemaNameAdjuster.defaultAdjuster().adjust("io.debezium.connector.mongo.Source"))
.name(SchemaNameAdjuster.avroAdjuster().adjust("io.debezium.connector.mongo.Source"))
.version(SourceInfo.SCHEMA_VERSION)
.field(SourceInfo.SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.field(SourceInfo.REPLICA_SET_NAME, Schema.STRING_SCHEMA)

View File

@ -614,7 +614,8 @@ public boolean isFullUpdate() {
.connector(
MAX_COPY_THREADS,
SNAPSHOT_MODE,
CAPTURE_MODE)
CAPTURE_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE)
.create();
/**

View File

@ -71,13 +71,13 @@ public String version() {
@Override
public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> start(Configuration config) {
final MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
this.taskName = "task" + config.getInteger(MongoDbConnectorConfig.TASK_ID);
this.taskContext = new MongoDbTaskContext(config);
final Schema structSchema = connectorConfig.getSourceInfoStructMaker().schema();
this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(), structSchema);
this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(), structSchema, schemaNameAdjuster);
final ReplicaSets replicaSets = getReplicaSets(config);
final MongoDbOffsetContext previousOffset = getPreviousOffset(connectorConfig, replicaSets);

View File

@ -55,14 +55,16 @@ public class MongoDbSchema implements DatabaseSchema<CollectionId> {
private final Filters filters;
private final TopicSelector<CollectionId> topicSelector;
private final Schema sourceSchema;
private final SchemaNameAdjuster adjuster = SchemaNameAdjuster.create();
private final SchemaNameAdjuster adjuster;
private final ConcurrentMap<CollectionId, MongoDbCollectionSchema> collections = new ConcurrentHashMap<>();
private final JsonSerialization serialization = new JsonSerialization();
public MongoDbSchema(Filters filters, TopicSelector<CollectionId> topicSelector, Schema sourceSchema) {
public MongoDbSchema(Filters filters, TopicSelector<CollectionId> topicSelector, Schema sourceSchema,
SchemaNameAdjuster schemaNameAdjuster) {
this.filters = filters;
this.topicSelector = topicSelector;
this.sourceSchema = sourceSchema;
this.adjuster = schemaNameAdjuster;
}
@Override

View File

@ -10,7 +10,6 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;
import io.debezium.util.SchemaNameAdjuster;
public class MongoDbSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {
@ -19,7 +18,7 @@ public class MongoDbSourceInfoStructMaker extends AbstractSourceInfoStructMaker<
public MongoDbSourceInfoStructMaker(String connector, String version, CommonConnectorConfig connectorConfig) {
super(connector, version, connectorConfig);
schema = commonSchemaBuilder()
.name(SchemaNameAdjuster.defaultAdjuster().adjust("io.debezium.connector.mongo.Source"))
.name(connectorConfig.schemaNameAdjustmentMode().createAdjuster().adjust("io.debezium.connector.mongo.Source"))
.field(SourceInfo.REPLICA_SET_NAME, Schema.STRING_SCHEMA)
.field(SourceInfo.COLLECTION, Schema.STRING_SCHEMA)
.field(SourceInfo.ORDER, Schema.INT32_SCHEMA)

View File

@ -44,6 +44,8 @@ public void shouldAlwaysProduceCollectionSchema() {
private static MongoDbSchema getSchema(Configuration config, MongoDbTaskContext taskContext) {
final MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
return new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(), connectorConfig.getSourceInfoStructMaker().schema());
return new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(),
connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.schemaNameAdjustmentMode().createAdjuster());
}
}

View File

@ -987,6 +987,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
TIME_PRECISION_MODE,
ENABLE_TIME_ADJUSTER,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
ROW_COUNT_FOR_STREAMING_RESULT_SETS,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES)

View File

@ -68,7 +68,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
.with(AbstractDatabaseHistory.INTERNAL_PREFER_DDL, true)
.build());
final TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
final MySqlValueConverters valueConverters = getValueConverters(connectorConfig);
// DBZ-3238: automatically set "useCursorFetch" to true when a snapshot fetch size other than the default of -1 is given
@ -159,7 +159,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
default:
break;
}
});
}, schemaNameAdjuster);
}
final EventDispatcher<MySqlPartition, TableId> dispatcher = new EventDispatcher<>(
@ -172,8 +172,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
null,
metadataProvider,
heartbeat,
schemaNameAdjuster
);
schemaNameAdjuster);
final MySqlStreamingChangeEventSourceMetrics streamingMetrics = new MySqlStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider);

View File

@ -86,6 +86,7 @@
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
@ -317,7 +318,8 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
// Set up for JMX ...
metrics = new BinlogReaderMetrics(client, context, name, changeEventQueueMetrics);
heartbeat = Heartbeat.create(context.getConnectorConfig().getHeartbeatInterval(),
context.topicSelector().getHeartbeatTopic(), context.getConnectorConfig().getLogicalName());
context.topicSelector().getHeartbeatTopic(), context.getConnectorConfig().getLogicalName(),
SchemaNameAdjuster.create());
}
@Override

View File

@ -51,6 +51,7 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
@ -741,7 +742,8 @@ protected void execute(MySqlPartition partition) {
.create(
context.getConnectorConfig().getHeartbeatInterval(),
context.topicSelector().getHeartbeatTopic(),
context.getConnectorConfig().getLogicalName())
context.getConnectorConfig().getLogicalName(),
SchemaNameAdjuster.create())
.forcedBeat(source.partition(), source.offset(), this::enqueueRecord);
}
finally {

View File

@ -79,7 +79,7 @@ public void beforeEach() {
tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
SchemaNameAdjuster.NO_OP, new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
}
@Test

View File

@ -57,7 +57,7 @@ public void beforeEach() {
tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
SchemaNameAdjuster.NO_OP, new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
}
@ -195,7 +195,7 @@ public void parseUnsignedBigIntDefaultValueToBigDecimal() {
final TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
SchemaNameAdjuster.NO_OP, new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
String sql = "CREATE TABLE UNSIGNED_BIGINT_TABLE (\n" +
" A BIGINT UNSIGNED NULL DEFAULT 0,\n" +
@ -352,7 +352,7 @@ public void parseNumericAndDecimalToDecimalDefaultValue() {
final TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
converters,
new MySqlDefaultValueConverter(converters),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
SchemaNameAdjuster.NO_OP, new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
String sql = "CREATE TABLE NUMERIC_DECIMAL_TABLE (\n" +
" A NUMERIC NOT NULL DEFAULT 1.23,\n" +
" B DECIMAL(5,3) NOT NULL DEFAULT 2.321,\n" +

View File

@ -0,0 +1,85 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assume.assumeFalse;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode;
import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
public class MySqlSchemaNameAdjustmentModeIT extends AbstractConnectorTest {
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-json.txt")
.toAbsolutePath();
private final UniqueDatabase DATABASE = new UniqueDatabase("adjustment1", "schema_name_adjustment")
.withDbHistoryPath(DB_HISTORY_PATH);
@Before
public void beforeEach() throws SQLException {
stopConnector();
DATABASE.createAndInitialize();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
public void afterEach() {
try {
stopConnector();
}
finally {
Testing.Files.delete(DB_HISTORY_PATH);
}
}
@Test
public void shouldAdjustNamesForAvro() throws InterruptedException {
Struct data = consume(SchemaNameAdjustmentMode.AVRO);
assertThat(data.schema().name()).contains("name_adjustment");
}
@Test
public void shouldNotAdjustNames() throws InterruptedException {
assumeFalse(MySqlConnector.LEGACY_IMPLEMENTATION.equals(System.getProperty(MySqlConnector.IMPLEMENTATION_PROP)));
skipAvroValidation();
Struct data = consume(SchemaNameAdjustmentMode.NONE);
assertThat(data.schema().name()).contains("name-adjustment");
}
private Struct consume(SchemaNameAdjustmentMode adjustmentMode) throws InterruptedException {
final Configuration config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("name-adjustment"))
.with(MySqlConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, adjustmentMode)
.build();
start(MySqlConnector.class, config);
SourceRecords records = consumeRecordsByTopic(6 + 1); // 6 DDL changes, 1 INSERT
final List<SourceRecord> results = records.recordsForTopic(DATABASE.topicForTable("name-adjustment"));
Assertions.assertThat(results).hasSize(1);
return (Struct) results.get(0).value();
}
}

View File

@ -0,0 +1,2 @@
CREATE TABLE `name-adjustment` (id INT);
INSERT INTO `name-adjustment` (id) VALUES (1);

View File

@ -509,6 +509,7 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_SCN_GAP_DETECTION_TIME_INTERVAL_MAX_MS,
UNAVAILABLE_VALUE_PLACEHOLDER,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
LOG_MINING_LOG_QUERY_MAX_RETRIES,
LOG_MINING_LOG_BACKOFF_INITIAL_DELAY_MS,
LOG_MINING_LOG_BACKOFF_MAX_DELAY_MS)

View File

@ -50,7 +50,7 @@ public String version() {
public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(Configuration config) {
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();
jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());

View File

@ -290,7 +290,7 @@ private OracleDatabaseSchema createSchema(OracleConnectorConfig connectorConfig)
TableNameCaseSensitivity tableNameSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(connection);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
return new OracleDatabaseSchema(connectorConfig, converters, defaultValueConverter, schemaNameAdjuster, topicSelector, tableNameSensitivity);
}

View File

@ -271,7 +271,7 @@ public void testAbandonTransactionHavingAnotherOne() throws Exception {
private OracleDatabaseSchema createOracleDatabaseSchema() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(getConfig().build());
final TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
final OracleValueConverters converters = new OracleValueConverters(connectorConfig, connection);
final OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(converters, connection);
final TableNameCaseSensitivity sensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(connection);

View File

@ -65,7 +65,7 @@ public class LogicalDecodingMessageMonitor {
private final Schema valueSchema;
public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, BlockingConsumer<SourceRecord> sender) {
this.schemaNameAdjuster = SchemaNameAdjuster.create();
this.schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
this.sender = sender;
this.topicName = connectorConfig.getLogicalName() + LOGICAL_DECODING_MESSAGE_TOPIC_SUFFIX;
this.binaryMode = connectorConfig.binaryHandlingMode();

View File

@ -1274,6 +1274,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
SNAPSHOT_MODE_CLASS,
HSTORE_HANDLING_MODE,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
INTERVAL_HANDLING_MODE,
SCHEMA_REFRESH_MODE,
TRUNCATE_HANDLING_MODE,

View File

@ -65,7 +65,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
final TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
final Snapshotter snapshotter = connectorConfig.getSnapshotter();
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
if (snapshotter == null) {
throw new ConnectException("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
@ -183,7 +183,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
default:
break;
}
});
}, schemaNameAdjuster);
final PostgresEventDispatcher<TableId> dispatcher = new PostgresEventDispatcher<>(
connectorConfig,

View File

@ -29,7 +29,6 @@
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
/**
* Component that records the schema information for the {@link PostgresConnector}. The schema information contains
@ -70,7 +69,7 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, PostgresValueConverter valueConverter,
PostgresDefaultValueConverter defaultValueConverter) {
return new TableSchemaBuilder(valueConverter, defaultValueConverter, SchemaNameAdjuster.create(),
return new TableSchemaBuilder(valueConverter, defaultValueConverter, config.schemaNameAdjustmentMode().createAdjuster(),
config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getSanitizeFieldNames(), false);
}

View File

@ -356,6 +356,7 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
SOURCE_TIMESTAMP_MODE,
MAX_TRANSACTIONS_PER_ITERATION,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES)

View File

@ -66,7 +66,7 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
final SqlServerValueConverters valueConverters = new SqlServerValueConverters(connectorConfig.getDecimalMode(),
connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode());

View File

@ -209,7 +209,7 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
connection.getDefaultValueConverter(),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
SchemaNameAdjuster.NO_OP, new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
assertColumnHasNotDefaultValue(table, "int_no_default_not_null");
assertColumnHasDefaultValue(table, "int_no_default", null, tableSchemaBuilder);
@ -379,7 +379,7 @@ public void shouldProperlyGetDefaultColumnNullValues() throws Exception {
TableSchemaBuilder tableSchemaBuilder = new TableSchemaBuilder(
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null),
connection.getDefaultValueConverter(),
SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
SchemaNameAdjuster.NO_OP, new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false);
assertColumnHasNotDefaultValue(table, "int_no_default_not_null");
assertColumnHasDefaultValue(table, "int_no_default", null, tableSchemaBuilder);

View File

@ -0,0 +1,84 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import static org.fest.assertions.Assertions.assertThat;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing.Files;
public class SqlServerSchemaNameAdjustmentModeIT extends AbstractConnectorTest {
private SqlServerConnection connection;
@Before
public void before() throws SQLException {
TestHelper.createTestDatabase();
connection = TestHelper.testConnection();
connection.execute(
"CREATE TABLE [name-adjustment] (id INT)",
"INSERT INTO [name-adjustment] (id) VALUES (1);");
TestHelper.enableTableCdc(connection, "name-adjustment");
initializeConnectorTestFramework();
Files.delete(TestHelper.DB_HISTORY_PATH);
}
@After
public void after() throws SQLException {
stopConnector();
if (connection != null) {
connection.close();
}
}
@Test
public void shouldAdjustNamesForAvro() throws InterruptedException {
Struct data = consume(SchemaNameAdjustmentMode.AVRO);
assertThat(data.schema().name()).contains("name_adjustment");
}
@Test
public void shouldNotAdjustNames() throws InterruptedException {
skipAvroValidation();
Struct data = consume(SchemaNameAdjustmentMode.NONE);
assertThat(data.schema().name()).contains("name-adjustment");
}
private Struct consume(SchemaNameAdjustmentMode adjustmentMode) throws InterruptedException {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo\\.name-adjustment")
.with(SqlServerConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, adjustmentMode)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> results = records.recordsForTopic("server1.dbo.name-adjustment");
Assertions.assertThat(results).hasSize(1);
return (Struct) results.get(0).value();
}
}

View File

@ -39,6 +39,7 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
/**
@ -242,6 +243,59 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
}
}
/**
* The set of predefined SchemaNameAdjustmentMode options
*/
public enum SchemaNameAdjustmentMode implements EnumeratedValue {
/**
* Do not adjust names
*/
NONE("none"),
/**
* Adjust names for compatibility with Avro
*/
AVRO("avro");
private final String value;
SchemaNameAdjustmentMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
public SchemaNameAdjuster createAdjuster() {
if (this == SchemaNameAdjustmentMode.AVRO) {
return SchemaNameAdjuster.create();
}
return SchemaNameAdjuster.NO_OP;
}
/**
* Determine if the supplied value is one of the predefined options
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if the match is not found
*/
public static SchemaNameAdjustmentMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (SchemaNameAdjustmentMode option : SchemaNameAdjustmentMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
}
private static final String CONFLUENT_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
private static final String APICURIO_AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter";
@ -439,6 +493,16 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
+ "'base64' represents binary data as base64-encoded string"
+ "'hex' represents binary data as hex-encoded (base16) string");
public static final Field SCHEMA_NAME_ADJUSTMENT_MODE = Field.create("schema.name.adjustment.mode")
.withDisplayName("Schema Name Adjustment")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 7))
.withEnum(SchemaNameAdjustmentMode.class, SchemaNameAdjustmentMode.AVRO)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Specify how schema names should be adjusted for compatibility with the message converter used by the connector, including:"
+ "'avro' replaces the characters that cannot be used in the Avro type name with underscore (default)"
+ "'none' does not apply any adjustment");
public static final Field QUERY_FETCH_SIZE = Field.create("query.fetch.size")
.withDisplayName("Query fetch size")
.withType(Type.INT)
@ -536,6 +600,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
private final EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode;
private final CustomConverterRegistry customConverterRegistry;
private final BinaryHandlingMode binaryHandlingMode;
private final SchemaNameAdjustmentMode schemaNameAdjustmentMode;
private final String signalingDataCollection;
private final EnumSet<Operation> skippedOperations;
private final String transactionTopic;
@ -558,6 +623,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE);
this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
this.incrementalSnapshotAllowSchemaChanges = config.getBoolean(INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES);
this.schemaNameAdjustmentMode = SchemaNameAdjustmentMode.parse(config.getString(SCHEMA_NAME_ADJUSTMENT_MODE));
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
@ -849,6 +915,10 @@ public BinaryHandlingMode binaryHandlingMode() {
return binaryHandlingMode;
}
public SchemaNameAdjustmentMode schemaNameAdjustmentMode() {
return schemaNameAdjustmentMode;
}
public String getSignalingDataCollectionId() {
return signalingDataCollection;
}
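With these changes, the new option travels from the raw configuration string to a concrete adjuster in two steps, as the constructor change above shows. A minimal sketch, assuming a Configuration instance named config as in that constructor:
// Resolve the configured mode (the Field above defaults to 'avro').
SchemaNameAdjustmentMode mode = SchemaNameAdjustmentMode.parse(config.getString(SCHEMA_NAME_ADJUSTMENT_MODE));
// AVRO yields the stateful underscore-replacing adjuster, NONE yields SchemaNameAdjuster.NO_OP.
SchemaNameAdjuster adjuster = mode.createAdjuster();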

View File

@ -17,6 +17,7 @@
import io.debezium.config.Field;
import io.debezium.function.BlockingConsumer;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.SchemaNameAdjuster;
/**
* Implementation of the heartbeat feature that allows for a DB query to be executed with every heartbeat.
@ -39,8 +40,8 @@ public class DatabaseHeartbeatImpl extends HeartbeatImpl {
private final HeartbeatErrorHandler errorHandler;
DatabaseHeartbeatImpl(Duration heartbeatInterval, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery,
HeartbeatErrorHandler errorHandler) {
super(heartbeatInterval, topicName, key);
HeartbeatErrorHandler errorHandler, SchemaNameAdjuster schemaNameAdjuster) {
super(heartbeatInterval, topicName, key, schemaNameAdjuster);
this.heartBeatActionQuery = heartBeatActionQuery;
this.jdbcConnection = jdbcConnection;

View File

@ -16,6 +16,7 @@
import io.debezium.config.Field;
import io.debezium.function.BlockingConsumer;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.SchemaNameAdjuster;
/**
* A class that is able to generate periodic heartbeat messages based on a pre-configured interval. The clients are
@ -128,20 +129,20 @@ public boolean isEnabled() {
* @param topicName topic to which the heartbeat messages will be sent
* @param key kafka partition key to use for the heartbeat message
*/
static Heartbeat create(Duration heartbeatInterval, String topicName, String key) {
return heartbeatInterval.isZero() ? DEFAULT_NOOP_HEARTBEAT : new HeartbeatImpl(heartbeatInterval, topicName, key);
static Heartbeat create(Duration heartbeatInterval, String topicName, String key, SchemaNameAdjuster schemaNameAdjuster) {
return heartbeatInterval.isZero() ? DEFAULT_NOOP_HEARTBEAT : new HeartbeatImpl(heartbeatInterval, topicName, key, schemaNameAdjuster);
}
static Heartbeat create(Duration heartbeatInterval, String heartbeatQuery, String topicName, String key, JdbcConnection jdbcConnection,
HeartbeatErrorHandler errorHandler) {
HeartbeatErrorHandler errorHandler, SchemaNameAdjuster schemaNameAdjuster) {
if (heartbeatInterval.isZero()) {
return DEFAULT_NOOP_HEARTBEAT;
}
if (heartbeatQuery != null) {
return new DatabaseHeartbeatImpl(heartbeatInterval, topicName, key, jdbcConnection, heartbeatQuery, errorHandler);
return new DatabaseHeartbeatImpl(heartbeatInterval, topicName, key, jdbcConnection, heartbeatQuery, errorHandler, schemaNameAdjuster);
}
return new HeartbeatImpl(heartbeatInterval, topicName, key);
return new HeartbeatImpl(heartbeatInterval, topicName, key, schemaNameAdjuster);
}
}
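Connector tasks are now expected to thread their configured adjuster through this factory instead of relying on a shared static one. A short sketch of the wiring, mirroring the EventDispatcher change later in this commit (connectorConfig and topicSelector are assumed to be in scope, as in the connector start() methods above):
SchemaNameAdjuster adjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
Heartbeat heartbeat = Heartbeat.create(
        connectorConfig.getHeartbeatInterval(),
        topicSelector.getHeartbeatTopic(),
        connectorConfig.getLogicalName(),
        adjuster);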

View File

@ -30,7 +30,6 @@
class HeartbeatImpl implements Heartbeat {
private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
/**
* Default length of interval in which connector generates periodically
@ -45,25 +44,30 @@ class HeartbeatImpl implements Heartbeat {
private static final String SERVER_NAME_KEY = "serverName";
private static final Schema KEY_SCHEMA = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.build();
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
.field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.build();
private final String topicName;
private final Duration heartbeatInterval;
private final String key;
private final Schema keySchema;
private final Schema valueSchema;
private volatile Timer heartbeatTimeout;
HeartbeatImpl(Duration heartbeatInterval, String topicName, String key) {
HeartbeatImpl(Duration heartbeatInterval, String topicName, String key, SchemaNameAdjuster schemaNameAdjuster) {
this.topicName = topicName;
this.key = key;
this.heartbeatInterval = heartbeatInterval;
keySchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
.field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
.build();
valueSchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
.field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
.build();
heartbeatTimeout = resetHeartbeat();
}
@ -104,7 +108,7 @@ public boolean isEnabled() {
*
*/
private Struct serverNameKey(String serverName) {
Struct result = new Struct(KEY_SCHEMA);
Struct result = new Struct(keySchema);
result.put(SERVER_NAME_KEY, serverName);
return result;
}
@ -114,7 +118,7 @@ private Struct serverNameKey(String serverName) {
*
*/
private Struct messageValue() {
Struct result = new Struct(VALUE_SCHEMA);
Struct result = new Struct(valueSchema);
result.put(AbstractSourceInfo.TIMESTAMP_KEY, Instant.now().toEpochMilli());
return result;
}
@ -127,7 +131,7 @@ private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String,
final Integer partition = 0;
return new SourceRecord(sourcePartition, sourceOffset,
topicName, partition, KEY_SCHEMA, serverNameKey(key), VALUE_SCHEMA, messageValue());
topicName, partition, keySchema, serverNameKey(key), valueSchema, messageValue());
}
private Timer resetHeartbeat() {

View File

@ -130,7 +130,8 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
heartbeat = Heartbeat.create(
connectorConfig.getHeartbeatInterval(),
topicSelector.getHeartbeatTopic(),
connectorConfig.getLogicalName());
connectorConfig.getLogicalName(),
schemaNameAdjuster);
}
schemaChangeKeySchema = SchemaBuilder.struct()

View File

@ -123,7 +123,9 @@ default ReplacementOccurred andThen(ReplacementOccurred next) {
}
}
public static final SchemaNameAdjuster DEFAULT = create();
SchemaNameAdjuster NO_OP = proposedName -> proposedName;
SchemaNameAdjuster AVRO = create();
/**
* Create a stateful Avro fullname adjuster that logs a warning the first time an invalid fullname is seen and replaced
@ -132,16 +134,14 @@ default ReplacementOccurred andThen(ReplacementOccurred next) {
*
* @return the validator; never null
*/
public static SchemaNameAdjuster defaultAdjuster() {
return DEFAULT;
public static SchemaNameAdjuster avroAdjuster() {
return AVRO;
}
/**
* Create a stateful Avro fullname adjuster that logs a warning the first time an invalid fullname is seen and replaced
* with a valid fullname, and throws an error if the replacement conflicts with that of a different original. This method
* replaces all invalid characters with the underscore character ('_').
*
* @param logger the logger to use; may not be null
* @return the validator; never null
*/
public static SchemaNameAdjuster create() {
return create((original, replacement, conflict) -> {
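For reference, the practical difference between the renamed avroAdjuster() and the new NO_OP constant, shown on a hypothetical fullname containing a character that Avro does not allow (the package and class names are made up for illustration):
SchemaNameAdjuster avro = SchemaNameAdjuster.avroAdjuster(); // stateful; replaces invalid characters with '_' and warns on the first replacement
SchemaNameAdjuster noOp = SchemaNameAdjuster.NO_OP;          // identity; returns the proposed name untouched
avro.adjust("io.debezium.example.name-adjustment.Value");    // -> "io.debezium.example.name_adjustment.Value"
noOp.adjust("io.debezium.example.name-adjustment.Value");    // -> "io.debezium.example.name-adjustment.Value"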

View File

@ -790,14 +790,16 @@ public static void isValid(SourceRecord record, boolean ignoreAvro) {
msg = "comparing value to its schema";
schemaMatchesStruct(valueWithSchema);
// Introduced due to https://github.com/confluentinc/schema-registry/issues/1693
// and https://issues.redhat.com/browse/DBZ-3535
if (ignoreAvro) {
return;
}
// Make sure the schema names are valid Avro schema names ...
validateSchemaNames(record.keySchema());
validateSchemaNames(record.valueSchema());
// Introduced due to https://github.com/confluentinc/schema-registry/issues/1693
if (ignoreAvro) {
return;
}
// Serialize and deserialize the key using the Avro converter, and check that we got the same result ...
msg = "serializing key using Avro converter";
byte[] avroKeyBytes = avroValueConverter.fromConnectData(record.topic(), record.keySchema(), record.key());

View File

@ -2155,6 +2155,13 @@ The following example sets the message key for the tables `inventory.customers`
+
In the preceding example, the columns `pk1` and `pk2` are specified as the message key for the table `inventory.customer`.
For `purchaseorders` tables in any schema, the columns `pk3` and `pk4` serve as the message key.
|[[db2-property-schema-name-adjustment-mode]]<<db2-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|avro
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `none` does not apply any adjustment. +
|===
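The same property row is added to the other connectors' documentation below. As a usage sketch, disabling the adjustment programmatically could look like the following, using the shared CommonConnectorConfig field introduced in this commit (Configuration.create() as the builder entry point is an assumption; the connector-specific connection properties are omitted):
Configuration config = Configuration.create()
        .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, SchemaNameAdjustmentMode.NONE)
        // ... connector-specific connection properties ...
        .build();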
[id="db2-advanced-configuration-properties"]

View File

@ -1488,6 +1488,13 @@ Can be used to avoid snapshot interruptions when starting multiple connectors in
The connector will read the collection contents in multiple batches of this size. +
Defaults to 0, which indicates that the server chooses an appropriate fetch size.
|[[mongodb-property-schema-name-adjustment-mode]]<<mongodb-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|avro
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `none` does not apply any adjustment. +
|===
The following _advanced_ configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector's configuration.

View File

@ -2615,6 +2615,13 @@ However, it's best to use the minimum number that are required to specify a uniq
+
`hex` represents binary data as a hex-encoded (base16) String.
|[[mysql-property-schema-name-adjustment-mode]]<<mysql-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|avro
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `none` does not apply any adjustment. +
|===
[id="mysql-advanced-connector-configuration-properties"]

View File

@ -2589,6 +2589,13 @@ Hashing strategy version 2 should be used to ensure fidelity if the value is bei
|bytes
|Specifies how binary (`blob`) columns should be represented in change events, including: `bytes` represents binary data as byte array (default), `base64` represents binary data as base64-encoded String, `hex` represents binary data as hex-encoded (base16) String
|[[oracle-property-schema-name-adjustment-mode]]<<oracle-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|avro
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `none` does not apply any adjustment. +
|[[oracle-property-decimal-handling-mode]]<<oracle-property-decimal-handling-mode, `+decimal.handling.mode+`>>
|`precise`
| Specifies how the connector should handle floating point values for `NUMBER`, `DECIMAL` and `NUMERIC` columns.

View File

@ -2876,6 +2876,13 @@ However, it's best to use the minimum number that are required to specify a uniq
+
`hex` represents binary data as hex-encoded (base16) strings.
|[[postgresql-property-schema-name-adjustment-mode]]<<postgresql-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|avro
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `none` does not apply any adjustment. +
|[[postgresql-property-truncate-handling-mode]]<<postgresql-property-truncate-handling-mode, `+truncate.handling.mode+`>>
|skip
|Specifies whether `TRUNCATE` events should be propagated or not (only available when using the `pgoutput` plug-in with Postgres 11 or later): +

View File

@ -2230,6 +2230,14 @@ However, it's best to use the minimum number that are required to specify a uniq
|[[sqlserver-property-binary-handling-mode]]<<sqlserver-property-binary-handling-mode, `+binary.handling.mode+`>>
|bytes
|Specifies how binary (`binary`, `varbinary`) columns should be represented in change events, including: `bytes` represents binary data as byte array (default), `base64` represents binary data as base64-encoded String, `hex` represents binary data as hex-encoded (base16) String
|[[sqlserver-property-schema-name-adjustment-mode]]<<sqlserver-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|avro
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `none` does not apply any adjustment. +
|===
[id="sqlserver-advanced-connector-configuration-properties"]

View File

@ -1210,6 +1210,13 @@ For example, +
`keyspaceA.table_a:regex_1;keyspaceA.table_b:regex_2;keyspaceA.table_c:regex_3` +
+
If `table_a` has an `id` column, and `regex_1` is `^i` (matches any column that starts with `i`), the connector maps the value in ``table_a``'s `id` column to a key field in change events that the connector sends to Kafka.
|[[vitess-property-schema-name-adjustment-mode]]<<vitess-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|avro
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `none` does not apply any adjustment. +
|===
[id="vitess-advanced-configuration-properties"]