DBZ-6197 Remove duplicated createDdlFilter method from historized connector config

harveyyue 2023-03-10 21:29:30 +08:00 committed by Jiri Pechanec
parent 2f7c60931b
commit 1877906dd8
8 changed files with 61 additions and 19 deletions

View File

@@ -573,7 +573,7 @@ protected void handleQueryEvent(MySqlPartition partition, MySqlOffsetContext off
     // This is an XA transaction, and we currently ignore these and do nothing ...
     return;
 }
-if (connectorConfig.getDdlFilter().test(sql)) {
+if (taskContext.getSchema().ddlFilter().test(sql)) {
     LOGGER.debug("DDL '{}' was filtered out of processing", sql);
     return;
 }
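The net effect of this hunk: the DDL filter predicate is now looked up through the schema (which delegates to the schema history) rather than through the connector configuration. Below is a minimal, self-contained sketch of that consumer-side pattern; it is not part of the commit, and the Schema interface and filter it uses are illustrative stand-ins for DatabaseSchema.ddlFilter().

import java.util.function.Predicate;

public class DdlFilterConsumerSketch {

    // Stand-in for io.debezium.schema.DatabaseSchema, reduced to the accessor this
    // commit introduces; the real interface has many more members.
    interface Schema {
        Predicate<String> ddlFilter();
    }

    // Mirrors the decision taken in handleQueryEvent above: a DDL statement is
    // dropped when the configured filter matches it.
    static boolean isFilteredOut(Schema schema, String ddl) {
        return schema.ddlFilter().test(ddl);
    }

    public static void main(String[] args) {
        // Illustrative filter; the real one is built from the configured DDL_FILTER regex.
        Schema schema = () -> ddl -> ddl.toLowerCase().startsWith("create role");
        System.out.println(isFilteredOut(schema, "create role if not exists 'RL_COMPLIANCE_NSA';")); // true
        System.out.println(isFilteredOut(schema, "CREATE TABLE t (id INT);")); // false
    }
}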

View File

@@ -35,6 +35,7 @@
 import io.debezium.connector.mysql.SourceInfo;
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.doc.FixFor;
+import io.debezium.junit.logging.LogInterceptor;
 import io.debezium.kafka.KafkaCluster;
 import io.debezium.pipeline.spi.Offsets;
 import io.debezium.pipeline.spi.Partition;
@@ -56,7 +57,7 @@ public class KafkaSchemaHistoryTest {
     private KafkaSchemaHistory history;
     private Offsets<Partition, MySqlOffsetContext> offsets;
     private MySqlOffsetContext position;
+    private LogInterceptor interceptor;
     private static final int PARTITION_NO = 0;
     @BeforeClass
@@ -119,6 +120,7 @@ public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exc
     }
     private void testHistoryTopicContent(String topicName, boolean skipUnparseableDDL) {
+        interceptor = new LogInterceptor(KafkaSchemaHistory.class);
         // Start up the history ...
         Configuration config = Configuration.create()
                 .with(KafkaSchemaHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
@@ -135,6 +137,7 @@ private void testHistoryTopicContent(String topicName, boolean skipUnparseableDD
                         ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                 50000)
                 .with(KafkaSchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL)
+                .with(KafkaSchemaHistory.DDL_FILTER, "CREATE\\s+ROLE.*")
                 .with(KafkaSchemaHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector")
                 .with(KafkaSchemaHistory.INTERNAL_CONNECTOR_ID, "dbz-test")
                 .build();
@@ -296,6 +299,34 @@ public void shouldStopOnUnparseableSQL() throws Exception {
         testHistoryTopicContent(topicName, false);
     }
+    @Test
+    public void shouldSkipMessageOnDDLFilter() throws Exception {
+        String topicName = "stop-on-ddlfilter-schema-changes";
+        // Create the empty topic ...
+        kafka.createTopic(topicName, 1, 1);
+        // Create invalid records
+        final ProducerRecord<String, String> invalidSQL = new ProducerRecord<>(topicName, PARTITION_NO, null,
+                "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"create role if not exists 'RL_COMPLIANCE_NSA';\"}");
+        final Configuration intruderConfig = Configuration.create()
+                .withDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.brokerList())
+                .withDefault(ProducerConfig.CLIENT_ID_CONFIG, "intruder")
+                .withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
+                .withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
+                .build();
+        try (KafkaProducer<String, String> producer = new KafkaProducer<>(intruderConfig.asProperties())) {
+            producer.send(invalidSQL).get();
+        }
+        testHistoryTopicContent(topicName, true);
+        boolean result = interceptor
+                .containsMessage("a DDL 'create role if not exists 'RL_COMPLIANCE_NSA';' was filtered out of processing by regular expression 'CREATE\\s+ROLE.*");
+        assertThat(result).isTrue();
+    }
     @Test
     public void testExists() {
         String topicName = "exists-schema-changes";
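For reference, a hedged sketch of enabling the same filter outside the test harness, using only the Field constants that appear elsewhere in this commit (SchemaHistory.DDL_FILTER is the constant the removed createDdlFilter read; KafkaSchemaHistory exposes it through inheritance). Broker, topic, and connector options from the test are omitted, and the class name below is illustrative:

import io.debezium.config.Configuration;
import io.debezium.relational.history.SchemaHistory;

public class DdlFilterConfigSketch {

    public static void main(String[] args) {
        // Any DDL statement matching this regex is skipped when the schema history
        // is recovered; matching is case-insensitive (see the construction sketch
        // further down in this commit).
        Configuration config = Configuration.create()
                .with(SchemaHistory.DDL_FILTER, "CREATE\\s+ROLE.*")
                .build();

        System.out.println(config.getString(SchemaHistory.DDL_FILTER));
    }
}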

View File

@@ -32,6 +32,7 @@
 import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.connect.data.Field;
@@ -2909,5 +2910,10 @@ public boolean storeOnlyCapturedTables() {
         public boolean skipUnparseableDdlStatements() {
             return false;
         }
+        @Override
+        public Predicate<String> ddlFilter() {
+            return ddl -> false;
+        }
     }
 }

View File

@@ -5,9 +5,6 @@
  */
 package io.debezium.relational;
-import java.util.function.Predicate;
-import java.util.regex.Pattern;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
@@ -17,7 +14,6 @@
 import io.debezium.config.ConfigDefinition;
 import io.debezium.config.Configuration;
 import io.debezium.config.Field;
-import io.debezium.function.Predicates;
 import io.debezium.relational.Selectors.TableIdToStringMapper;
 import io.debezium.relational.Tables.TableFilter;
 import io.debezium.relational.history.AbstractSchemaHistory;
@@ -39,7 +35,6 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
     private boolean useCatalogBeforeSchema;
     private final Class<? extends SourceConnector> connectorClass;
     private final boolean multiPartitionMode;
-    private final Predicate<String> ddlFilter;
     /**
      * The database schema history class is hidden in the {@link #configDef()} since that is designed to work with a user interface,
@@ -74,7 +69,6 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
         this.useCatalogBeforeSchema = useCatalogBeforeSchema;
         this.connectorClass = connectorClass;
         this.multiPartitionMode = multiPartitionMode;
-        this.ddlFilter = createDdlFilter(config);
     }
     protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass,
@@ -88,7 +82,6 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
         this.useCatalogBeforeSchema = useCatalogBeforeSchema;
         this.connectorClass = connectorClass;
         this.multiPartitionMode = multiPartitionMode;
-        this.ddlFilter = createDdlFilter(config);
     }
     /**
@@ -127,16 +120,6 @@ public boolean multiPartitionMode() {
         return multiPartitionMode;
     }
-    public Predicate<String> getDdlFilter() {
-        return ddlFilter;
-    }
-    private Predicate<String> createDdlFilter(Configuration config) {
-        // Set up the DDL filter
-        final String ddlFilter = config.getString(SchemaHistory.DDL_FILTER);
-        return (ddlFilter != null) ? Predicates.includes(ddlFilter, Pattern.CASE_INSENSITIVE | Pattern.DOTALL) : (x -> false);
-    }
     /**
      * Returns a comparator to be used when recovering records from the schema history, making sure no history entries
      * newer than the offset we resume from are recovered (which could happen when restarting a connector after history
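The removed createDdlFilter was the only place the connector config turned the SchemaHistory.DDL_FILTER expression into a predicate (via Predicates.includes with CASE_INSENSITIVE and DOTALL); after this commit that responsibility sits with the schema history itself, whose ddlFilter() accessor is added in AbstractSchemaHistory below. A minimal, self-contained sketch of the equivalent construction, assuming a single expression and whole-string matching (the real Predicates helper also accepts comma-separated regex lists); the class name is illustrative and not part of the commit:

import java.util.function.Predicate;
import java.util.regex.Pattern;

public class DdlFilterSketch {

    // Equivalent of the removed createDdlFilter under the stated assumptions:
    // no expression configured means nothing is filtered out, otherwise compile
    // the regex case-insensitively and in DOTALL mode.
    static Predicate<String> ddlFilter(String regex) {
        if (regex == null) {
            return ddl -> false;
        }
        Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
        return ddl -> ddl != null && pattern.matcher(ddl).matches();
    }

    public static void main(String[] args) {
        Predicate<String> filter = ddlFilter("CREATE\\s+ROLE.*");
        // The case-insensitive flag is what lets the uppercase expression match the
        // lowercase statement stored by the new KafkaSchemaHistoryTest test.
        System.out.println(filter.test("create role if not exists 'RL_COMPLIANCE_NSA';")); // true
        System.out.println(filter.test("CREATE TABLE t (id INT);")); // false
    }
}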

View File

@@ -6,6 +6,7 @@
 package io.debezium.relational;
 import java.util.Objects;
+import java.util.function.Predicate;
 import io.debezium.DebeziumException;
 import io.debezium.pipeline.spi.Offsets;
@@ -112,10 +113,16 @@ public boolean storeOnlyCapturedTables() {
         return schemaHistory.storeOnlyCapturedTables();
     }
+    @Override
     public boolean skipUnparseableDdlStatements() {
         return schemaHistory.skipUnparseableDdlStatements();
     }
+    @Override
+    public Predicate<String> ddlFilter() {
+        return schemaHistory.ddlFilter();
+    }
     @Override
     public boolean isHistorized() {
         return true;

View File

@@ -213,4 +213,9 @@ public boolean storeOnlyCapturedTables() {
     public boolean skipUnparseableDdlStatements() {
         return skipUnparseableDDL;
     }
+    @Override
+    public Predicate<String> ddlFilter() {
+        return ddlFilter;
+    }
 }

View File

@@ -10,6 +10,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.function.Predicate;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -183,4 +184,6 @@ default void recover(Offsets<?, ?> offsets, Tables schema, DdlParser ddlParser)
     boolean storeOnlyCapturedTables();
     boolean skipUnparseableDdlStatements();
+    Predicate<String> ddlFilter();
 }

View File

@@ -6,6 +6,7 @@
 package io.debezium.schema;
 import java.util.Collection;
+import java.util.function.Predicate;
 import io.debezium.pipeline.spi.OffsetContext;
 import io.debezium.pipeline.spi.Offsets;
@@ -46,4 +47,10 @@ default void recover(Partition partition, OffsetContext offset) {
     default boolean storeOnlyCapturedTables() {
         return false;
     }
+    default boolean skipUnparseableDdlStatements() {
+        return false;
+    }
+    Predicate<String> ddlFilter();
 }