DBZ-6240 Add an STM to filter schema change event

This commit is contained in:
sunxiaojian 2023-03-21 18:18:59 +08:00 committed by Jiri Pechanec
parent 0d18eb6768
commit 6451c93256
5 changed files with 409 additions and 0 deletions

View File

@ -92,6 +92,14 @@ public static SchemaFactory get() {
return schemaFactoryObject;
}
public boolean isSchemaChangeSchema(Schema schema) {
if (schema != null && schema.name() != null) {
return schema.name().endsWith(SCHEMA_HISTORY_CONNECTOR_VALUE_SCHEMA_NAME_SUFFIX) ||
schema.name().endsWith(SCHEMA_HISTORY_CONNECTOR_KEY_SCHEMA_NAME_SUFFIX);
}
return false;
}
public Schema heartbeatKeySchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(HEARTBEAT_KEY_SCHEMA_NAME))

View File

@ -0,0 +1,96 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.schema.SchemaChangeEvent;
/**
* This SMT to filter schema change event
* @param <R>
*/
public class SchemaChangeEventFilter<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeEventFilter.class);
private static final Field SCHEMA_CHANGE_EVENT_INCLUDE_LIST = Field.create("schema.change.event.include.list")
.withDisplayName("Schema change event include list")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDescription(
"Support filtering during DDL synchronization")
.required();
private Set<SchemaChangeEvent.SchemaChangeEventType> includeSchemaChangeEvents;
private SmtManager<R> smtManager;
@Override
public void configure(Map<String, ?> configs) {
final Configuration config = Configuration.from(configs);
smtManager = new SmtManager<>(config);
smtManager.validate(config, Field.setOf(SCHEMA_CHANGE_EVENT_INCLUDE_LIST));
final String includeSchemaChangeEvents = config.getString(SCHEMA_CHANGE_EVENT_INCLUDE_LIST);
this.includeSchemaChangeEvents = Arrays.stream(includeSchemaChangeEvents.split(",")).map(typeName -> SchemaChangeEvent.SchemaChangeEventType.valueOf(typeName))
.collect(Collectors.toSet());
}
@Override
public R apply(R record) {
if (record.value() == null || !smtManager.isValidSchemaChange(record)) {
return record;
}
Struct recordValue = requireStruct(record.value(), "Read schema change event to filter");
List<Struct> tableChanges = recordValue.getArray(HistoryRecord.Fields.TABLE_CHANGES);
if (tableChanges == null) {
LOGGER.debug("Table changes is empty, excluded it.");
return null;
}
SchemaChangeEvent.SchemaChangeEventType schemaChangeEventType;
if (tableChanges.isEmpty()) {
schemaChangeEventType = SchemaChangeEvent.SchemaChangeEventType.DATABASE;
}
else {
schemaChangeEventType = SchemaChangeEvent.SchemaChangeEventType.valueOf((String) tableChanges.get(0).get(ConnectTableChangeSerializer.TYPE_KEY));
}
if (includeSchemaChangeEvents.contains(schemaChangeEventType)) {
return record;
}
return null;
}
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null, SCHEMA_CHANGE_EVENT_INCLUDE_LIST);
return config;
}
@Override
public void close() {
}
}

View File

@ -20,6 +20,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.schema.SchemaFactory;
/**
* A class used by all Debezium supplied SMTs to centralize common logic.
@ -51,6 +52,16 @@ public boolean isValidEnvelope(final R record) {
return true;
}
public boolean isValidSchemaChange(final R record) {
if (record.valueSchema() == null ||
record.valueSchema().name() == null ||
!SchemaFactory.get().isSchemaChangeSchema(record.valueSchema())) {
LOGGER.debug("Expected schema change schema for transformation, passing it unchanged");
return false;
}
return true;
}
public boolean isValidKey(final R record) {
if (record.keySchema() == null ||
record.keySchema().name() == null ||

View File

@ -0,0 +1,149 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.HistoryRecord;
/**
* Unit test for the {@link SchemaChangeEventFilter} single message transformation.
*
*/
public class SchemaChangeEventFilterTest extends AbstractExtractStateTest {
private static final String SCHEMA_CHANGE_EVENT_INCLUDE_LIST = "schema.change.event.include.list";
private static final String SCHEMA_HISTORY_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change";
@Test
public void whenNoDeclaredConfigExceptionIsThrew() {
try (SchemaChangeEventFilter<SourceRecord> transform = new SchemaChangeEventFilter<>()) {
final Map<String, String> props = new HashMap<>();
assertThatThrownBy(() -> transform.configure(props)).isInstanceOf(ConfigException.class).hasMessageContaining(
"Invalid value null for configuration schema.change.event.include.list: The 'schema.change.event.include.list' value is invalid: A value is required");
}
}
@Test
public void testSchemaChangeContainsEventTypeFilter() {
try (SchemaChangeEventFilter<SourceRecord> transform = new SchemaChangeEventFilter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(SCHEMA_CHANGE_EVENT_INCLUDE_LIST, "ALTER,CREATE");
transform.configure(props);
final SourceRecord record = createSchemaChangeRecordContainsEventType();
assertThat(transform.apply(record));
}
}
@Test
public void testSchemaChangeNonEventTypeFilter() {
try (SchemaChangeEventFilter<SourceRecord> transform = new SchemaChangeEventFilter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(SCHEMA_CHANGE_EVENT_INCLUDE_LIST, "ALTER,CREATE");
transform.configure(props);
final SourceRecord record = createSchemaChangeRecordNonEventType();
assertThat(transform.apply(record)).isNull();
}
}
@Test
public void testSchemaChangeNonContainsEventTypeFilter() {
try (SchemaChangeEventFilter<SourceRecord> transform = new SchemaChangeEventFilter<>()) {
final Map<String, String> props = new HashMap<>();
props.put(SCHEMA_CHANGE_EVENT_INCLUDE_LIST, "ALTER,CREATE");
transform.configure(props);
final SourceRecord record = createSchemaChangeRecordNonContainsEventType();
assertThat(transform.apply(record)).isNull();
}
}
protected SourceRecord createSchemaChangeRecordNonEventType() {
final Schema schemaChangeRecordSchema = createSchemaChangeSchema();
final Struct result = new Struct(schemaChangeRecordSchema);
result.put(HistoryRecord.Fields.TIMESTAMP, System.currentTimeMillis());
result.put(HistoryRecord.Fields.DATABASE_NAME, "test");
result.put(HistoryRecord.Fields.SCHEMA_NAME, "test_schema");
result.put(HistoryRecord.Fields.DDL_STATEMENTS, "");
result.put(HistoryRecord.Fields.TABLE_CHANGES, new ArrayList<>());
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", schemaChangeRecordSchema, result);
}
protected SourceRecord createSchemaChangeRecordContainsEventType() {
final Schema schemaChangeRecordSchema = createSchemaChangeSchema();
final Struct result = new Struct(schemaChangeRecordSchema);
result.put(HistoryRecord.Fields.TIMESTAMP, System.currentTimeMillis());
result.put(HistoryRecord.Fields.DATABASE_NAME, "test");
result.put(HistoryRecord.Fields.SCHEMA_NAME, "test_schema");
result.put(HistoryRecord.Fields.DDL_STATEMENTS, "");
List<Struct> structs = new ArrayList<>();
Struct struct = new Struct(schemaHistoryChangeSchema());
struct.put(ConnectTableChangeSerializer.TYPE_KEY, "ALTER");
struct.put(ConnectTableChangeSerializer.ID_KEY, "test.table");
structs.add(struct);
result.put(HistoryRecord.Fields.TABLE_CHANGES, structs);
// value.put()
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", schemaChangeRecordSchema, result);
}
protected SourceRecord createSchemaChangeRecordNonContainsEventType() {
final Schema schemaChangeRecordSchema = createSchemaChangeSchema();
final Struct result = new Struct(schemaChangeRecordSchema);
result.put(HistoryRecord.Fields.TIMESTAMP, System.currentTimeMillis());
result.put(HistoryRecord.Fields.DATABASE_NAME, "test");
result.put(HistoryRecord.Fields.SCHEMA_NAME, "test_schema");
result.put(HistoryRecord.Fields.DDL_STATEMENTS, "");
List<Struct> structs = new ArrayList<>();
Struct struct = new Struct(schemaHistoryChangeSchema());
struct.put(ConnectTableChangeSerializer.TYPE_KEY, "DROP");
struct.put(ConnectTableChangeSerializer.ID_KEY, "test.table");
structs.add(struct);
result.put(HistoryRecord.Fields.TABLE_CHANGES, structs);
result.put(HistoryRecord.Fields.TABLE_CHANGES, structs);
// value.put()
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", schemaChangeRecordSchema, result);
}
private static Schema createSchemaChangeSchema() {
final Schema schemaChangeRecordSchema = SchemaBuilder.struct()
.name("filter.SchemaChangeValue")
.field(HistoryRecord.Fields.TIMESTAMP, Schema.INT64_SCHEMA)
.field(HistoryRecord.Fields.DATABASE_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(HistoryRecord.Fields.SCHEMA_NAME, Schema.OPTIONAL_STRING_SCHEMA)
.field(HistoryRecord.Fields.DDL_STATEMENTS, Schema.OPTIONAL_STRING_SCHEMA)
.field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(schemaHistoryChangeSchema()).build())
.build();
return schemaChangeRecordSchema;
}
protected static Schema schemaHistoryChangeSchema() {
return SchemaBuilder.struct()
.name(SCHEMA_HISTORY_CHANGE_SCHEMA_NAME)
.field(ConnectTableChangeSerializer.TYPE_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.ID_KEY, Schema.STRING_SCHEMA)
.build();
}
}

View File

@ -0,0 +1,145 @@
:page-aliases: configuration/schema-change-event-filter.adoc
[id="schema-change-event-filter"]
= SchemaChangeEventFilter
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
The `schemaChangeEventFilter` is mainly used to filter table schema change events, allowing us to selectively synchronize ddl.
For more information about configuring the SMT, see the following xref:example-schema-change-event-filter[example].
[[example-schema-change-event-filter]]
== Example
If you want to filter the captured ddl, please configure the following options when creating the connector.
For example, to filter out `DROP` and `TRUNCATE` from captured DDL event messages, add the following line to the connector configuration:
[source]
----
transforms=filterTableDropAndTruncateEvent
transforms.filterTableDropAndTruncateEvent.type=io.debezium.transforms.SchemaChangeEventFilter
transforms.filterTableDropAndTruncateEvent.schema.change.event.include.list=DROP,TRUNCATE
----
The following example shows the changes in the values of event records before and after being filtered by the SchemaChangeEventFilter
.Effect of applying the `SchemaChangeEventFilter` SMT
====
Schema change event record before it is processed by the `SchemaChangeEventFilter` transformation::
Schema change event record before the SMT processes the event record:::
+
[source]
----
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"field":"databaseName"
},
{
"type":"string",
"optional":true,
"field":"schemaName"
},
{
"type":"string",
"optional":true,
"field":"ddl"
},
{
"type":"array",
"items":{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"type"
},
{
"type":"string",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"io.debezium.connector.schema.Change"
},
"optional":false,
"field":"tableChanges"
}
],
"optional":false,
"name":"filter.SchemaChangeValue"
},
"payload":{
"ts_ms":1691035505397,
"databaseName":"test",
"schemaName":"test_schema",
"ddl":"",
"tableChanges":[
{
"type":"DROP",
"id":"test.table"
}
]
}
}
----
The record of the schema change event will return null after being processed by the `SchemaChangeEventFilter` transformation, and null values will not be sent::
Schema change event record after the SMT processes the event record:::
+
[source]
----
null
----
====
[[schema-change-event-filter-configuration-options]]
== Configuration options
The following table lists the configuration options that you can use with the `SchemaChangeEventFilter` SMT.
.SchemaChangeEventFilter SMT configuration options
[cols="14%a,40%a,10%a, 16%a, 16%a, 10%a"]
|===
|Property
|Description
|Type
|Default
|Valid Values
|Importance
|[[schema-change-event-filter-include-list]]<<schema-change-event-filter-include-list, `schema.change.event.include.list`>>
|A comma-separated list of schema change events that need to be filtered, Set one or more of the following options.
----
CREATE
ALTER
DROP
TRUNCATE
DATABASE
----
|String
|No default value
|non-empty string
|high
|===