DBZ-6070 DDL events not stored in schema history topic for excluded tables

This commit is contained in:
harveyyue 2023-02-09 10:32:51 +08:00 committed by Jiri Pechanec
parent 39ee498e64
commit c525f34cd0
16 changed files with 202 additions and 128 deletions

View File

@ -858,13 +858,17 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
.withImportance(Importance.LOW)
.withDescription("Switched connector to use alternative methods to deliver signals to Debezium instead of writing to signaling table");
public static final Field STORE_ONLY_CAPTURED_DATABASES_DDL = HistorizedRelationalDatabaseConnectorConfig.STORE_ONLY_CAPTURED_DATABASES_DDL
.withDefault(true);
private static final ConfigDefinition CONFIG_DEFINITION = HistorizedRelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("MySQL")
.excluding(
SCHEMA_INCLUDE_LIST,
SCHEMA_EXCLUDE_LIST,
RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
HistorizedRelationalDatabaseConnectorConfig.STORE_ONLY_CAPTURED_DATABASES_DDL)
.type(
HOSTNAME,
PORT,
@ -893,7 +897,8 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
SCHEMA_NAME_ADJUSTMENT_MODE,
ROW_COUNT_FOR_STREAMING_RESULT_SETS,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES)
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
STORE_ONLY_CAPTURED_DATABASES_DDL)
.events(
INCLUDE_SQL_QUERY,
TABLE_IGNORE_BUILTIN,
@ -969,6 +974,8 @@ public MySqlConnectorConfig(Configuration config) {
final String gtidSetExcludes = config.getString(MySqlConnectorConfig.GTID_SOURCE_EXCLUDES);
this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes)
: (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null);
this.storeOnlyCapturedDatabasesDdl = config.getBoolean(STORE_ONLY_CAPTURED_DATABASES_DDL);
}
public boolean useCursorFetch() {

View File

@ -180,7 +180,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
// - all DDLs if configured
// - or global SET variables
// - or DDLs for monitored objects
if (!schemaHistory.storeOnlyCapturedTables() || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase())
if (!storeOnlyCapturedTables() || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase())
|| schemaChange.getTables().stream().map(Table::id).anyMatch(filters.dataCollectionFilter()::isIncluded)) {
LOGGER.debug("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl());
record(schemaChange, schemaChange.getTableChanges());
@ -213,7 +213,7 @@ private List<SchemaChangeEvent> parseDdl(MySqlPartition partition, String ddlSta
this.ddlParser.parse(ddlStatements, tables());
}
catch (ParsingException | MultipleParsingExceptions e) {
if (schemaHistory.skipUnparseableDdlStatements()) {
if (skipUnparseableDdlStatements()) {
LOGGER.warn("Ignoring unparseable DDL statement '{}'", ddlStatements, e);
}
else {
@ -222,7 +222,7 @@ private List<SchemaChangeEvent> parseDdl(MySqlPartition partition, String ddlSta
}
// No need to send schema events or store DDL if no table has changed
if (!schemaHistory.storeOnlyCapturedTables() || isGlobalSetVariableStatement(ddlStatements, databaseName) || ddlChanges.anyMatch(filters)) {
if (!storeOnlyCapturedTables() || isGlobalSetVariableStatement(ddlStatements, databaseName) || ddlChanges.anyMatch(filters)) {
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
// by database. Unfortunately, the databaseName on the event might not be the same database as that
@ -351,11 +351,6 @@ public boolean historyExists() {
return schemaHistory.exists();
}
@Override
public boolean storeOnlyCapturedTables() {
return schemaHistory.storeOnlyCapturedTables();
}
/**
* Assign the given table number to the table with the specified {@link TableId table ID}.
*
@ -378,7 +373,7 @@ public boolean assignTableNumber(long tableNumber, TableId id) {
/**
* Return the table id associated with MySQL-specific table number.
*
* @param tableNumber
* @param tableNumber the table number from binlog
* @return the table id or null if not known
*/
public TableId getTableId(long tableNumber) {
@ -388,7 +383,7 @@ public TableId getTableId(long tableNumber) {
/**
* Return the excluded table id associated with MySQL-specific table number.
*
* @param tableNumber
* @param tableNumber the table number from binlog
* @return the table id or null if not known
*/
public TableId getExcludeTableId(long tableNumber) {
@ -415,8 +410,9 @@ public boolean isStorageInitializationExecuted() {
return storageInitializationExecuted;
}
@Override
public boolean skipSchemaChangeEvent(SchemaChangeEvent event) {
if (!Strings.isNullOrEmpty(event.getDatabase())
if (storeOnlyCapturedDatabases() && !Strings.isNullOrEmpty(event.getDatabase())
&& !connectorConfig.getTableFilters().databaseFilter().test(event.getDatabase())) {
LOGGER.debug("Skipping schema event as it belongs to a non-captured database: '{}'", event);
return true;

View File

@ -101,7 +101,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
default:
}
if (!schemaHistory.storeOnlyCapturedTables() ||
if (!storeOnlyCapturedTables() ||
schemaChange.getTables().stream().map(Table::id).anyMatch(getTableFilter()::isIncluded)) {
LOGGER.debug("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl());
record(schemaChange, schemaChange.getTableChanges());

View File

@ -119,7 +119,9 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException
});
for (SchemaChangeEvent event : changeEvents) {
receiver.schemaChangeEvent(event);
if (!schema.skipSchemaChangeEvent(event)) {
receiver.schemaChangeEvent(event);
}
}
}
}

View File

@ -21,14 +21,20 @@ public class SqlServerSchemaChangeEventEmitter implements SchemaChangeEventEmitt
private final SqlServerOffsetContext offsetContext;
private final SqlServerChangeTable changeTable;
private final Table tableSchema;
private final SqlServerDatabaseSchema schema;
private final SchemaChangeEventType eventType;
public SqlServerSchemaChangeEventEmitter(SqlServerPartition partition, SqlServerOffsetContext offsetContext, SqlServerChangeTable changeTable, Table tableSchema,
public SqlServerSchemaChangeEventEmitter(SqlServerPartition partition,
SqlServerOffsetContext offsetContext,
SqlServerChangeTable changeTable,
Table tableSchema,
SqlServerDatabaseSchema schema,
SchemaChangeEventType eventType) {
this.partition = partition;
this.offsetContext = offsetContext;
this.changeTable = changeTable;
this.tableSchema = tableSchema;
this.schema = schema;
this.eventType = eventType;
}
@ -44,6 +50,8 @@ public void emitSchemaChangeEvent(Receiver receiver) throws InterruptedException
tableSchema,
false);
receiver.schemaChangeEvent(event);
if (!schema.skipSchemaChangeEvent(event)) {
receiver.schemaChangeEvent(event);
}
}
}

View File

@ -358,7 +358,7 @@ private void migrateTable(SqlServerPartition partition, final Queue<SqlServerCha
return;
}
dispatcher.dispatchSchemaChangeEvent(partition, newTable.getSourceTableId(),
new SqlServerSchemaChangeEventEmitter(partition, offsetContext, newTable, tableSchema,
new SqlServerSchemaChangeEventEmitter(partition, offsetContext, newTable, tableSchema, schema,
SchemaChangeEventType.ALTER));
newTable.setSourceTable(tableSchema);
}
@ -435,6 +435,7 @@ private SqlServerChangeTable[] getChangeTablesToQuery(SqlServerPartition partiti
offsetContext,
currentTable,
dataConnection.getTableSchemaFromTable(databaseName, currentTable),
schema,
SchemaChangeEventType.CREATE));
}

View File

@ -32,7 +32,6 @@
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Field;
@ -2900,20 +2899,5 @@ public boolean storageExists() {
public void initializeStorage() {
delegate.initializeStorage();
}
@Override
public boolean storeOnlyCapturedTables() {
return false;
}
@Override
public boolean skipUnparseableDdlStatements() {
return false;
}
@Override
public Predicate<String> ddlFilter() {
return ddl -> false;
}
}
}

View File

@ -5,6 +5,9 @@
*/
package io.debezium.relational;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
@ -14,9 +17,9 @@
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.function.Predicates;
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryMetrics;
@ -32,9 +35,13 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
private static final String DEFAULT_SCHEMA_HISTORY = "io.debezium.storage.kafka.history.KafkaSchemaHistory";
private boolean useCatalogBeforeSchema;
private final boolean useCatalogBeforeSchema;
private final Class<? extends SourceConnector> connectorClass;
private final boolean multiPartitionMode;
private final Predicate<String> ddlFilter;
protected boolean skipUnparseableDDL;
protected boolean storeOnlyCapturedTablesDdl;
protected boolean storeOnlyCapturedDatabasesDdl;
/**
* The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
@ -51,11 +58,18 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
+ SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(DEFAULT_SCHEMA_HISTORY);
public static final Field SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS;
public static final Field STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL;
public static final Field STORE_ONLY_CAPTURED_DATABASES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_DATABASES_DDL;
protected static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.history(
SCHEMA_HISTORY,
SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL)
SKIP_UNPARSEABLE_DDL_STATEMENTS,
STORE_ONLY_CAPTURED_TABLES_DDL,
STORE_ONLY_CAPTURED_DATABASES_DDL)
.create();
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
@ -65,10 +79,8 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
int defaultSnapshotFetchSize,
ColumnFilterMode columnFilterMode,
boolean multiPartitionMode) {
super(config, systemTablesFilter, TableId::toString, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema);
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
this.connectorClass = connectorClass;
this.multiPartitionMode = multiPartitionMode;
this(connectorClass, config, systemTablesFilter, TableId::toString, useCatalogBeforeSchema,
defaultSnapshotFetchSize, columnFilterMode, multiPartitionMode);
}
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
@ -78,10 +90,26 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
boolean useCatalogBeforeSchema,
ColumnFilterMode columnFilterMode,
boolean multiPartitionMode) {
super(config, systemTablesFilter, tableIdMapper, DEFAULT_SNAPSHOT_FETCH_SIZE, columnFilterMode, useCatalogBeforeSchema);
this(connectorClass, config, systemTablesFilter, tableIdMapper, useCatalogBeforeSchema,
DEFAULT_SNAPSHOT_FETCH_SIZE, columnFilterMode, multiPartitionMode);
}
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
Configuration config,
TableFilter systemTablesFilter,
TableIdToStringMapper tableIdMapper,
boolean useCatalogBeforeSchema,
int defaultSnapshotFetchSize,
ColumnFilterMode columnFilterMode,
boolean multiPartitionMode) {
super(config, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema);
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
this.connectorClass = connectorClass;
this.multiPartitionMode = multiPartitionMode;
this.ddlFilter = createDdlFilter(config);
this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
this.storeOnlyCapturedTablesDdl = config.getBoolean(STORE_ONLY_CAPTURED_TABLES_DDL);
this.storeOnlyCapturedDatabasesDdl = config.getBoolean(STORE_ONLY_CAPTURED_DATABASES_DDL);
}
/**
@ -90,10 +118,10 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
public SchemaHistory getSchemaHistory() {
Configuration config = getConfig();
SchemaHistory schemaHistory = config.getInstance(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY, SchemaHistory.class);
SchemaHistory schemaHistory = config.getInstance(SCHEMA_HISTORY, SchemaHistory.class);
if (schemaHistory == null) {
throw new ConnectException("Unable to instantiate the database schema history class " +
config.getString(HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY));
config.getString(SCHEMA_HISTORY));
}
// Do not remove the prefix from the subset of config properties ...
@ -101,8 +129,8 @@ public SchemaHistory getSchemaHistory() {
.edit()
.with(config.subset(Field.INTERNAL_PREFIX + SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING, false))
.withDefault(SchemaHistory.NAME, getLogicalName() + "-schemahistory")
.withDefault(AbstractSchemaHistory.INTERNAL_CONNECTOR_CLASS, connectorClass.getName())
.withDefault(AbstractSchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
.withDefault(SchemaHistory.INTERNAL_CONNECTOR_CLASS, connectorClass.getName())
.withDefault(SchemaHistory.INTERNAL_CONNECTOR_ID, logicalName)
.build();
HistoryRecordComparator historyComparator = getHistoryRecordComparator();
@ -120,6 +148,28 @@ public boolean multiPartitionMode() {
return multiPartitionMode;
}
private Predicate<String> createDdlFilter(Configuration config) {
// Set up the DDL filter
final String ddlFilter = config.getString(SchemaHistory.DDL_FILTER);
return (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false);
}
public Predicate<String> ddlFilter() {
return ddlFilter;
}
public boolean skipUnparseableDdlStatements() {
return skipUnparseableDDL;
}
public boolean storeOnlyCapturedTables() {
return storeOnlyCapturedTablesDdl;
}
public boolean storeOnlyCapturedDatabases() {
return storeOnlyCapturedDatabasesDdl;
}
/**
* Returns a comparator to be used when recovering records from the schema history, making sure no history entries
* newer than the offset we resume from are recovered (which could happen when restarting a connector after history

View File

@ -8,6 +8,9 @@
import java.util.Objects;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.Key.KeyMapper;
@ -20,6 +23,7 @@
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Strings;
/**
* A {@link DatabaseSchema} or a relational database which has a schema history, that can be recovered to the current
@ -31,7 +35,10 @@
public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatabaseSchema
implements HistorizedDatabaseSchema<TableId> {
private final static Logger LOGGER = LoggerFactory.getLogger(HistorizedRelationalDatabaseSchema.class);
protected final SchemaHistory schemaHistory;
private final HistorizedRelationalDatabaseConnectorConfig historizedConnectorConfig;
private boolean recoveredTables;
protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnectorConfig config, TopicNamingStrategy<TableId> topicNamingStrategy,
@ -41,6 +48,7 @@ protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnect
this.schemaHistory = config.getSchemaHistory();
this.schemaHistory.start();
this.historizedConnectorConfig = config;
}
@Override
@ -110,21 +118,35 @@ public boolean tableInformationComplete() {
@Override
public boolean storeOnlyCapturedTables() {
return schemaHistory.storeOnlyCapturedTables();
return historizedConnectorConfig.storeOnlyCapturedTables();
}
@Override
public boolean storeOnlyCapturedDatabases() {
return historizedConnectorConfig.storeOnlyCapturedDatabases();
}
@Override
public boolean skipUnparseableDdlStatements() {
return schemaHistory.skipUnparseableDdlStatements();
return historizedConnectorConfig.skipUnparseableDdlStatements();
}
@Override
public Predicate<String> ddlFilter() {
return schemaHistory.ddlFilter();
return historizedConnectorConfig.ddlFilter();
}
@Override
public boolean isHistorized() {
return true;
}
public boolean skipSchemaChangeEvent(SchemaChangeEvent event) {
if (storeOnlyCapturedDatabases() && !Strings.isNullOrEmpty(event.getSchema())
&& !historizedConnectorConfig.getTableFilters().schemaFilter().test(event.getSchema())) {
LOGGER.debug("Skipping schema event as it belongs to a non-captured schema: '{}'", event);
return true;
}
return false;
}
}

View File

@ -350,9 +350,15 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
lastSnapshotRecord(snapshotContext);
}
SchemaChangeEvent event = getCreateTableEvent(snapshotContext, snapshotContext.tables.forTable(tableId));
if (HistorizedRelationalDatabaseSchema.class.isAssignableFrom(schema.getClass()) &&
((HistorizedRelationalDatabaseSchema) schema).skipSchemaChangeEvent(event)) {
continue;
}
dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, tableId, (receiver) -> {
try {
receiver.schemaChangeEvent(getCreateTableEvent(snapshotContext, snapshotContext.tables.forTable(tableId)));
receiver.schemaChangeEvent(event);
}
catch (Exception e) {
throw new DebeziumException(e);

View File

@ -18,12 +18,13 @@
public class RelationalTableFilters implements DataCollectionFilters {
// Filter that filters tables based only on datbase/schema/system table filters but not table filters
// Filter that filters tables based only on database/schema/system table filters but not table filters
// Represents the list of tables whose schema needs to be captured
private final TableFilter eligibleTableFilter;
// Filter that filters tables based on table filters
private final TableFilter tableFilter;
private final Predicate<String> databaseFilter;
private final Predicate<String> schemaFilter;
private final String excludeColumns;
/**
@ -77,6 +78,10 @@ public RelationalTableFilters(Configuration config, TableFilter systemTablesFilt
.includeDatabases(config.getString(RelationalDatabaseConnectorConfig.DATABASE_INCLUDE_LIST))
.excludeDatabases(config.getString(RelationalDatabaseConnectorConfig.DATABASE_EXCLUDE_LIST))
.build();
this.schemaFilter = Selectors.databaseSelector()
.includeDatabases(config.getString(RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST))
.excludeDatabases(config.getString(RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST))
.build();
Predicate<TableId> eligibleSchemaPredicate = config.getBoolean(RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
? systemTablesFilter::isIncluded
@ -106,6 +111,10 @@ public Predicate<String> databaseFilter() {
return databaseFilter;
}
public Predicate<String> schemaFilter() {
return schemaFilter;
}
public String getExcludeColumns() {
return excludeColumns;
}

View File

@ -12,9 +12,6 @@
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,6 +24,7 @@
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.relational.history.TableChanges.TableChangeType;
import io.debezium.relational.history.TableChanges.TableChangesSerializer;
import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException;
import io.debezium.util.Clock;
@ -39,47 +37,16 @@ public abstract class AbstractSchemaHistory implements SchemaHistory {
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Required for unified thread creation
public static final Field INTERNAL_CONNECTOR_CLASS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.class")
.withDisplayName("Debezium connector class")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("The class of the Debezium database connector")
.withNoValidation();
// Required for unified thread creation
public static final Field INTERNAL_CONNECTOR_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.id")
.withDisplayName("Debezium connector identifier")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("The unique identifier of the Debezium connector")
.withNoValidation();
// Temporary preference for DDL over logical schema due to DBZ-32
public static final Field INTERNAL_PREFER_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "prefer.ddl")
.withDisplayName("Prefer DDL for schema recovery")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Prefer DDL for schema recovery in case logical schema is present")
.withInvisibleRecommender()
.withNoValidation();
public static Field.Set ALL_FIELDS = Field.setOf(SchemaHistory.NAME, INTERNAL_CONNECTOR_CLASS,
INTERNAL_CONNECTOR_ID);
public static Field.Set ALL_FIELDS = Field.setOf(NAME, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID);
protected Configuration config;
private HistoryRecordComparator comparator = HistoryRecordComparator.INSTANCE;
private boolean skipUnparseableDDL;
private boolean storeOnlyCapturedTablesDdl;
private Predicate<String> ddlFilter = x -> false;
private SchemaHistoryListener listener = SchemaHistoryListener.NOOP;
private boolean useCatalogBeforeSchema;
private boolean preferDdl = false;
private TableChanges.TableChangesSerializer<Array> tableChangesSerializer = new JsonTableChangeSerializer();
private final TableChangesSerializer<Array> tableChangesSerializer = new JsonTableChangeSerializer();
protected AbstractSchemaHistory() {
}
@ -88,10 +55,9 @@ protected AbstractSchemaHistory() {
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
this.config = config;
this.comparator = comparator != null ? comparator : HistoryRecordComparator.INSTANCE;
this.skipUnparseableDDL = config.getBoolean(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS);
this.storeOnlyCapturedTablesDdl = Boolean.valueOf(config.getString(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL));
this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS);
final String ddlFilter = config.getString(SchemaHistory.DDL_FILTER);
final String ddlFilter = config.getString(DDL_FILTER);
this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false);
this.listener = listener;
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
@ -165,8 +131,8 @@ else if (ddl != null && ddlParser != null) {
ddlParser.setCurrentSchema(recovered.schemaName()); // may be null
}
if (ddlFilter.test(ddl)) {
logger.info("a DDL '{}' was filtered out of processing by regular expression '{}", ddl,
config.getString(SchemaHistory.DDL_FILTER));
logger.info("a DDL '{}' was filtered out of processing by regular expression '{}'", ddl,
config.getString(DDL_FILTER));
return;
}
try {
@ -176,7 +142,7 @@ else if (ddl != null && ddlParser != null) {
}
catch (final ParsingException | MultipleParsingExceptions e) {
if (skipUnparseableDDL) {
logger.warn("Ignoring unparseable statements '{}' stored in database schema history: {}", ddl, e);
logger.warn("Ignoring unparseable statements '{}' stored in database schema history", ddl, e);
}
else {
throw e;
@ -203,19 +169,4 @@ public void stop() {
@Override
public void initializeStorage() {
}
@Override
public boolean storeOnlyCapturedTables() {
return storeOnlyCapturedTablesDdl;
}
@Override
public boolean skipUnparseableDdlStatements() {
return skipUnparseableDDL;
}
@Override
public Predicate<String> ddlFilter() {
return ddlFilter;
}
}

View File

@ -10,7 +10,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@ -64,6 +63,16 @@ public interface SchemaHistory {
+ "then only DDL that manipulates a captured table will be stored.")
.withDefault(false);
Field STORE_ONLY_CAPTURED_DATABASES_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "store.only.captured.databases.ddl")
.withDisplayName("Store only DDL that modifies tables of databases that are captured based on include/exclude lists")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Controls what DDL will Debezium store in database schema history. "
+ "By default (true) only DDL that manipulates a table from captured schema/database will be stored. "
+ "If set to false, then Debezium will store all incoming DDL statements.")
.withDefault(false);
Field DDL_FILTER = Field.createInternal(CONFIGURATION_FIELD_PREFIX_STRING + "ddl.filter")
.withDisplayName("DDL filter")
.withType(Type.STRING)
@ -86,6 +95,35 @@ public interface SchemaHistory {
+ "from processing and storing into schema history evolution.")
.withValidation(Field::isListOfRegex);
// Required for unified thread creation
Field INTERNAL_CONNECTOR_CLASS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.class")
.withDisplayName("Debezium connector class")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("The class of the Debezium database connector")
.withNoValidation();
// Required for unified thread creation
Field INTERNAL_CONNECTOR_ID = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.id")
.withDisplayName("Debezium connector identifier")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("The unique identifier of the Debezium connector")
.withNoValidation();
// Temporary preference for DDL over logical schema due to DBZ-32
Field INTERNAL_PREFER_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "prefer.ddl")
.withDisplayName("Prefer DDL for schema recovery")
.withType(Type.BOOLEAN)
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Prefer DDL for schema recovery in case logical schema is present")
.withInvisibleRecommender()
.withNoValidation();
/**
* Configure this instance.
*
@ -180,10 +218,4 @@ default void recover(Offsets<?, ?> offsets, Tables schema, DdlParser ddlParser)
* Called to initialize permanent storage of the history.
*/
void initializeStorage();
boolean storeOnlyCapturedTables();
boolean skipUnparseableDdlStatements();
Predicate<String> ddlFilter();
}

View File

@ -44,13 +44,11 @@ default void recover(Partition partition, OffsetContext offset) {
void initializeStorage();
default boolean storeOnlyCapturedTables() {
return false;
}
default boolean skipUnparseableDdlStatements() {
return false;
}
Predicate<String> ddlFilter();
boolean skipUnparseableDdlStatements();
boolean storeOnlyCapturedTables();
boolean storeOnlyCapturedDatabases();
}

View File

@ -167,9 +167,8 @@ public class KafkaSchemaHistory extends AbstractSchemaHistory {
.withDefault(Duration.ofSeconds(30).toMillis())
.withValidation(Field::isPositiveInteger);
public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, SchemaHistory.NAME,
RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID,
KAFKA_QUERY_TIMEOUT_MS);
public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, NAME, RECOVERY_POLL_INTERVAL_MS,
RECOVERY_POLL_ATTEMPTS, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID, KAFKA_QUERY_TIMEOUT_MS);
private static final String CONSUMER_PREFIX = CONFIGURATION_FIELD_PREFIX_STRING + "consumer.";
private static final String PRODUCER_PREFIX = CONFIGURATION_FIELD_PREFIX_STRING + "producer.";

View File

@ -38,11 +38,20 @@ Skipping should be used only with care as it can lead to data loss or mangling w
|[[{context}-property-database-history-store-only-captured-tables-ddl]]<<{context}-property-database-history-store-only-captured-tables-ddl, `+schema.history.internal.store.only.captured.tables.ddl+`>>
|`false`
|A Boolean value that specifies whether the connector should record all DDL statements +
|A Boolean value that specifies whether the connector should record all DDL statements from a specified schema or database +
`true` records only those DDL statements that are relevant to tables whose changes are being captured by {prodname}. Set to `true` with care, because DDL that is not stored might later become necessary if you change which tables have their changes captured. +
The safe default is `false`.
|[[{context}-property-database-history-store-only-captured-databases-ddl]]<<{context}-property-database-history-store-only-captured-databases-ddl, `+schema.history.internal.store.only.captured.databases.ddl+`>>
|`false`
|A Boolean value that specifies whether the connector should record all DDL statements +
`true` records only those DDL statements that are relevant to tables in the databases or schemas whose changes are being captured by {prodname}. `false` stores all incoming DDL statements. +
NOTE: The default value is `true` for the MySQL connector.
|===
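For example, a connector configuration might combine the new database-level option with the existing table-level option as follows (a minimal sketch; the two `schema.history.internal.*` property names come from this commit, while the connector filter and values are illustrative):

[source,properties]
----
# Illustrative database filter for the connector
database.include.list=inventory
# Existing option: keep DDL for all tables, not only the captured ones
schema.history.internal.store.only.captured.tables.ddl=false
# Option added by this commit: store DDL only for captured databases/schemas
schema.history.internal.store.only.captured.databases.ddl=true
----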
[id="{context}-pass-through-database-history-properties-for-configuring-producer-and-consumer-clients"]