Commit Graph

1544 Commits

Author SHA1 Message Date
Randall Hauch
e91aac5b18 DBZ-37 DatabaseHistory can now use custom logic to compare offsets
DatabaseHistory stores the DDL changes with the offset describing the position in the source where those DDL statements were found. When a connector restarts at a specific offset (supplied by Kafka Connect), connectors such as the MySQL connector reconstruct the database schemas by having DatabaseHistory load the history starting from the beginning and stopping at (or just before) the connector's starting offset. This change allows connectors to supply a custom comparison function.

To support GTIDs, the MySQL connector needed to store additional information in the offsets. This means the logic needed to compare offsets with and without GTIDs is non-trivial and unique to the MySQL connector. This commit adds a custom comparison function for offsets.

Per [MySQL documentation](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html), slaves are always expected to start with the same set of GTIDs as the master, so no matter which server the MySQL connector follows, it should always have the complete set of GTIDs seen by that server. Therefore (see the sketch after this list):

* Two offsets with GTID sets can be compared using only the GTID sets.
* Any offset with a GTID set is always assumed to be newer than an offset without, since it is assumed once GTIDs are enabled they will remain enabled. (Otherwise, the connector likely needs to be restarted with a snapshot and tied to a specific master or slave with no failover.)
* Two offsets without GTIDs are compared using the binlog coordinates (filename, position, and row number).
* An offset that is identical to another except for being in snapshot mode is considered earlier than the one without the snapshot flag. This is because snapshot mode begins by recording the position of the snapshot, and once the snapshot completes the offset is recorded without the snapshot flag.
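
A minimal sketch of such a comparison, following the rules above (the `Offset` holder, its field names, and the `isSubsetOf` helper are illustrative assumptions, not the connector's actual offset representation):

    // Sketch only: field names and the subset check are assumptions.
    public class OffsetOrdering {

        static class Offset {
            String gtidSet;          // null when the server has no GTIDs
            String binlogFilename;
            long binlogPosition;
            int rowNumber;
            boolean snapshot;        // true while a snapshot is in progress
        }

        /** Returns true if offset 'a' is at or before offset 'b'. */
        static boolean isAtOrBefore(Offset a, Offset b) {
            if (a.gtidSet != null && b.gtidSet != null) {
                // Rule 1: with GTID sets on both sides, compare only the GTID sets.
                if (!a.gtidSet.equals(b.gtidSet)) {
                    return isSubsetOf(a.gtidSet, b.gtidSet);
                }
                // identical GTID sets fall through to the snapshot rule
            } else if (a.gtidSet != null || b.gtidSet != null) {
                // Rule 2: an offset with a GTID set is assumed newer than one without.
                return b.gtidSet != null;
            } else {
                // Rule 3: without GTIDs, compare binlog coordinates.
                int cmp = a.binlogFilename.compareTo(b.binlogFilename);
                if (cmp == 0) cmp = Long.compare(a.binlogPosition, b.binlogPosition);
                if (cmp == 0) cmp = Integer.compare(a.rowNumber, b.rowNumber);
                if (cmp != 0) return cmp < 0;
            }
            // Rule 4: otherwise-identical offsets order the snapshot offset first.
            return a.snapshot || !b.snapshot;
        }

        // Hypothetical helper: whether every GTID in 'a' also appears in 'b'.
        static boolean isSubsetOf(String a, String b) {
            return b.contains(a);  // placeholder; real code would parse both GTID sets
        }
    }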
2016-06-04 16:20:26 -05:00
Randall Hauch
655aac7d4f DBZ-37 Added support for MySQL GTIDs
The BinlogClient library our MySQL connector uses already has support for GTIDs. This change makes use of that and adds the GTIDs from the server to the offsets created by the connector and used upon restarts.
2016-06-02 18:30:26 -05:00
Randall Hauch
264a9041df DBZ-64 Added Avro Converter to record verification utilities
The `VerifyRecord` utility class has methods that will verify a `SourceRecord`, and is used in many of our integration tests to check whether records are constructed in a valid manner. The utility already checks whether the records can be serialized and deserialized using the JSON converter (provided with Kafka Connect); this change also checks with the Avro Converter (which produces much smaller records and is more suitable for production).

Note that version 3.0.0 of the Confluent Avro Converter is required; version 2.1.0-alpha1 could not properly handle complex Schema objects with optional fields (see https://github.com/confluentinc/schema-registry/pull/280).

Also, the names of the Kafka Connect schemas used in MySQL source records have changed:

* The record's envelope Schema used to be "<serverName>.<database>.<table>" but is now "<serverName>.<database>.<table>.Envelope".
* The Schema for record keys used to be named "<database>.<table>/pk", but the '/' character is not valid within an Avro name, so the name has been changed to "<serverName>.<database>.<table>.Key".
* The Schema for record values used to be named "<database>.<table>", but to better fit with the other Schema names it has been changed to "<serverName>.<database>.<table>.Value".

Thus, all of the Schemas for a single database table have the same Avro namespace "<serverName>.<database>.<table>" (or "<topicName>") with Avro schema names of "Envelope", "Key", and "Value".

All unit and integration tests pass.
2016-06-02 16:54:21 -05:00
Randall Hauch
46c0ce9882 DBZ-58 Added MDC logging contexts to connector
Changed the MySQL connector to make use of MDC logging contexts, which allow thread-specific parameters that can be written out on every log line by simply changing the logging configuration (e.g., Log4J configuration file).

We adopt a convention for all Debezium connectors with the following MDC properties:

* `dbz.connectorType` - the type of connector, which would be a single well-known value for each connector (e.g., "MySQL" for the MySQL connector)
* `dbz.connectorName` - the name of the connector, which for the MySQL connector is simply the value of the `server.name` property (e.g., the logical name for the MySQL server/cluster). Unfortunately, Kafka Connect does not give us its name for the connector.
* `dbz.connectorContext` - the name of the thread, which is "main" for the thread running the connector; the MySQL connector uses "snapshot" for the thread started by the snapshot reader, and "binlog" for the thread started by the binlog reader.

Different logging frameworks have their own way of using MDC properties. In a Log4J configuration, for example, simply use `%X{name}` in the logger's layout, where "name" is one of the properties listed above (or another MDC property).
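
A minimal sketch of the idea using the SLF4J `MDC` API (the class name and property values below are illustrative, not the connector's actual code):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;

    public class LoggingContextExample {
        private static final Logger LOGGER = LoggerFactory.getLogger(LoggingContextExample.class);

        public static void main(String[] args) {
            // Values assumed for illustration; the connector sets these per thread.
            MDC.put("dbz.connectorType", "MySQL");
            MDC.put("dbz.connectorName", "my-mysql-server");  // the connector's server.name value
            MDC.put("dbz.connectorContext", "binlog");        // "main", "snapshot", or "binlog"
            try {
                LOGGER.info("Connected to the binlog");
                // A Log4J layout such as
                //   %d{ISO8601} %-5p [%X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext}] %m%n
                // would prefix every line from this thread with the three MDC values.
            } finally {
                MDC.clear();  // MDC state is per-thread, so clear it when the work is done
            }
        }
    }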
2016-06-02 14:05:06 -05:00
Randall Hauch
58a5d8c033 DBZ-31 Added support for possibly performing snapshot upon startup
Refactored the MySQL connector to break out the logic of reading the binlog into a separate class, added a similar class to read a full snapshot, and then updated the MySQL connector task class to use both. Added several test cases and updated the existing tests.
2016-06-01 21:40:53 -05:00
Randall Hauch
e6c0ff5e4d DBZ-31 Refactored the MySQL Connector
Several of the MySQL connector classes were fairly large and complicated, and to prepare for upcoming changes/enhancements these larger classes were refactored to pull out units of functionality. Currently all unit tests pass with these changes, with additional unit tests for these new components.
2016-05-26 15:58:58 -05:00
Randall Hauch
24e99fb28f DBZ-31 DDL parser now supports '#' as comment line prefix 2016-05-26 15:40:50 -05:00
Randall Hauch
dc5a379764 DBZ-55 Corrected filtering of DDL statements based upon affected database
Previously, the DDL statements were being filtered and recorded based upon the name of the database that appeared in the binlog. However, that database name is actually the name of the database to which the client submitting the operation is connected, and is not necessarily the database _affected_ by the operation (e.g., when an operation includes a fully-qualified table name not in the connected-to database).

With these changes, the table/database affected by the DDL statements is now used to filter the recording of the statements. The order of the DDL statements is still maintained, but since each DDL statement can apply to a different database, the DDL statements are batched (in the same original order) based upon the affected database. For example, two statements affecting "db1" will get batched together into one schema change record, followed by one statement affecting "db2" as a second schema change record, followed by another statement affecting "db1" as a third schema change record.

Meanwhile, this change does not affect how the database history records the changes: it still records them as submitted using a single record for each separate binlog event/position. This is much safer as each binlog event (with specific position) is written atomically to the history stream. Also, since the database history stream is what the connector uses upon recovery, the database history records are now written _after_ any schema change records to ensure that, upon recovery after failure, no schema change records are lost (and instead have at-least-once delivery guarantees).
2016-05-23 11:01:27 -05:00
Randall Hauch
07315f2b4b DBZ-43 Changed form of schema change topic to use schemas 2016-05-19 16:54:22 -05:00
Randall Hauch
c0b7114424 DBZ-52 Added top-level container structure to all messages
The new envelope Struct contains fields for the local time at which the connector processed the event, the kind of operation (e.g., read, insert, update, or delete), the state of the record before and after the change, and the information about the event source. The latter two items are connector-specific. The timestamp is merely the time using the connector's process clock, and no guarantees are provided about accuracy, monotonicity, or relationship to the original source event.

The envelope structure is now used as the value for each event message in the MySQL connector; the keys of the event messages remain unchanged. Note that to facilitate Kafka log compaction (which requires a null value), a delete event containing the envelope with details about the deletion is followed by a "tombstone" event that contains the same key but a null value.

An example of a message value with this new envelope is as follows:

{
    "schema" : {
      "type" : "struct",
      "fields" : [ {
        "type" : "struct",
        "fields" : [ {
          "type" : "int32",
          "optional" : false,
          "name" : "org.apache.kafka.connect.data.Date",
          "version" : 1,
          "field" : "order_date"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "purchaser"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "quantity"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "product_id"
        } ],
        "optional" : true,
        "name" : "connector_test.orders",
        "field" : "before"
      }, {
        "type" : "struct",
        "fields" : [ {
          "type" : "int32",
          "optional" : false,
          "name" : "org.apache.kafka.connect.data.Date",
          "version" : 1,
          "field" : "order_date"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "purchaser"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "quantity"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "product_id"
        } ],
        "optional" : true,
        "name" : "connector_test.orders",
        "field" : "after"
      }, {
        "type" : "struct",
        "fields" : [ {
          "type" : "string",
          "optional" : false,
          "field" : "server"
        }, {
          "type" : "string",
          "optional" : false,
          "field" : "file"
        }, {
          "type" : "int64",
          "optional" : false,
          "field" : "pos"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "row"
        } ],
        "optional" : false,
        "name" : "io.debezium.connector.mysql.Source",
        "field" : "source"
      }, {
        "type" : "string",
        "optional" : false,
        "field" : "op"
      }, {
        "type" : "int64",
        "optional" : true,
        "field" : "ts"
      } ],
      "optional" : false,
      "name" : "kafka-connect-2.connector_test.orders",
      "version" : 1
    },
    "payload" : {
      "before" : null,
      "after" : {
        "order_date" : 16852,
        "purchaser" : 1003,
        "quantity" : 1,
        "product_id" : 107
      },
      "source" : {
        "server" : "kafka-connect-2",
        "file" : "mysql-bin.000002",
        "pos" : 2887680,
        "row" : 4
      },
      "op" : "c",
      "ts" : 1463437199134
    }
}

Notice how the Schema is significantly larger, since it must describe all of the envelope's fields even when those fields are not used. In this case, the event signifies that a record was created as the 4th row of a single event recorded in the binlog.
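
As a sketch of how a consumer might unpack this envelope once the value has been deserialized into a Kafka Connect `Struct` (the class, method, and variable names are illustrative):

    import org.apache.kafka.connect.data.Struct;

    public class EnvelopeReader {
        // Reads the envelope fields shown in the example above.
        public static void describe(Struct value) {
            String op = value.getString("op");           // "c" in the example above
            Struct before = value.getStruct("before");   // null for a create event
            Struct after = value.getStruct("after");
            Struct source = value.getStruct("source");
            Long ts = value.getInt64("ts");              // connector processing time; may be null

            System.out.printf("op=%s file=%s pos=%d row=%d ts=%s%n",
                    op, source.getString("file"), source.getInt64("pos"),
                    source.getInt32("row"), ts);
            if (after != null) {
                System.out.println("purchaser=" + after.getInt32("purchaser"));
            }
        }
    }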
2016-05-19 12:40:16 -05:00
Randall Hauch
6d56a8f3d0 DBZ-50 Added parameters for truncated length and when the field is masked. 2016-05-12 16:31:33 -05:00
Randall Hauch
b1e6eb1028 DBZ-29 Refactored ColumnMappers and enabled ColumnMapper impls to add parameters to the Kafka Connect Schema. 2016-05-12 12:26:04 -05:00
Randall Hauch
18995abfbd Merge pull request #38 from rhauch/dbz-29
DBZ-29 Changed MySQL connector to be able to hide, truncate, and mask specific columns
2016-05-12 08:27:15 -05:00
Randall Hauch
ff9d0fc240 DBZ-29 Changed MySQL connector to be able to hide, truncate, and mask specific columns
Changed the MySQL connector to use comma-separated lists of regular expressions for the database
and table whitelist/blacklists. Literals are still accepted and will match fully-qualified table names,
although the '.' character used as a delimiter is also a special character in regular expressions and
therefore may need to be escaped with a double backslash ('\\') to more carefully match fully-qualified
table names.

Added several new configuration properties for the MySQL connector that instruct it to hide,
truncate, and/or mask certain columns. The properties' values are all lists of regular expressions
or literal fully-qualified column names. For example, the following configuration property:

    column.blacklist=server.users.picture,server.users.other

will cause the connector to leave out of the change event messages for the `server.users` table those
fields that correspond to the `picture` and `other` columns. This capability can be used to prevent
dissemination of sensitive information in the change event stream.

An alternative to blacklisting is masking. The following configuration property:

    column.mask.with.10.chars=server\\.users\\.(\\w*email)

will cause the connector to mask in the change event messages for the `server.users` table
all values for columns whose name ends in `email`. The values will be replaced in this case with
a constant string of 10 asterisk ('*') characters, even when the email value is null.
This capability can also be used to prevent dissemination of sensitive information in the change event
stream.

Another option is to truncate string values for specific columns. The following configuration
property:

    column.truncate.to.120.chars=server[.]users[.](description|biography)

will cause the connector to truncate to at most 120 characters the values of the `description` and
`biography` columns in the change event messages for the `server.users` table. Although this example
used a limit of 120 characters, any positive length can be specified; separate properties should
be used when different lengths are required. Note how the '.' delimiter in the fully-qualified names
is escaped since that same character is a special character in regular expressions. This capability
can be used to reduce the size of change event messages.
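
A small sketch showing these options combined in one configuration, built programmatically (only the `column.*` values are taken from the examples above; everything else about the setup is illustrative):

    import java.util.Properties;

    public class ColumnFilterConfigExample {
        public static Properties columnOptions() {
            Properties props = new Properties();
            // Omit the picture and other columns entirely.
            props.setProperty("column.blacklist", "server.users.picture,server.users.other");
            // Replace email-like columns with 10 asterisks.
            props.setProperty("column.mask.with.10.chars", "server\\.users\\.(\\w*email)");
            // Truncate long text columns to 120 characters.
            props.setProperty("column.truncate.to.120.chars", "server[.]users[.](description|biography)");
            return props;
        }
    }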
2016-05-11 15:57:06 -05:00
Christian Posta
8b736ef654 DBZ-48 Cannot parse COMMIT and flush statements 2016-05-05 15:36:24 -07:00
Randall Hauch
1fcb4b02cf DBZ-38 Changed DROP VIEW and TABLE to include single-table statements in events
Drop table/view statements that involve more than one table generate one event for each table/view. Previously, each of those events contained the original multi-table/view statement. Now, each event has a statement that applies to only that table (generated from the original with all the same clauses).
2016-04-12 18:18:13 -05:00
Randall Hauch
b1e428c986 DBZ-38 Adjusted how events are generated for RENAME TO statements
The previous change did not correctly capture the statements for a `RENAME TO` that renamed multiple tables, so the code was fixed so that it generates a single `RENAME TO` statement for each table rename.
2016-04-12 17:58:07 -05:00
Randall Hauch
5b30568650 DBZ-38 Changed the listening framework of the DDL parser
Refactored the mechanism by which components can listen to the activities of a DDL parser. The new approach
should be significantly more flexible for additional types of DDL events while making it easier to maintain
backward compatibility. It also will enable passing event-specific information on each DDL event.
2016-04-12 11:00:02 -05:00
Randall Hauch
137b9f6d4d DBZ-38 Changed the DDL parser framework to notify listeners as statements are applied. 2016-04-11 15:16:04 -05:00
Randall Hauch
8f5487b2c0 [maven-release-plugin] prepare for next development iteration 2016-03-17 16:28:40 -05:00
Randall Hauch
c2b8ac50ae [maven-release-plugin] prepare release v0.1.0 2016-03-17 16:28:40 -05:00
Randall Hauch
43f79aad5e Added missing version element to modules 2016-03-17 16:14:17 -05:00
Randall Hauch
5a002dbf62 DBZ-15 Cached converters are now dropped upon log rotation. 2016-03-17 11:03:28 -05:00
Randall Hauch
4998325de7 DBZ-30 Changed the MySQL connector to include all columns in the record value 2016-03-04 10:51:14 -06:00
Randall Hauch
9034e26d1e DBZ-26 Corrected the embedded connector framework to enable stopping. Also improved logging statements. 2016-03-03 15:27:11 -06:00
Randall Hauch
1d46e59048 DBZ-17 Minor changes to the POMs 2016-02-18 13:58:29 -06:00
Randall Hauch
2e5dfd837b DBZ-13 Minor code changes to eliminate JavaDoc warnings 2016-02-17 11:15:21 -06:00
Randall Hauch
73f3c9836b DBZ-1 Completed integration testing and debugging of the MySQL connector 2016-02-15 14:46:12 -06:00
Randall Hauch
1a59f9b07c DBZ-11 Build can skip long-running unit and integration tests 2016-02-04 15:35:27 -06:00
Randall Hauch
54b822bb72 DBZ-10 Added small utility so unit tests can run an embedded Kafka cluster within the same process.
This utility is only suitable for unit tests and therefore is defined in the test JAR of the `debezium-core` module. It certainly should never be used for production purposes.
2016-02-04 15:18:27 -06:00
Randall Hauch
c501f8486f DBZ-9 Added MySQL whitelist and blacklists on tables and databases. 2016-02-04 07:56:13 -06:00
Randall Hauch
37d6a5e7da DBZ-1 Expanded documentation and improved EmbeddedConnector framework
Changed the EmbeddedConnector framework to initialize all major components via configuration properties rather than through the public builder. This increases the size of the configurations, but it simplifies what embedding applications must do to obtain an EmbeddedConnector instance.

The DatabaseHistory framework was also changed to be configurable in ways similar to the OffsetBackingStore. Essentially, connectors that want to use it (like the MySqlConnector) will describe it as part of the connector's configuration, allowing more flexibility in which DatabaseHistory implementation is used and how it is configured, whether in Kafka Connect or as part of the EmbeddedConnector.

Added a README.md to `debezium-embedded` to provide documentation and sample code showing how to use the EmbeddedConnector.
2016-02-03 14:11:53 -06:00
Randall Hauch
2da5b37f76 DBZ-1 Added support for recording and recovering database schema
Adds a small framework for recording the DDL operations on the schema state (e.g., Tables) as they are read and applied from the log, and when restarting the connector task to recover the accumulated schema state. Where and how the DDL operations are recorded is an abstraction called `DatabaseHistory`, with three options: in-memory (primarily for testing purposes), file-based (for embedded cases and perhaps standalone Kafka Connect uses), and Kafka (for normal Kafka Connect deployments).

The `DatabaseHistory` interface methods take several parameters that are used to construct a `SourceRecord`. The `SourceRecord` type was not used, however, since that would result in this interface (and potential extension mechanism) having a dependency on and exposing the Kafka API. Instead, the more general parameters are used to keep the API simple.

The `FileDatabaseHistory` and `MemoryDatabaseHistory` implementations are both fairly simple, but the `FileDatabaseHistory` relies upon representing each recorded change as a JSON document. This is simple, is easily written to files, allows for recovery of data from the raw file, etc. Although this was done initially using Jackson, the code to read and write the JSON documents required a lot of boilerplate. Instead, the `Document` framework developed during Debezium's very early prototype stages was brought back. It provides a very usable API for working with documents, including the ability to compare documents semantically (e.g., numeric values are converted to be able to compare their numeric values rather than just compare representations) and with or without field order.

The `KafkaDatabaseHistory` is a bit more complicated, since it uses a Kafka broker to record all database schema changes on a single topic with a single partition, and then upon restart uses that topic to recover the history. This implementation also records the changes as JSON documents, keeping it simple and independent of the Kafka Connect converters.
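
The shape of the abstraction, very roughly (method names and parameters here are guesses for illustration only, not the actual Debezium interface):

    import java.util.Map;
    import java.util.function.BiConsumer;

    // Illustrative sketch of the DatabaseHistory idea described above.
    public interface DatabaseHistorySketch {

        /** Record one DDL statement along with the source partition and offset at which it was read. */
        void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl);

        /** Replay previously recorded DDL statements, in order, up to the given restart position. */
        void recover(Map<String, ?> source, Map<String, ?> position, BiConsumer<String, String> ddlConsumer);
    }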
2016-02-02 14:27:14 -06:00
Randall Hauch
6796fe32be DBZ-1 Added the initial stages of a MySQL source connector
The connector is in a basic working state, although it is not well tested yet and upon restart does not recover the schema state from the previous run.
2016-01-29 10:12:28 -06:00
Randall Hauch
d9090ed67b DBZ-4 Removed unused files, most of which were originally copied from the ModeShape codebase. 2016-01-27 08:37:23 -06:00
Randall Hauch
4c538d4e54 DBZ-4 Changed copyright statement in source code headers and adjusted checkstyle rules. 2016-01-27 08:12:01 -06:00
Randall Hauch
eff1f665fa Updated checkstyle rule for headers, and corrected several incorrect headers. 2016-01-25 18:59:25 -06:00
Randall Hauch
a0a8953d2a Updated the copyright dates per new approach. 2016-01-25 18:33:08 -06:00
Randall Hauch
4ddd4b33be Changed Docker usage on Travis-CI 2016-01-25 16:12:07 -06:00
Randall Hauch
772977f391 Attempted to correct Travis build error 2016-01-25 13:50:17 -06:00
Randall Hauch
5e4c428285 Correct return type for function 2016-01-25 13:41:38 -06:00
Randall Hauch
71e90b5a69 Added MySQL ingest module with support for reading DDL statements. 2016-01-23 08:26:52 -06:00
Randall Hauch
8e6c615644 Added utilities for managing a relational schema's table definitions, with support for updating those by reading DDL 2016-01-20 08:53:29 -06:00
Randall Hauch
dffdfd8049 Added debezium-core and MySQL binary log reading tests. 2015-11-24 15:54:37 -06:00