The ENUM and SET values read from the binlog contain the indexes of the options that are included in the value, but this doesn't compared with the string values returned by MySQL and JDBC that contain the comma-separated options. With this change, the values read from the binlog will also be comma-separated strings.
Rewrote how the MySQL connector converts temporal values to use schemas with names that identify the semantic
type of temporal value, and customized how the MySQL binlog client library creates Java object values from the
raw binlog events.
Several new "semantic" schema types were defined:
* `io.debezium.time.Year` represents a year number as an INT32 value (e.g., 2016, -345, etc.).
* `io.debezium.time.Date` represents a date by storing the epoch seconds (that is, the number of seconds past the epoch) as an INT64 value.
* `io.debezium.time.Time` represents a time by storing the milliseconds past midnight as an INT32 value.
* `io.debezium.time.MicroTime` represents a time by storing the microsconds past midnight as an INT32 value.
* `io.debezium.time.NanoTime` represents a time by storing the nanoseconds past midnight as an INT32 value.
* `io.debezium.time.Timestamp` represents a date and time (without timezone information) by storing the milliseconds past epoch as an INT64 value.
* `io.debezium.time.MicroTimestamp` represents a date and time (without timezone information) by storing the microseconds past epoch as an INT64 value.
* `io.debezium.time.NanoTimestamp` represents a date and time (without timezone information) by storing the nanoseconds past epoch as an INT64 value.
* `io.debezium.time.ZonedTime` represents a time with timezone and optional fractions of a second (but no date) by storing the ISO8601 form as a STRING value (e.g., `10:15:30+01:00`)
* `io.debezium.time.ZonedTimestamp` represents a date and time with timezone and optional fractions of a second by storing the ISO8601 form as a STRING value (e.g., `2011-12-03T10:15:30.030431+01:00`)
This range of semantic types allows for a far more accurate representation in the events of the temporal values stored within the database. The MySQL connector chooses the semantic type based upon the precision of the MySQL type (e.g., `TIMESTAMP(6)` will be represented with `io.debezium.time.MicroTimestamp`, whereas `TIMESTAMP(3)` will be represented with `io.debezium.time.Timestamp`). This ensures that the events do not lose precision and that the semantics of the database column values are retained in the events even though the values are represented with primitive values.
Obviously these Kafka Connect schema representations are different and more precise than the built-in `org.apache.kafka.connect.data.Date`, `org.apache.kafka.connect.data.Time`, and `org.apache.kafka.connect.data.Timestamp` logical types provided by Kafka Connect and used by the MySQL connector in all 0.2.x and 0.1.x versions. Migration to the new MySQL connector should be possible, although consumers may still need to know about these types to properly handle temporal values and the correct precision (i.e., consumers can just assume all date INT64 values represent milliseconds).
The MySQL binlog client library converted the raw binary event information to JDBC types using a local Calendar instance, which obviously incorporates the local timezone and cannot retain more than millisecond precision. This change extends the library's deserializers to instead use the Java 8 `javax.time` classes and to retain the exact semantics of the database values and to not lose any precisions (since the `javax.time` classes have nanosecond precision).
The same logic is also used to convert the JDBC values obtained during a snapshot from the MySQL Connect/J JDBC driver. The latter has a few quirks, such as not returning any fractional seconds for `TIME` columns, even though `java.sql.Time` can store up to milliseconds.
Most of the logic of the conversions of values and mapping to Kafka Connect schemas is handled in the new `JdbcValueConverters`, which was extracted from the existing `TableSchemaBuilder`. The MySQL connector reuses and actually extends the `JdbcValueConverters` class with its own `MySqlValueConverters` class that also adds support for MySQL-specific types such as `YEAR`. Other connectors whose values are based on JDBC types should be able to reuse and/or extend the `JdbcValueConverters` class.
Integration tests that deal with temporal types were modified to use proper expected values and comparisons.
By default the MySQL JDBC driver will put the entire result set into memory, which obviously doesn't work for tables of even moderate sizes. This change adds support for streaming rows in result sets when the tables have more than a configurable number of rows (defaults to 1,000).
This posed a problem for how we were previously finding the last row in the last table; the MySQL driver does not support `ResultSet.isLast()` on result sets that are streamed. Instead, this commit wraps the consumer to which the snapshot reader writes all source records, with a consumer that buffers the last record. When the snapshot completes, the offset is updated (denoting the end of the snapshot) and set on the last buffered record before that record is flushed to the normal consumer. This should add minimal overhead while simplifying the logic to ensure the last source record has the updated offset.
This also improves the log output of the snapshot process.
The mysql:5.7 docker image changed its output to be more like mysql/mysql-server:5.7, and this broke our build because of what our build is looking for while waiting to for the server to completely intialize. Simply changing the pattern corrects the problem.
Upgraded from Kafka 0.9.0.1 to Kafka 0.10.0. The only required change was to override the `Connector.config()` method, which returns `null` or a `ConfigDef` instance that contains detailed metadata for each of the configuration fields, including supporting recommended values and marking fields as not visible (e.g., if they don't make sense given other configuration field values). This can be used by user interfaces to data-drive the configuration of a connector. Also, the default validation logic of the Connector implementations uses a `Validator` that is pretty restrictive in its functionality.
Debezium already had a fairly decent and simple `Configuration` framework. After several attempts to try and merge these concepts, reconciling the two validation mechanisms was very complicated and involved a lot of changes. It was easier to simply continue Debezium-specific validation and to override the `Connector.validate(...)` method to use Debezium's `Configuration`-based validation. Connector-based validation logic includes determining recommended values, so Debezium's `Field` class (used to define each configuration property) was enhanced with a new `Recommender` class that is similar to Kafka's.
Additional integration tests were added to verify that the `ConfigDef` result is acceptable and that the new connector validation logic works as expected, including getting recommended values for some fields (e.g., database names, table/collection names) from MySQL and MongoDB by connecting and dynamically reading the values. This was done in a way that remains backward compatible with the regular expression formats of these fields, but in a user interface that uses the `ConfigDef` mechanism the user can simply select the databases and table/collection identifiers.
Added an integration test case to diagnose the loss of the fractional seconds from MySQL temporal values. The problem appears to be a bug in the MySQL Binary Log Connector library that we used, and this bug was reported as https://github.com/shyiko/mysql-binlog-connector-java/issues/103. That was fixed in version 0.3.2 of the library, which Stanley was kind enough to release for us.
During testing, though, several issues were discovered in how temporal values are handled and converted from the MySQL events, through the MySQL Binary Log client library, and through the Debezium MySQL connector to conform with Kafka Connect's various temporal logical schema types. Most of the issues involved converting most of the temporal values from local time zone (which is how they are created by the MySQL Binary Log client) into UTC (which is how Kafka Connect expects them). Really, java.util.Date doesn't have time zone information and instead tracks the number of milliseconds past epoch, but the conversion of normal timestamp information to the milliseconds past epoch in UTC depends on the time zone in which that conversion happens.
The MySQL connector now maps TINYINT and SMALLINT columns to INT16 (rather than INT32) because INT16 is smaller and yet still large enough for all TINYINT and SMALLINT values. Note that the range of TINYINT values is either -128 to 127 for signed or 0 to 255 for unsigned, and thus INT8 is not an acceptable choice since it can only handle values in the range 0 to 255. Additionally, the JDBC Specification also suggests the proper Java type for SQL-99's TINYINT is short, which maps to Kafka Connect's INT16.
This change will be backward compatible, although the generated Kafka Connect schema will be different than in previous versions. This shouldn't cause a problem, since clients should expect to handle schema changes, and this schema change does comply with Avro schema evolution rules.
Added a new `debezium-connector-mongodb` module that defines a MongoDB connector. The MongoDB connector can capture and record the changes within a MongoDB replica set, or when seeded with addresses of the configuration server of a MongoDB sharded cluster, the connector captures the changes from the each replica set used as a shard. In the latter case, the connector even discovers the addition of or removal of shards.
The connector monitors each replica set using multiple tasks and, if needed, separate threads within each task. When a replica set is being monitored for the first time, the connector will perform an "initial sync" of that replica set's databases and collections. Once the initial sync has completed, the connector will then begin tailing the oplog of the replica set, starting at the exact point in time at which it started the initial sync. This equivalent to how MongoDB replication works.
The connector always uses the replica set's primary node to tail the oplog. If the replica set undergoes an election and different node becomes primary, the connector will immediately stop tailing the oplog, connect to the new primary, and start tailing the oplog using the new primary node. Likewise, if connector experiences any problems communicating with the replica set members, it will try to reconnect (using exponential backoff so as to not overwhelm the replica set) and continue tailing the oplog from where it last left off. In this way the connector is able to dynamically adjust to changes in replica set membership and to automatically handle communication failures.
The MongoDB oplog contains limited information, and in particular the events describing updates and deletes do not actually have the before or after state of the documents. Instead, the oplog events are all idempotent, so updates contain the effective changes that were made during an update, and deletes merely contain the deleted document identifier. Consequently, the connector is limited in the information it includes in its output events. Create and read events do contain the initial state, but the update contain only the changes (rather than the before and/or after states of the document) and delete events do not have the before state of the deleted document. All connector events, however, do contain the local system timestamp at which the event was processed and _source_ information detailing the origins of the event, including the replica set name, the MongoDB transaction timestamp of the event, and the transactions identifier among other things.
It is possible for MongoDB to lose commits in specific failure situations. For exmaple, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary's oplog before the primary crashed. If one such secondary is then elected as primary, it's oplog is missing the last changes that the old primary had recorded and no longer has those changes. In these cases where MongoDB loses changes recorded in a primary's oplog, it is possible that the MongoDB connector may or may not capture these lost changes.
Removed several of the `GtidSet` convenience methods that are not in the [improved](https://github.com/shyiko/mysql-binlog-connector-java/pull/100) `com.github.shyiko.mysql.binlog.GtidSet` class. Getting these out of our API will make it easier to reuse the improved `com.github.shyiko.mysql.binlog.GtidSet` class.
The snapshot mode within the offsets now are marked as complete with the last source record produced during the snapshot. This is the only sure way to update the offset.
Note that the `source` field shows the snapshot is in effect for _all_ records produced during the snapshot, including the very last one. This distinction w/r/t the offset was made possible due to recent changes for DBZ-73.
Previously, when the snapshot reader completed all generation of records, it then attempted to record an empty DDL statement. However, since this statement had no net effect on the schemas, no source record was produced and thus the offset's snapshot mode was never changed. Consequently, if the connector were stopped immediately after the snapshot completed but before other events could be read or produced, upon restart the connector would perform another snapshot.
Fixes two issues with how the binlog coordinates are handled.
The first, DBZ-73, fixes how the offsets are recording the _next_ binlog coordinates within the offsets, which is fine for single-row events but which can result in dropped events should Kafka Connect flush the offset of some but not all of the rows before the Kafka Connect crashes. Upon restart, the offset contains the binlog coordinates for the _next_ event, so any of the last rows from the previous events will be lost.
With this fix, the offset used with all but the last row (in the binlog event) has the binlog coordinates of the current event, with the event row number set to be the next row that needs to be processed. The offset for the last row will have the binlog coordinates of the next event.
The second issue, DBZ-76, is somewhat related: the `source` field of the change events has the binlog coordinates of the _next_ issue. The fix involves putting the binlog coordinates for the _current_ event into the `source` field.
Both of these issues are related and influenced a fix that could address both problems. Essentially, the `SourceInfo` is now recording the previous and next position, and the next and previous row numbers. The offset is created with parameters that specify the row number and the total number of rows, so this method correctly adjusts the binlog coordinates of the offset. The `struct` field produces the value for the `source` field, and it is always using the previous position and previous row number that reflect the change event in which it is used.
Changed the build so that the `assembly` profile runs the MySQL integration tests three times, once against each of the three MySQL configurations:
# MySQL server w/o GTIDs
# MySQL server w/ GTIDs
# The Docker team's MySQL server image w/o GTIDs
The normal profiles are still available:
# The default profile runs the integration tests once against MySQL server w/o GTIDs
# `gtid-mysql` runs the integration tests against MySQL server w/ GTIDs
# `alt-mysql` runs the integration tests against the Docker team's MySQL server image w/o GTIDs
# `skip-integration-tests` (or `-DskipITs`) skips the integration tests altogether
Binary values read from the MySQL binlog may include strings, in which case they need to be converted to binary values.
Interestingly, work on this uncovered [KAFKA-3803](https://issues.apache.org/jira/browse/KAFKA-3803) whereby Kafka Connect's `Struct.equals` method does not properly handle comparing `byte[]` values. Upon researching the problem and potentially supplying a patch, it was discovered that the Kafka Connect codebase and the Avro converter all use `ByteBuffer` objects rather than `byte[]`. Consequently, the Debezium code that converts JDBC values to Kafka Connect values was changed to return `ByteBuffer` objects rather than `byte[]` objects.
Unfortunately, the JSON converter rehydrates objects with just `byte[]`, so that still means that Debezium's `VerifyRecords` logic cannot rely upon `Struct.equals` for comparison, and instead needs custom logic.
Added a Maven profile to the MySQL connector component with a Docker image that runs MySQL with GTIDs enabled. The same integration tests can be run with it using `-Pgtid-mysql` or `-Dgtid-mysql` in the Maven build.
When the MySQL connector starts up, it now queries the MySQL server to detect whether GTIDs are enabled, and if they are it will also verify that any GTID sets from the most recently recorded offset are still available in the MySQL server (similarly to how it was already doing this for binlog filenames). If the server does not have the correct coordinates/GTIDs, the connector fails with a useful error message.
This commit also tests and adjusts the `GtidSet` class to better deal with comparisons of GTID sets for proper ordering.
It also changes the connector to output MySQL's timestamp for each event using _second_ precision rather than artificially in _millisecond_ precision. To clarify the different, this change renames the field in the event's `source` structure that records the MySQL timestamp from `ts` to `ts_sec`. Similarly, the envelope's field that records the time that the connector processed each record was renamed from `ts` to `ts_ms`.
All unit and integration tests pass with the default profile and with the new GTID-enabled profile.
DatabaseHistory stores the DDL changes with the offset describing the position in the source where those DDL statements were found. When a connector restarts at a specific offset (supplied by Kafka Connect), connectors such as the MySQL connector reconstruct the database schemas by having DatabaseHistory load the history starting from the beginning and stopping at (or just before) the connector's starting offset. This change allows connectors to supply a custom comparison function.
To support GTIDs, the MySQL connector needed to store additional information in the offsets. This means the logic needed to compare offsets with and without GTIDs is non-trivial and unique to the MySQL connector. This commit adds a custom comparison function for offsets.
Per [MySQL documentation](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html), slaves are always expected to start with the same set of GTIDs as the master, so no matter which the MySQL connector follows it should always have the complete set of GTIDs seen by that server. Therefore:
* Two offsets with GTID sets can be compared using only the GTID sets.
* Any offset with a GTID set is always assumed to be newer than an offset without, since it is assumed once GTIDs are enabled they will remain enabled. (Otherwise, the connector likely needs to be restarted with a snapshot and tied to a specific master or slave with no failover.)
* Two offsets without GTIDs are compared using the binlog coordinates (filename, position, and row number).
* An offsets that is identical to another except for being in snapshot mode is considered earlier than without the snapshot. This is because snapshot mode begins by recording the position of the snapshot, and once complete the offset is recorded without the snapshot flag.
The BinlogClient library our MySQL connector uses already has support for GTIDs. This change makes use of that and adds the GTIDs from the server to the offsets created by the connector and used upon restarts.
The `VerifyRecord` utility class has methods that will verify a `SourceRecord`, and is used in many of our integration tests to check whether records are constructed in a valid manner. The utility already checks whether the records can be serialized and deserialized using the JSON converter (provided with Kafka Connect); this change also checks with the Avro Converter (which produces much smaller records and is more suitable for production).
Note that version 3.0.0 of the Confluent Avro Converter is required; version 2.1.0-alpha1 could not properly handle complex Schema objects with optional fields (see https://github.com/confluentinc/schema-registry/pull/280).
Also, the names of the Kafka Connect schemas used in MySQL source records has changed.
# The record's envelope Schema used to be "<serverName>.<database>.<table>" but is now "<serverName>.<database>.<table>.Envelope".
# The Schema for record keys used to be named "<database>.<table>/pk", but the '/' character is not valid within a Avro name, and has been changed to "<serverName>.<database>.<table>.Key".
# The Schema for record values used to be named "<database>.<table>", but to better fit with the other Schema names it has been changed to "<serverName>.<database>.<table>.Value".
Thus, all of the Schemas for a single database table have the same Avro namespace "<serverName>.<database>.<table>" (or "<topicName>") with Avro schema names of "Envelope", "Key", and "Value".
All unit and integration tests pass.
Changed the MySQL connector to make use of MDC logging contexts, which allow thread-specific parameters that can be written out on every log line by simply changing the logging configuration (e.g., Log4J configuration file).
We adopt a convention for all Debezium connectors with the following MDC properties:
* `dbz.connectorType` - the type of connector, which would be a single well-known value for each connector (e.g., "MySQL" for the MySQL connector)
* `dbz.connectorName` - the name of the connector, which for the MySQL connector is simply the value of the `server.name` property (e.g., the logical name for the MySQL server/cluster). Unfortunately, Kafka Connect does not give us its name for the connector.
* `dbz.connectorContext` - the name of the thread, which is "main" for thread running the connector; the MySQL connector uses "snapshot" for the thread started by the snapshot reader, and "binlog" for the thread started by the binlog reader.
Different logging frameworks have their own way of using MDC properties. In a Log4J configuration, for example, simply use `%X{name}` in the logger's layout, where "name" is one of the properties listed above (or another MDC property).