From 6451c9325680722dd8d2f06e017700fcec580858 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 21 Mar 2023 18:18:59 +0800 Subject: [PATCH] DBZ-6240 Add an STM to filter schema change event --- .../io/debezium/schema/SchemaFactory.java | 8 + .../transforms/SchemaChangeEventFilter.java | 96 +++++++++++ .../io/debezium/transforms/SmtManager.java | 11 ++ .../SchemaChangeEventFilterTest.java | 149 ++++++++++++++++++ .../schema-change-event-filter.adoc | 145 +++++++++++++++++ 5 files changed, 409 insertions(+) create mode 100644 debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java create mode 100644 debezium-core/src/test/java/io/debezium/transforms/SchemaChangeEventFilterTest.java create mode 100644 documentation/modules/ROOT/pages/transformations/schema-change-event-filter.adoc diff --git a/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java b/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java index 53718303c..950598f9a 100644 --- a/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java +++ b/debezium-core/src/main/java/io/debezium/schema/SchemaFactory.java @@ -92,6 +92,14 @@ public static SchemaFactory get() { return schemaFactoryObject; } + public boolean isSchemaChangeSchema(Schema schema) { + if (schema != null && schema.name() != null) { + return schema.name().endsWith(SCHEMA_HISTORY_CONNECTOR_VALUE_SCHEMA_NAME_SUFFIX) || + schema.name().endsWith(SCHEMA_HISTORY_CONNECTOR_KEY_SCHEMA_NAME_SUFFIX); + } + return false; + } + public Schema heartbeatKeySchema(SchemaNameAdjuster adjuster) { return SchemaBuilder.struct() .name(adjuster.adjust(HEARTBEAT_KEY_SCHEMA_NAME)) diff --git a/debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java b/debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java new file mode 100644 index 000000000..ef8598b12 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java @@ -0,0 +1,96 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.transforms; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.relational.history.ConnectTableChangeSerializer; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.schema.SchemaChangeEvent; + +/** + * This SMT to filter schema change event + * @param + */ +public class SchemaChangeEventFilter> implements Transformation { + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeEventFilter.class); + + private static final Field SCHEMA_CHANGE_EVENT_INCLUDE_LIST = Field.create("schema.change.event.include.list") + .withDisplayName("Schema change event include list") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.SHORT) + .withImportance(ConfigDef.Importance.LOW) + .withDescription( + "Support filtering during DDL synchronization") + .required(); + + private Set includeSchemaChangeEvents; + private SmtManager smtManager; + + @Override + public void configure(Map configs) { + final Configuration config = Configuration.from(configs); + smtManager = new SmtManager<>(config); + smtManager.validate(config, Field.setOf(SCHEMA_CHANGE_EVENT_INCLUDE_LIST)); + final String includeSchemaChangeEvents = config.getString(SCHEMA_CHANGE_EVENT_INCLUDE_LIST); + this.includeSchemaChangeEvents = Arrays.stream(includeSchemaChangeEvents.split(",")).map(typeName -> SchemaChangeEvent.SchemaChangeEventType.valueOf(typeName)) + .collect(Collectors.toSet()); + + } + + @Override + public R apply(R record) { + if (record.value() == null || !smtManager.isValidSchemaChange(record)) { + return record; + } + Struct recordValue = requireStruct(record.value(), "Read schema change event to filter"); + + List tableChanges = recordValue.getArray(HistoryRecord.Fields.TABLE_CHANGES); + if (tableChanges == null) { + LOGGER.debug("Table changes is empty, excluded it."); + return null; + } + SchemaChangeEvent.SchemaChangeEventType schemaChangeEventType; + if (tableChanges.isEmpty()) { + schemaChangeEventType = SchemaChangeEvent.SchemaChangeEventType.DATABASE; + } + else { + schemaChangeEventType = SchemaChangeEvent.SchemaChangeEventType.valueOf((String) tableChanges.get(0).get(ConnectTableChangeSerializer.TYPE_KEY)); + } + + if (includeSchemaChangeEvents.contains(schemaChangeEventType)) { + return record; + } + return null; + } + + @Override + public ConfigDef config() { + final ConfigDef config = new ConfigDef(); + Field.group(config, null, SCHEMA_CHANGE_EVENT_INCLUDE_LIST); + return config; + } + + @Override + public void close() { + } + +} diff --git a/debezium-core/src/main/java/io/debezium/transforms/SmtManager.java b/debezium-core/src/main/java/io/debezium/transforms/SmtManager.java index 18eb30998..e074bf449 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/SmtManager.java +++ b/debezium-core/src/main/java/io/debezium/transforms/SmtManager.java @@ -20,6 +20,7 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.data.Envelope; +import io.debezium.schema.SchemaFactory; /** * A class used by all Debezium supplied SMTs to centralize common logic. @@ -51,6 +52,16 @@ public boolean isValidEnvelope(final R record) { return true; } + public boolean isValidSchemaChange(final R record) { + if (record.valueSchema() == null || + record.valueSchema().name() == null || + !SchemaFactory.get().isSchemaChangeSchema(record.valueSchema())) { + LOGGER.debug("Expected schema change schema for transformation, passing it unchanged"); + return false; + } + return true; + } + public boolean isValidKey(final R record) { if (record.keySchema() == null || record.keySchema().name() == null || diff --git a/debezium-core/src/test/java/io/debezium/transforms/SchemaChangeEventFilterTest.java b/debezium-core/src/test/java/io/debezium/transforms/SchemaChangeEventFilterTest.java new file mode 100644 index 000000000..505e76c23 --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/transforms/SchemaChangeEventFilterTest.java @@ -0,0 +1,149 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.transforms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +import io.debezium.relational.history.ConnectTableChangeSerializer; +import io.debezium.relational.history.HistoryRecord; + +/** + * Unit test for the {@link SchemaChangeEventFilter} single message transformation. + * + */ +public class SchemaChangeEventFilterTest extends AbstractExtractStateTest { + + private static final String SCHEMA_CHANGE_EVENT_INCLUDE_LIST = "schema.change.event.include.list"; + private static final String SCHEMA_HISTORY_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change"; + + @Test + public void whenNoDeclaredConfigExceptionIsThrew() { + try (SchemaChangeEventFilter transform = new SchemaChangeEventFilter<>()) { + final Map props = new HashMap<>(); + assertThatThrownBy(() -> transform.configure(props)).isInstanceOf(ConfigException.class).hasMessageContaining( + "Invalid value null for configuration schema.change.event.include.list: The 'schema.change.event.include.list' value is invalid: A value is required"); + } + } + + @Test + public void testSchemaChangeContainsEventTypeFilter() { + try (SchemaChangeEventFilter transform = new SchemaChangeEventFilter<>()) { + final Map props = new HashMap<>(); + props.put(SCHEMA_CHANGE_EVENT_INCLUDE_LIST, "ALTER,CREATE"); + transform.configure(props); + final SourceRecord record = createSchemaChangeRecordContainsEventType(); + assertThat(transform.apply(record)); + } + } + + @Test + public void testSchemaChangeNonEventTypeFilter() { + try (SchemaChangeEventFilter transform = new SchemaChangeEventFilter<>()) { + final Map props = new HashMap<>(); + props.put(SCHEMA_CHANGE_EVENT_INCLUDE_LIST, "ALTER,CREATE"); + transform.configure(props); + final SourceRecord record = createSchemaChangeRecordNonEventType(); + assertThat(transform.apply(record)).isNull(); + } + } + + @Test + public void testSchemaChangeNonContainsEventTypeFilter() { + try (SchemaChangeEventFilter transform = new SchemaChangeEventFilter<>()) { + final Map props = new HashMap<>(); + props.put(SCHEMA_CHANGE_EVENT_INCLUDE_LIST, "ALTER,CREATE"); + transform.configure(props); + final SourceRecord record = createSchemaChangeRecordNonContainsEventType(); + assertThat(transform.apply(record)).isNull(); + } + } + + protected SourceRecord createSchemaChangeRecordNonEventType() { + final Schema schemaChangeRecordSchema = createSchemaChangeSchema(); + final Struct result = new Struct(schemaChangeRecordSchema); + result.put(HistoryRecord.Fields.TIMESTAMP, System.currentTimeMillis()); + result.put(HistoryRecord.Fields.DATABASE_NAME, "test"); + result.put(HistoryRecord.Fields.SCHEMA_NAME, "test_schema"); + result.put(HistoryRecord.Fields.DDL_STATEMENTS, ""); + result.put(HistoryRecord.Fields.TABLE_CHANGES, new ArrayList<>()); + return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", schemaChangeRecordSchema, result); + } + + protected SourceRecord createSchemaChangeRecordContainsEventType() { + final Schema schemaChangeRecordSchema = createSchemaChangeSchema(); + final Struct result = new Struct(schemaChangeRecordSchema); + result.put(HistoryRecord.Fields.TIMESTAMP, System.currentTimeMillis()); + result.put(HistoryRecord.Fields.DATABASE_NAME, "test"); + result.put(HistoryRecord.Fields.SCHEMA_NAME, "test_schema"); + result.put(HistoryRecord.Fields.DDL_STATEMENTS, ""); + + List structs = new ArrayList<>(); + Struct struct = new Struct(schemaHistoryChangeSchema()); + struct.put(ConnectTableChangeSerializer.TYPE_KEY, "ALTER"); + struct.put(ConnectTableChangeSerializer.ID_KEY, "test.table"); + structs.add(struct); + result.put(HistoryRecord.Fields.TABLE_CHANGES, structs); + + // value.put() + return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", schemaChangeRecordSchema, result); + } + + protected SourceRecord createSchemaChangeRecordNonContainsEventType() { + final Schema schemaChangeRecordSchema = createSchemaChangeSchema(); + final Struct result = new Struct(schemaChangeRecordSchema); + result.put(HistoryRecord.Fields.TIMESTAMP, System.currentTimeMillis()); + result.put(HistoryRecord.Fields.DATABASE_NAME, "test"); + result.put(HistoryRecord.Fields.SCHEMA_NAME, "test_schema"); + result.put(HistoryRecord.Fields.DDL_STATEMENTS, ""); + + List structs = new ArrayList<>(); + Struct struct = new Struct(schemaHistoryChangeSchema()); + struct.put(ConnectTableChangeSerializer.TYPE_KEY, "DROP"); + struct.put(ConnectTableChangeSerializer.ID_KEY, "test.table"); + structs.add(struct); + result.put(HistoryRecord.Fields.TABLE_CHANGES, structs); + + result.put(HistoryRecord.Fields.TABLE_CHANGES, structs); + + // value.put() + return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", schemaChangeRecordSchema, result); + } + + private static Schema createSchemaChangeSchema() { + final Schema schemaChangeRecordSchema = SchemaBuilder.struct() + .name("filter.SchemaChangeValue") + .field(HistoryRecord.Fields.TIMESTAMP, Schema.INT64_SCHEMA) + .field(HistoryRecord.Fields.DATABASE_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .field(HistoryRecord.Fields.SCHEMA_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .field(HistoryRecord.Fields.DDL_STATEMENTS, Schema.OPTIONAL_STRING_SCHEMA) + .field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(schemaHistoryChangeSchema()).build()) + .build(); + return schemaChangeRecordSchema; + } + + protected static Schema schemaHistoryChangeSchema() { + return SchemaBuilder.struct() + .name(SCHEMA_HISTORY_CHANGE_SCHEMA_NAME) + .field(ConnectTableChangeSerializer.TYPE_KEY, Schema.STRING_SCHEMA) + .field(ConnectTableChangeSerializer.ID_KEY, Schema.STRING_SCHEMA) + .build(); + } + +} diff --git a/documentation/modules/ROOT/pages/transformations/schema-change-event-filter.adoc b/documentation/modules/ROOT/pages/transformations/schema-change-event-filter.adoc new file mode 100644 index 000000000..ebef8588d --- /dev/null +++ b/documentation/modules/ROOT/pages/transformations/schema-change-event-filter.adoc @@ -0,0 +1,145 @@ +:page-aliases: configuration/schema-change-event-filter.adoc + + +[id="schema-change-event-filter"] += SchemaChangeEventFilter + +:toc: +:toc-placement: macro +:linkattrs: +:icons: font +:source-highlighter: highlight.js + +toc::[] + +The `schemaChangeEventFilter` is mainly used to filter table schema change events, allowing us to selectively synchronize ddl. +For more information about configuring the SMT, see the following xref:example-schema-change-event-filter[example]. + + +[[example-schema-change-event-filter]] +== Example + +If you want to filter the captured ddl, please configure the following options when creating the connector. +For example, to filter out `DROP` and `TRUNCATE` from captured DDL event messages, add the following line to the connector configuration: +[source] +---- +transforms=filterTableDropAndTruncateEvent +transforms.filterTableDropAndTruncateEvent.type=io.debezium.transforms.SchemaChangeEventFilter +transforms.filterTableDropAndTruncateEvent.schema.change.event.include.list=DROP,TRUNCATE +---- + +The following example shows the changes in the values of event records before and after being filtered by the SchemaChangeEventFilter +.Effect of applying the `SchemaChangeEventFilter` SMT +==== +Schema change event record before it is processed by the `SchemaChangeEventFilter` transformation:: + +Schema change event record before the SMT processes the event record::: ++ +[source] +---- +{ + "schema":{ + "type":"struct", + "fields":[ + { + "type":"int64", + "optional":false, + "field":"ts_ms" + }, + { + "type":"string", + "optional":true, + "field":"databaseName" + }, + { + "type":"string", + "optional":true, + "field":"schemaName" + }, + { + "type":"string", + "optional":true, + "field":"ddl" + }, + { + "type":"array", + "items":{ + "type":"struct", + "fields":[ + { + "type":"string", + "optional":false, + "field":"type" + }, + { + "type":"string", + "optional":false, + "field":"id" + } + ], + "optional":false, + "name":"io.debezium.connector.schema.Change" + }, + "optional":false, + "field":"tableChanges" + } + ], + "optional":false, + "name":"filter.SchemaChangeValue" + }, + "payload":{ + "ts_ms":1691035505397, + "databaseName":"test", + "schemaName":"test_schema", + "ddl":"", + "tableChanges":[ + { + "type":"DROP", + "id":"test.table" + } + ] + } +} +---- + +The record of the schema change event will return null after being processed by the `SchemaChangeEventFilter` transformation, and null values will not be sent:: + +Schema change event record after the SMT processes the event record::: + ++ +[source] +---- +null +---- +==== + + +[[schema-change-event-filter-configuration-options]] +== Configuration options + +The following table lists the configuration options that you can use with the `SchemaChangeEventFilter` SMT. + +.SchemaChangeEventFilter SMT configuration options +[cols="14%a,40%a,10%a, 16%a, 16%a, 10%a"] +|=== +|Property +|Description +|Type +|Default +|Valid Values +|Importance + +|[[schema-change-event-filter-include-list]]<> +|A comma-separated list of schema change events that need to be filtered, Set one or more of the following options. +---- +CREATE +ALTER +DROP +TRUNCATE +DATABASE +---- +|String +|No default value +|non-empty string +|high +|===