Commit Graph

366 Commits

Author SHA1 Message Date
Randall Hauch
e06f5c596c DBZ-43 Added explicit checking and validation of Schemas and Structs in integration tests 2016-05-19 17:06:22 -05:00
Randall Hauch
07315f2b4b DBZ-43 Changed form of schema change topic to use schemas 2016-05-19 16:54:22 -05:00
Randall Hauch
c0b7114424 DBZ-52 Added top-level container structure to all messages
The new envelope Struct contains fields for the local time at which the connector processed the event, the kind of operation (e.g., read, insert, update, or delete), the state of the record before and after the change, and the information about the event source. The latter two items are connector-specific. The timestamp is merely the time using the connector's process clock, and no guarantees are provided about accuracy, monotonicity, or relationship to the original source event.

The envelope structure is now used as the value for each event message in the MySQL connector; they keys of the event messages remain unchanged. Note that to facilitate Kafka log compaction (which requires a null value), a delete event containing the envelope with details about the deletion is followed by a "tombstone" event that contains the same key but null value.

An example of a message value with this new envelope is as follows:

{
    "schema" : {
      "type" : "struct",
      "fields" : [ {
        "type" : "struct",
        "fields" : [ {
          "type" : "int32",
          "optional" : false,
          "name" : "org.apache.kafka.connect.data.Date",
          "version" : 1,
          "field" : "order_date"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "purchaser"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "quantity"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "product_id"
        } ],
        "optional" : true,
        "name" : "connector_test.orders",
        "field" : "before"
      }, {
        "type" : "struct",
        "fields" : [ {
          "type" : "int32",
          "optional" : false,
          "name" : "org.apache.kafka.connect.data.Date",
          "version" : 1,
          "field" : "order_date"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "purchaser"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "quantity"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "product_id"
        } ],
        "optional" : true,
        "name" : "connector_test.orders",
        "field" : "after"
      }, {
        "type" : "struct",
        "fields" : [ {
          "type" : "string",
          "optional" : false,
          "field" : "server"
        }, {
          "type" : "string",
          "optional" : false,
          "field" : "file"
        }, {
          "type" : "int64",
          "optional" : false,
          "field" : "pos"
        }, {
          "type" : "int32",
          "optional" : false,
          "field" : "row"
        } ],
        "optional" : false,
        "name" : "io.debezium.connector.mysql.Source",
        "field" : "source"
      }, {
        "type" : "string",
        "optional" : false,
        "field" : "op"
      }, {
        "type" : "int64",
        "optional" : true,
        "field" : "ts"
      } ],
      "optional" : false,
      "name" : "kafka-connect-2.connector_test.orders",
      "version" : 1
    },
    "payload" : {
      "before" : null,
      "after" : {
        "order_date" : 16852,
        "purchaser" : 1003,
        "quantity" : 1,
        "product_id" : 107
      },
      "source" : {
        "server" : "kafka-connect-2",
        "file" : "mysql-bin.000002",
        "pos" : 2887680,
        "row" : 4
      },
      "op" : "c",
      "ts" : 1463437199134
    }
}

Notice how the Schema is significantly larger, since it must describe all of the envelope's fields even when those fields are not used. In this case, the event signifies that a record was created as the 4th record of a single event recorded in the binlog.
2016-05-19 12:40:16 -05:00
Randall Hauch
e6710a5300 DBZ-44 Generate a tombstone for old key when row's key is change
When a row is updated in the database and the primary/unique key for that table is changed, the MySQL connector continues to generate an update event with the new key and new value, but now also generates a tombstone event for the old key. This ensures that when a Kafka topic is compacted, all prior events with the old key will (eventually) be removed. It also ensures that consumers see that the row represented by the old key has been removed.
2016-05-13 17:43:29 -05:00
Randall Hauch
b1e6eb1028 DBZ-29 Refactored ColumnMappers and enabled ColumnMapper impls to add parameters to the Kafka Connect Schema. 2016-05-12 12:26:04 -05:00
Randall Hauch
ff9d0fc240 DBZ-29 Changed MySQL connector to be able to hide, truncate, and mask specific columns
Changed the MySQL connector to use comma-separated lists of regular expressions for the database
and table whitelist/blacklists. Literals are still accepted and will match fully-qualified table names,
although the '.' character used as a delimiter is also a special character in regular expressions and
therefore may need to be escaped with a double backslash ('\\') to more carefully match fully-qualified
table names.

Added several new configuration properties for the MySQL connector that instruct it to hide,
truncate, and/or mask certain columns. The properties' values are all lists of regular expressions
or literal fully-qualified column names. For example, the following configuration property:

    column.blacklist=server.users.picture,server.users.other

will cause the connector to leave out of change event messages for the `server.users` table those
fields that correspond to the `picture` and `others` columns. This capability can be used to
This capability can be used to prevent dissemination of sensitive information in the change event
stream.

An alternative to blacklisting is masking. The following configuration property:

    column.mask.with.10.chars=server\\.users\\.(\\w*email)

will cause the connector to mask in the change event messages for the `server.users` table
all values for columns whose name ends in `email`. The values will be replaced in this case with
a constant string of 10 asterisk ('*') characters, even when the email value is null.
This capability can also be used to prevent dissemination of sensitive information in the change event
stream.

Another option is to truncate string values for specific columns. The following configuration
property:

    column.truncate.to.120.chars=server[.]users[.](description|biography)

will cause the connector to truncate to at most 120 characters the values of the `description` and
`biography` columns in the change event messages for the `server.users` table. Although this example
used a limit of 120 characters, any positive length can be specified; separate properties should
be used when different lengths are required. Note how the '.' delimiter in the fully-qualified names
is escaped since that same character is a special character in regular expressions. This capability
can be used to reduce the size of change event messages.
2016-05-11 15:57:06 -05:00
Randall Hauch
8f5487b2c0 [maven-release-plugin] prepare for next development iteration 2016-03-17 16:28:40 -05:00
Randall Hauch
c2b8ac50ae [maven-release-plugin] prepare release v0.1.0 2016-03-17 16:28:40 -05:00
Randall Hauch
43f79aad5e Added missing version element to modules 2016-03-17 16:14:17 -05:00
Randall Hauch
9034e26d1e DBZ-26 Corrected the embedded connector framework to enable stopping. Also improved logging statements. 2016-03-03 15:27:11 -06:00
Randall Hauch
1d46e59048 DBZ-17 Minor changes to the POMs 2016-02-18 13:58:29 -06:00
Randall Hauch
73f3c9836b DBZ-1 Completed integration testing and debugging of the MySQL connector 2016-02-15 14:46:12 -06:00
Randall Hauch
70fc601c0f DBZ-8 Added documentation about embedded engines. 2016-02-03 16:09:43 -06:00
Randall Hauch
fbae6d75c8 DBZ-1 Renamed EmbeddedConnector to EmbeddedEngine and improved README 2016-02-03 15:33:57 -06:00
Randall Hauch
37d6a5e7da DBZ-1 Expanded documentation and improved EmbeddedConnector framework
Changed the EmbeddedConnector framework to initialize all major components via configuration properties rather than through the public builder. This increases the size of the configurations, but it simplifies what embedding applications must do to obtain an EmbeddedConnector instance.

The DatabaseHistory framework was also changed to be configurable in similar ways to the OffsetBackingStore. Essentially, connectors that want to use it (like the MySqlConnector) will describe it as part of the connector's configuration, allowing more flexibility in which DatabaseHistory implementation is used and how it is configured whether in Kafka Connector or as part of the EmbeddedConnector.

Added a README.md to `debezium-embedded` to provide documentation and sample code showing how to use the EmbeddedConnector.
2016-02-03 14:11:53 -06:00
Randall Hauch
2da5b37f76 DBZ-1 Added support for recording and recovering database schema
Adds a small framework for recording the DDL operations on the schema state (e.g., Tables) as they are read and applied from the log, and when restarting the connector task to recover the accumulated schema state. Where and how the DDL operations are recorded is an abstraction called `DatabaseHistory`, with three options: in-memory (primarily for testing purposes), file-based (for embedded cases and perhaps standalone Kafka Connect uses), and Kafka (for normal Kafka Connect deployments).

The `DatabaseHistory` interface methods take several parameters that are used to construct a `SourceRecord`. The `SourceRecord` type was not used, however, since that would result in this interface (and potential extension mechanism) having a dependency on and exposing the Kafka API. Instead, the more general parameters are used to keep the API simple.

The `FileDatabaseHistory` and `MemoryDatabaseHistory` implementations are both fairly simple, but the `FileDatabaseHistory` relies upon representing each recorded change as a JSON document. This is simple, is easily written to files, allows for recovery of data from the raw file, etc. Although this was done initially using Jackson, the code to read and write the JSON documents required a lot of boilerplate. Instead, the `Document` framework developed during Debezium's very early prototype stages was brought back. It provides a very usable API for working with documents, including the ability to compare documents semantically (e.g., numeric values are converted to be able to compare their numeric values rather than just compare representations) and with or without field order.

The `KafkaDatabaseHistory` is a bit more complicated, since it uses a Kafka broker to record all database schema changes on a single topic with single partition, and then upon restart uses it to recover the history from the dedicated topics. This implementation also records the changes as JSON documents, keeping it simple and independent of the Kafka Connect converters.
2016-02-02 14:27:14 -06:00