DBZ-188: Allow a Debezium MySQL connector to filter production of DML events into Kafka by the MySQL server UUID in the event's GTID

With GTIDs enabled, each transaction in the binlog contains a GTID event, which gives us access to the GTID of the transaction. The GTID has the following format: source_id:transaction_id, where source_id is the UUID of the MySQL server on which the transaction was written.

I propose to allow a Debezium instance to be configured with a UUID pattern to check against before producing DML events into Kafka. Debezium would produce a DML event into Kafka if and only if the UUID in the event's GTID matches the pattern with which Debezium was configured.

The configuration for the UUID patterns will reuse the existing gtid.source.includes and gtid.source.excludes options. The DML event filtering will only be performed if the new option gtid.source.filter.dml.events is set to true.
This commit is contained in:
dleibovic 2017-02-09 12:50:35 -05:00
parent 3aea39397f
commit aa50bfe71a
2 changed files with 45 additions and 1 deletions

View File

@ -16,6 +16,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@ -72,6 +73,8 @@ public class BinlogReader extends AbstractReader {
private long previousOutputMillis = 0L;
private long initialEventsToSkip = 0L;
private boolean skipEvent = false;
private boolean ignoreDmlEventByGtidSource = false;
private final Predicate<String> gtidDmlSourceFilter;
private final AtomicLong totalRecordCounter = new AtomicLong();
private volatile Map<String, ?> lastOffset = null;
private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
@ -101,6 +104,9 @@ public BinlogReader(String name, MySqlTaskContext context) {
client.registerLifecycleListener(new ReaderThreadLifecycleListener());
if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);
boolean filterDmlEventsByGtidSource = context.config().getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
gtidDmlSourceFilter = filterDmlEventsByGtidSource ? context.gtidSourceFilter() : null;
// Set up the event deserializer with additional type(s) ...
final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
EventDeserializer eventDeserializer = new EventDeserializer() {
@ -408,6 +414,13 @@ protected void handleGtidEvent(Event event) {
String gtid = gtidEvent.getGtid();
gtidSet.add(gtid);
source.startGtid(gtid, gtidSet.toString()); // rather than use the client's GTID set
ignoreDmlEventByGtidSource = false;
if (gtidDmlSourceFilter != null && gtid != null) {
String uuid = gtid.trim().substring(0, gtid.indexOf(":"));
if (!gtidDmlSourceFilter.test(uuid)) {
ignoreDmlEventByGtidSource = true;
}
}
}
/**
@ -438,6 +451,7 @@ protected void handleQueryEvent(Event event) throws InterruptedException {
source.commitTransaction();
source.setBinlogThread(-1L);
skipEvent = false;
ignoreDmlEventByGtidSource = false;
return;
}
if (sql.toUpperCase().startsWith("XA ")) {
@ -490,6 +504,10 @@ protected void handleInsert(Event event) throws InterruptedException {
logger.debug("Skipping previously processed row event: {}", event);
return;
}
if (ignoreDmlEventByGtidSource) {
logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
return;
}
WriteRowsEventData write = unwrapData(event);
long tableNumber = write.getTableId();
BitSet includedColumns = write.getIncludedColumns();
@ -533,6 +551,10 @@ protected void handleUpdate(Event event) throws InterruptedException {
logger.debug("Skipping previously processed row event: {}", event);
return;
}
if (ignoreDmlEventByGtidSource) {
logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
return;
}
UpdateRowsEventData update = unwrapData(event);
long tableNumber = update.getTableId();
BitSet includedColumns = update.getIncludedColumns();
@ -580,6 +602,10 @@ protected void handleDelete(Event event) throws InterruptedException {
logger.debug("Skipping previously processed row event: {}", event);
return;
}
if (ignoreDmlEventByGtidSource) {
logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
return;
}
DeleteRowsEventData deleted = unwrapData(event);
long tableNumber = deleted.getTableId();
BitSet includedColumns = deleted.getIncludedColumns();

View File

@ -515,6 +515,23 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
.withInvisibleRecommender()
.withDescription("The source UUIDs used to exclude GTID ranges when determine the starting position in the MySQL server's binlog.");
/**
 * If set to {@code true}, only produce DML events into Kafka for transactions that were written on MySQL servers
 * with UUIDs matching the filters defined by the {@link #GTID_SOURCE_INCLUDES} or {@link #GTID_SOURCE_EXCLUDES}
 * configuration options, if they are specified.
 * <p>
 * Defaults to {@code true}.
 * <p>
 * When {@code true}, either {@link #GTID_SOURCE_INCLUDES} or {@link #GTID_SOURCE_EXCLUDES} must be set.
 */
public static final Field GTID_SOURCE_FILTER_DML_EVENTS = Field.create("gtid.source.filter.dml.events")
.withDisplayName("Filter DML events")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(true)
.withDescription("If set to true, we will only produce DML events into Kafka for transactions that were written on mysql servers with UUIDs matching the filters defined by the gtid.source.includes or gtid.source.excludes configuration options, if they are specified.");
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms")
.withDisplayName("Connection Timeout (ms)")
.withType(Type.INT)
@ -685,6 +702,7 @@ public static final Field MASK_COLUMN(int length) {
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
GTID_SOURCE_FILTER_DML_EVENTS,
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER);
@ -709,7 +727,7 @@ protected static ConfigDef configDef() {
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES);
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE);
return config;