Refactored the MySQL connector to break out the logic of reading the binlog into a separate class, added a similar class to read a full snapshot, and then updated the MySQL connector task class to use both. Added several test cases and updated the existing tests.
Several of the MySQL connector classes were fairly large and complicated, and to prepare for upcoming changes/enhancements these larger classes were refactored to pull out units of functionality. Currently all unit tests pass with these changes, with additional unit tests for these new components.
Previously, the DDL statements were being filtered and recorded based upon the name of the database that appeared in the binlog. However, that database name is actually the name of the database to which the client submitting the operation is connected, and is not necessarily the database _affected_ by the operation (e.g., when an operation includes a fully-qualified table name not in the connected-to database).
With these changes, the table/database affected by the DDL statements is now being used to filter the recording of the statements. The order of the DDL statements is still maintained, but since each DDL statement can apply to a separate database the DDL statements are batched (in the same original order) based upon the affected database. For example, two statements affecting "db1" will get batched together into one schema change record, followed by one statement affecting "db2" as a second schema change record, followed by another statement affecting "db1" as a third schema record.
Meanwhile, this change does not affect how the database history records the changes: it still records them as submitted using a single record for each separate binlog event/position. This is much safer as each binlog event (with specific position) is written atomically to the history stream. Also, since the database history stream is what the connector uses upon recovery, the database history records are now written _after_ any schema change records to ensure that, upon recovery after failure, no schema change records are lost (and instead have at-least-once delivery guarantees).
Added integration test logic to verify that UPDATE events include both 'before' and 'after' states (previously added as part of DBZ-52), to verify that altering a table does not generate events for the rows in that table, and that the 'before' and 'after' states (read from the binlog) are always defined in terms of the _current_ table schema. IOW, no special logic is needed to handle a 'before' state that has different columns than defined in the current table's definition.
Added to the Debezium event message's `source` information the MySQL server ID for the cluster process that recorded the event and the MySQL timestamp at which the event was recorded.
Added explicit support for handling `CHARSET` as an alias for `CHARACTER SET` in both tables and columns. `CREATE DATABASE` and `ALTER DATABASE` statements can also specify character sets, but the DDL parser handles but does not explicitly parse them so no modification is needed for them. Several unit tests were added to confirm the behavior.
The new envelope Struct contains fields for the local time at which the connector processed the event, the kind of operation (e.g., read, insert, update, or delete), the state of the record before and after the change, and the information about the event source. The latter two items are connector-specific. The timestamp is merely the time using the connector's process clock, and no guarantees are provided about accuracy, monotonicity, or relationship to the original source event.
The envelope structure is now used as the value for each event message in the MySQL connector; they keys of the event messages remain unchanged. Note that to facilitate Kafka log compaction (which requires a null value), a delete event containing the envelope with details about the deletion is followed by a "tombstone" event that contains the same key but null value.
An example of a message value with this new envelope is as follows:
{
"schema" : {
"type" : "struct",
"fields" : [ {
"type" : "struct",
"fields" : [ {
"type" : "int32",
"optional" : false,
"name" : "org.apache.kafka.connect.data.Date",
"version" : 1,
"field" : "order_date"
}, {
"type" : "int32",
"optional" : false,
"field" : "purchaser"
}, {
"type" : "int32",
"optional" : false,
"field" : "quantity"
}, {
"type" : "int32",
"optional" : false,
"field" : "product_id"
} ],
"optional" : true,
"name" : "connector_test.orders",
"field" : "before"
}, {
"type" : "struct",
"fields" : [ {
"type" : "int32",
"optional" : false,
"name" : "org.apache.kafka.connect.data.Date",
"version" : 1,
"field" : "order_date"
}, {
"type" : "int32",
"optional" : false,
"field" : "purchaser"
}, {
"type" : "int32",
"optional" : false,
"field" : "quantity"
}, {
"type" : "int32",
"optional" : false,
"field" : "product_id"
} ],
"optional" : true,
"name" : "connector_test.orders",
"field" : "after"
}, {
"type" : "struct",
"fields" : [ {
"type" : "string",
"optional" : false,
"field" : "server"
}, {
"type" : "string",
"optional" : false,
"field" : "file"
}, {
"type" : "int64",
"optional" : false,
"field" : "pos"
}, {
"type" : "int32",
"optional" : false,
"field" : "row"
} ],
"optional" : false,
"name" : "io.debezium.connector.mysql.Source",
"field" : "source"
}, {
"type" : "string",
"optional" : false,
"field" : "op"
}, {
"type" : "int64",
"optional" : true,
"field" : "ts"
} ],
"optional" : false,
"name" : "kafka-connect-2.connector_test.orders",
"version" : 1
},
"payload" : {
"before" : null,
"after" : {
"order_date" : 16852,
"purchaser" : 1003,
"quantity" : 1,
"product_id" : 107
},
"source" : {
"server" : "kafka-connect-2",
"file" : "mysql-bin.000002",
"pos" : 2887680,
"row" : 4
},
"op" : "c",
"ts" : 1463437199134
}
}
Notice how the Schema is significantly larger, since it must describe all of the envelope's fields even when those fields are not used. In this case, the event signifies that a record was created as the 4th record of a single event recorded in the binlog.
When a row is updated in the database and the primary/unique key for that table is changed, the MySQL connector continues to generate an update event with the new key and new value, but now also generates a tombstone event for the old key. This ensures that when a Kafka topic is compacted, all prior events with the old key will (eventually) be removed. It also ensures that consumers see that the row represented by the old key has been removed.
MySQL 5.6 using the MyISAM engine will create the `help_relation` system table using a CREATE TABLE statement that does not have in the columns' REFERENCE clause a list of columns in the referenced table. MySQL 5.7 using the InnoDB engine does not include the REFERENCE clauses.
Because Debezium's MySQL DDL parser is meant only to understand the statements recorded in the binlog, it does not have to validate the statements and therefore the DDL parser can be a bit more lenient by not requiring the list of columns in a REFERENCE clause in a CREATE TABLE statement's column definitions.
This commit also adds several unit tests that validate all of the DDL statements used by MySQL 5.6 and 5.7 during startup (in the configurations used in our integration tests).
Changed the MySQL connector to use comma-separated lists of regular expressions for the database
and table whitelist/blacklists. Literals are still accepted and will match fully-qualified table names,
although the '.' character used as a delimiter is also a special character in regular expressions and
therefore may need to be escaped with a double backslash ('\\') to more carefully match fully-qualified
table names.
Added several new configuration properties for the MySQL connector that instruct it to hide,
truncate, and/or mask certain columns. The properties' values are all lists of regular expressions
or literal fully-qualified column names. For example, the following configuration property:
column.blacklist=server.users.picture,server.users.other
will cause the connector to leave out of change event messages for the `server.users` table those
fields that correspond to the `picture` and `others` columns. This capability can be used to
This capability can be used to prevent dissemination of sensitive information in the change event
stream.
An alternative to blacklisting is masking. The following configuration property:
column.mask.with.10.chars=server\\.users\\.(\\w*email)
will cause the connector to mask in the change event messages for the `server.users` table
all values for columns whose name ends in `email`. The values will be replaced in this case with
a constant string of 10 asterisk ('*') characters, even when the email value is null.
This capability can also be used to prevent dissemination of sensitive information in the change event
stream.
Another option is to truncate string values for specific columns. The following configuration
property:
column.truncate.to.120.chars=server[.]users[.](description|biography)
will cause the connector to truncate to at most 120 characters the values of the `description` and
`biography` columns in the change event messages for the `server.users` table. Although this example
used a limit of 120 characters, any positive length can be specified; separate properties should
be used when different lengths are required. Note how the '.' delimiter in the fully-qualified names
is escaped since that same character is a special character in regular expressions. This capability
can be used to reduce the size of change event messages.
Drop table/view statements that involve more than one table generate one event for each table/view. Previously, each of those statements had the original multi-table/view statement. Now, each event has a statement that applies to only that table (generated from the original with all the same clauses).
The previous change did not correctly capture the statements for a `RENAME TO` that renamed multiple tables, so fixed the code so that it generates a single `RENAME TO` for each table rename.
Refactored the mechanism by which components can listen to the activities of a DDL parser. The new approach
should be significantly more flexible for additional types of DDL events while making it easier to maintain
backward compatibility. It also will enable passing event-specific information on each DDL event.