The cache mechanism had to be adapted in order to support non-versioned
and versioned schemas, a test now confirms it's the same valueSchema
instance created once.
To allow more flexibility we should proxy the new Event Schema fields
based on the original table columns which Debezium is capturing.
The Schema is now built one time during the first Record in order to
detect those types which are only available within the Record.
It's a good practice to depend on the Kafka metadata instead of custom
dates in the payload, this way for instance when using KStreams with
Tumbling Window the dates are correctly matched.
This commit does a few things:
- Refactors snapshot modes to be encapsulated by an interface and
to only use that interface in determining when to snapshot and in
determing the type of the `RecordProducer` interface to instantiate
- Refactors the configuration of existing snapshot modes to tie the
existing snapshot modes to their aligned implementation
- Adds a new snapshot.mode, custom, and a new configuration option to
specify a custom implementation that will be loaded by the class loader
- Changes the visibility of some classes to allow for custom snapshot
modes to get enough context to make an informed choice
- Adds some metadata about slots (the catalog_xmin) to give a full idea
of the state of slots which can be useful in implementing snapshot
modes (which is also configurable, as it can add some overhead)
Together, these changes allow for a much broader flexibility got end
users to implement a snapshot mode that can do more advanced snapshots,
such as partial recovery or for partial snapshots for tables where not
all records are needed.
This could also be seen as superseeding the
`snapshot.select.statement.overrides` to allow for users to dynamically
build queries based on the state of the slot and the offsets consumed.
* Removing redundant check for date mapping type
* Always using String as fallback value for temporal values where needed
* Pulling fallback temporal values up to JdbcValueConverters
Reason:
reference to prepareQuery is ambiguous
[ERROR] both method prepareQuery(java.lang.String,io.debezium.jdbc.JdbcConnection.StatementPreparer,io.debezium.jdbc.JdbcConnection.BlockingResultSetConsumer) in io.debezium.jdbc.JdbcConnection and method prepareQuery(java.lang.String,io.debezium.jdbc.JdbcConnection.StatementPreparer,io.debezium.jdbc.JdbcConnection.ResultSetConsumer) in io.debezium.jdbc.JdbcConnection match
* Renaming getTimeSinceLastEvent() to getMilliSecondsSinceLastEvent()
* Further unifying metrics implementation across connectors
* Emitting event in EventDispatcher also if event is filtered out
* Typo fixes
The method allows to define steps which have to be taken just after the
database connection is created (e.g. setting snapshot isolation level).
By default no operation is executed.
This will allow consumers to recognize the Debezium connector used for creating a given message, helping them to adjust their behavior for a variety of connectors.
The "field.blacklist" configuration property is an optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form "databaseName.collectionName.fieldName.nestedFieldName", where "databaseName" and "collectionName" may contain the wildcard (*) which matches any characters.
Although the "field.blacklist" configuration property allows you to remove fields from the event values, the "_id" field is always included in the event’s key.
That's simpler to grasp than the approach of passing a supplier lambda to the constructor.
Also it allows to pass on the offset via local variables instead of instance fields in some cases.
If value is binary or string, it should be better to compare the content
of the actual byte arrays.
According to ErrorProne you use reference equality when calling equals on
an array.
Part of https://issues.jboss.org/browse/DBZ-759
This records the DDL for DDL events captured during streaming. For the
initial schema snapshot, a JSON-style representation of the captured
Table objects is used in a new field of HistoryRecord, as the DDL
returned by dbms_metadata.get_ddl() isn't fully parseable by our
grammar.
* Renaming ConnectorTaskContext to CdcSourceTaskContext
* Renaming ReplicationContext to MongoDbTaskContext
* Making relationship from MongoDbTaskContext to ConnectionContext has-a instead of is-a
TypeRegistry introduced for Postgres connector
JDBC column does not have a special componentType
JDBC column provide a database specific type id
OID is a primary type identifier to be used in Postgres connector code - dropping JDB/OID dichotomy
* ChangeEventQueue#enqueue() checks the interrupted state of the calling
thread now, raising an InterruptedException in case the interrupted flag
has been set (because the producer's thread executor has been stopped)
* RecordSnapshotProducer has been adjusted to check for the interrupted
regularly, aborting if it has been set
* Renaming ConfigurationHelper to Instantiator
* Doc improvements and typo fixes
* Bringing getInstance() methods into consistent order
* Raising exception instead of logging error if instantion fails
* shutting down the snapshotting thread and the DB history producer client
if the connector is stopped while trying to write to the history topic
* reducing the time that KafkaProducer#send() will block if Kafka isn't
available; this will release the producer thread quicker in case the
connector is stopped during snapshotting
* not returning from finally block (!) in case the TX is rolled back; This
prevented the failed state to be set by the outer catch clause in execute()
* Adding support for PostGIS geometry types
* Adding support for GEOMETRY, POLYGON and more in MySQL
* For all newly supported types, changes are represented using two new schema types Geometry and Geography, containing the WKB (binary geo data) ans srid (coord system identifier)
* The existing Point type also contains the new (optional) srid field
If you start a cluster (e.g. in a test) without specifying a port
you get a random port. Sometimes you might want to connect to the
embedded zookeeper instance (for instance, to make an assertion about
a znode). To do this you need to know the port number. So let's expose it.
MySQL has special handling of 2-digit years that it deems are ambiguous, such as the year value `17` that is actually treated as `2017`. Apparently the 2-digit values are stored in MySQL and the interpretation is performed when the data is extracted, so therefore the connector needs to also perform this adjustment of the year values. This commit uses the JDK’s `TemporalAdjuster` interface and passes this down to the requisite temporal-related datatype handling code. The MySQL connector then provides its own `TemporalAdjuster` implementation that adjusts the year values via the excellend JDK `Temporal` methods.
A row in one of the MySQL test databases was changed to use a 2-digit year of `16` while the test method still checks that the year is still 2016`, verifying that the year value is properly adjusted.
Added a table and inserted rows that tries to replicate the problem reported in DBZ-195, but the test was unable to replicate the problem. In fact, this really is no different than existing tests. Changed the log messages so that if/when this happens again it will be possible to know which row was problematic.
The MySQL parser now properly handles control blocks such as `BEGIN…END`, `IF…END IF`, `REPEAT…END REPEAT`, and `LOOP…END LOOP`, even in cases where the block is preceded by and terminated by a label.
Apparently not all reserved words must be quoted when using them as colum names, so refactored MySQL’s DDL parser to better handle a variety of unquoted colum names that are reserved words.
Changed the GTID source filters in the MySQL connector to be far more efficient when the filters specify literal UUIDs rather than regex patterns. In these cases, the predicate just checks whether a supplied value is in a hash set, and no regular expression patterns are used.
The GTID source filters can still be a combination of UUID literals and regular expressions, and the predicate will use the best implementation for each. For example, if the filters include all UUID literals, then regular expressions will never be used.
The MySQL DDL parser was not correclty handling `DEFINER` clauses within `CREATE TRIGGER` or `CREATE EVENT` statements. Support for `DEFINER` clauses was recently added for the various forms of `CREATE PROCEDURE`, `CREATE FUNCTION` and `CREATE VIEW` statements. These are the only kinds of statements that have the definer attribute, per the [MySQL documentation](https://dev.mysql.com/doc/refman/5.7/en/stored-programs-security.html).
The KafkaDatabaseHistory class was not behaving well in tests using my local development environment. When restoring from the persisted Kafka topic, the class would set up a Kafka consumer and see repeated messages. It is unclear whether the repeats were due to our test environment and very short poll timeouts. Regardless, the restore logic was refactored to track offsets so as to only process messages once.
The version of the DB server required for this to work is at least 9.4. To be able to stream logical changes, the code relies on enhancements to the JDBC driver which are not yet public. Therefore, the current codebase includes the sources for the JDBC driver.
The commit also updates the general DBZ build system for:
* custom checkstyle package exclusions - required by the Postgres driver the protobuf code for now
* adds support for debugging Surefire and Failsafe
The version of the DB server required for this to work is at least 9.4
The commit also updates the general DBZ build system for:
* custom checkstyle package exclusions - required by the Postgres driver the protobuf code for now
* adds support for debugging Surefire and Failsafe
The MySQL connector uses several threads, so previously upon connector shutdown these threads were simply cancelled. This is fine for the binlog reader (which can stop at any moment), but is a poor approach for the snapshot as we didn’t always properly release the database resources and also didn’t complete the writing of the DDL history.
With this change, the snapshot reader stops in a very controlled manner, basically by having the 10-step snapshot procedure frequently check whether the reader is to continue working, and to completely avoid thread interruption altogether. And, the snapshot procedure will always clean up its database resources (locks, transactions, etc.), even if the procedure is stopped before completion.
This change also refactors how the snapshot and binlog reader are managed. This is no longer done in the MySqlConnectorTask class (which is busy enough), but rather the logic has been encapsulated in a new `ChainedReader` that makes use of a new `Reader` interface. This makes testing of `ChainedReader` easier, and ensure that `ChainedReader` relies only upon the primary methods of `Reader` rather than upon `AbstractReader`. `ChainedReader` handles multiple readers generically, and ensures that when stopped the readers are all handled correctly and completely process all records, yet avoid accidentally starting a subsequent reader(s) when stopping the previous reader.
The MySQL DDL parser was not properly consuming function declarations. For functions, the parser consumes the entire statement without handline the various expressions within the function declaration, but the parser was not properly finding the end of the statement and instead was continuing to try to consume values beyond the end of the statement.
Specifically, when the parser consumes a `BEGIN`, it looks for a corresponding `END`. However, if it encountered an `END IF`, the `IF` plus any remaining tokens were left on the token stream and unprocessed. This confused the parser, which keep looking for statements and ultimately ended with a `No more content` error.
This case was replicated in integration tests, and the code fixed to properly find the end of the statements.
By default the MySQL connector handles `DECIMAL` and `NUMERIC` columns using `java.math.BigDecimal` values and describing them using the `org.apache.kafka.connect.data.Decimal` schema type, which serializes the values to a binary form.
This change adds a configuration option that will keep the default behavior, but will instead allow handling `DECIMAL` adn `NUMERIC` values as Java `double` and a schema type of `FLOAT64`.
Added tests to verify whether the connector is properly restarting in the binlog when previously the connector failed or stopped in the middle of a transaction. The tests showed that the connector is not able to properly start when using or not using GTIDs, since restarting from an arbitrary binlog event causes problems since the TABLE_MAP events for the affected tables are skipped.
The logic was changed significantly to record in the offsets the binlog coordinates at the start of the transaction, which should work whether or not GTIDs are used. Upon restart, the connector may have to re-read the events that were previously processed, but now the offset also includes the number of events that were previously processed so that these can be skipped upon restart.
This has an unforunate side effect since the offsets capture a transaction was completed only when it generates a source record for the subsequent transaction. This is because the connector generates source records (with their offsets) for the binlog events in the transaction before the transaction's commit is seen. And, since no additional source records are produced for the transaction commit, the recorded offsets will show that the prior transaction is complete and that all of the events in the subsequent transaction are to be skipped. Thus, upon restart the connector has to re-read (but ignore) all of the binlog events associated with the completed transaction. This shouldn’t be a problem, and will only slow restarts for very large transactions.
Improved the error handling of the MySQL connector to ensure that we’re always stopping the connector when we have a problem handling a binlog event or if we have problems starting.
When the MySQL connector is reading the binlog, it outputs INFO log messages reporting status at an exponentially-increasing rate, starting at every 5 seconds and doubling until a max period of 1 hour. This output is useful when the connector starts to know that it is working, but thereafter the usefulness decreases. Once an hour is probably acceptable output.
This is not intended to replace the capturing of metrics, but is merely an aid to easily tell via the logs whether the connector continues to work.
Also improved the log message when the binlog reader stops to capture the total number of events recorded by Kafka Connect and the last recorded offset.
Corrected how the MySQL connector is treating columns of type `BIT(n)`, where _n_ is the number of bits in the value. When `n=1`, the resulting values are booleans; when `n>1`, the resulting values are little endian `byte[]` that have the minimum number of bytes to hold the `n` bits.
The `KafkaDatabaseHistory` was always creating a new producer whenever its `start()` method was called, even if it were called more than once. And, the `MySqlSchema` was calling `start()` twice, resulting in multiple producers being created and registered with JMX. Both issues were fixed.
Also, UUIDs were being used as the name of the JMX MBean for the producer, unless the `database.history.consumer.client.id` and `database.history.producer.client.id` properties were being explicitly set. Now, the MySQL connector will by default set the `client.id` property on both the database history's Kafka consumer and producer to `{connectorName}-dbhistory`. Of course, the `database.history.consumer.client.id` and `database.history.producer.client.id` properties can still be set to define the name of the producer and consumer.
MySQL supports "zero-value" dates and timestamps, but these cannot be represented as valid dates or timestamps using the Java types. For example, the zero-value `0000-00-00` for a date has what Java considers to be an invalid month and day-of-the-month.
This commit changes how the MySQL connector handles these values to not throw exceptions. When columns allow nulls, such values will be treated as nulls; when columns do not allow null values, these values will be converted to a "zero-value" for the corresponding Java representation (e.g., the epoch day or timestamp). A new test case verifies the behaviors.
Anytime we `toString()` a `Configuration`, any values for password properties should be masked. A password property is defined to be a property whose key ends in "password" in a case-insensitive manner.
The MySQL binlog events contain the binary representation of string-like values as encoded per the column's character set. Properly decoding these into Java strings requires capturing the column, table, and database character set when parsing the DDL statements.
Unfortunately, MySQL DDL allows columns (at the time the columns are created or modified) to inherit the default character set for the table, or if that is not defined the default character set for the database, or if that is not defined the character set for the server. So, in addition to modifying the MySQL DDL parser to support capturing the character set name for each column, it also had to be changed to know what these default character set names are.
The default character sets are all available via MySQL server/session/local variables. Although strictly speaking the character set variables cannot be set globally, MySQL DDL does allow session and local variables to be set with `SET` statements. Therefore, this commit enhances the MySQL DDL parser to parse `SET` statements and to track the various global, session, and local variables as seen by the DDL parser. Upon connector startup, a subset of server variables (related to character sets and collations) are read from the database via JDBC and used to initialize the DDL parser via `SET` methods.
In addition to initializing the DDL parser with the system variables related to character sets and collation, it is important to also capture the server and database default character sets in the database history so that the correct character sets are used for columns even when the default character sets have changed on the database and/or the server. Therefore, upon startup or snapshot the MySQL connector records in the database history a `SET` statement for the `character_set_server` and `collation_server` system variables so that, upon a later restart, the history's DDL statements can be re-parsed with the correct default server and database character sets. Also, when the MySQL connector reloads the database history (upon startup), the recorded default server character set is compared with the MySQL instance's current server character set, and if they are different the current character set is recorded with a new `SET` statement.
These extra steps ensure that the connector use the correct character set for each column, even when the connector restarts and reloads the database history captured by a previous version of the connector. IOW, the MySQL connector can be safely upgraded, and the new version will correctly start using the columns' character sets to decode the string-like values.
The DDL parser and in-memory models of the relational schemas were changed to capture the character set for each column whose type is a string (e.g., `CHAR`, `VARCHAR`, etc.). This required handling `SET` statements used to change the system variables that hold the names of the default character set for the server and for each database. So, even if a column does not explicitly define the character set, the column's actual character set is identified from the table's character set, which might default to the current database's character set, which if not set defaults to the system character set.
These changes merely affect how MySQL DDL is parsed and the in-memory relational schema representation to accommodate the character set at various levels. It does not change the behavior of the MySQL connector; that will be done in a subsequent commit.
All tests pass with these changes, including quite a few additional tests for the new functionality.
Changed the MySQL connector to have several new configuration properties for setting up the SSL key store and trust store (which can be used in place of System or JDK properties) used for MySQL secure connections, and another property to specify what kind of SSL connection be used.
Modified several integration tests to ensure all MySQL connections are made with `useSSL=false`.
The ENUM and SET values read from the binlog contain the indexes of the options that are included in the value, but this doesn't compared with the string values returned by MySQL and JDBC that contain the comma-separated options. With this change, the values read from the binlog will also be comma-separated strings.
Rewrote how the MySQL connector converts temporal values to use schemas with names that identify the semantic
type of temporal value, and customized how the MySQL binlog client library creates Java object values from the
raw binlog events.
Several new "semantic" schema types were defined:
* `io.debezium.time.Year` represents a year number as an INT32 value (e.g., 2016, -345, etc.).
* `io.debezium.time.Date` represents a date by storing the epoch seconds (that is, the number of seconds past the epoch) as an INT64 value.
* `io.debezium.time.Time` represents a time by storing the milliseconds past midnight as an INT32 value.
* `io.debezium.time.MicroTime` represents a time by storing the microsconds past midnight as an INT32 value.
* `io.debezium.time.NanoTime` represents a time by storing the nanoseconds past midnight as an INT32 value.
* `io.debezium.time.Timestamp` represents a date and time (without timezone information) by storing the milliseconds past epoch as an INT64 value.
* `io.debezium.time.MicroTimestamp` represents a date and time (without timezone information) by storing the microseconds past epoch as an INT64 value.
* `io.debezium.time.NanoTimestamp` represents a date and time (without timezone information) by storing the nanoseconds past epoch as an INT64 value.
* `io.debezium.time.ZonedTime` represents a time with timezone and optional fractions of a second (but no date) by storing the ISO8601 form as a STRING value (e.g., `10:15:30+01:00`)
* `io.debezium.time.ZonedTimestamp` represents a date and time with timezone and optional fractions of a second by storing the ISO8601 form as a STRING value (e.g., `2011-12-03T10:15:30.030431+01:00`)
This range of semantic types allows for a far more accurate representation in the events of the temporal values stored within the database. The MySQL connector chooses the semantic type based upon the precision of the MySQL type (e.g., `TIMESTAMP(6)` will be represented with `io.debezium.time.MicroTimestamp`, whereas `TIMESTAMP(3)` will be represented with `io.debezium.time.Timestamp`). This ensures that the events do not lose precision and that the semantics of the database column values are retained in the events even though the values are represented with primitive values.
Obviously these Kafka Connect schema representations are different and more precise than the built-in `org.apache.kafka.connect.data.Date`, `org.apache.kafka.connect.data.Time`, and `org.apache.kafka.connect.data.Timestamp` logical types provided by Kafka Connect and used by the MySQL connector in all 0.2.x and 0.1.x versions. Migration to the new MySQL connector should be possible, although consumers may still need to know about these types to properly handle temporal values and the correct precision (i.e., consumers can just assume all date INT64 values represent milliseconds).
The MySQL binlog client library converted the raw binary event information to JDBC types using a local Calendar instance, which obviously incorporates the local timezone and cannot retain more than millisecond precision. This change extends the library's deserializers to instead use the Java 8 `javax.time` classes and to retain the exact semantics of the database values and to not lose any precisions (since the `javax.time` classes have nanosecond precision).
The same logic is also used to convert the JDBC values obtained during a snapshot from the MySQL Connect/J JDBC driver. The latter has a few quirks, such as not returning any fractional seconds for `TIME` columns, even though `java.sql.Time` can store up to milliseconds.
Most of the logic of the conversions of values and mapping to Kafka Connect schemas is handled in the new `JdbcValueConverters`, which was extracted from the existing `TableSchemaBuilder`. The MySQL connector reuses and actually extends the `JdbcValueConverters` class with its own `MySqlValueConverters` class that also adds support for MySQL-specific types such as `YEAR`. Other connectors whose values are based on JDBC types should be able to reuse and/or extend the `JdbcValueConverters` class.
Integration tests that deal with temporal types were modified to use proper expected values and comparisons.
By default the MySQL JDBC driver will put the entire result set into memory, which obviously doesn't work for tables of even moderate sizes. This change adds support for streaming rows in result sets when the tables have more than a configurable number of rows (defaults to 1,000).
This posed a problem for how we were previously finding the last row in the last table; the MySQL driver does not support `ResultSet.isLast()` on result sets that are streamed. Instead, this commit wraps the consumer to which the snapshot reader writes all source records, with a consumer that buffers the last record. When the snapshot completes, the offset is updated (denoting the end of the snapshot) and set on the last buffered record before that record is flushed to the normal consumer. This should add minimal overhead while simplifying the logic to ensure the last source record has the updated offset.
This also improves the log output of the snapshot process.
* fixes a java.sql.Date conversion test to take into account zone offsets
* makes sure the ZK DB is closed during testing, otherwise file handles may leak and cause test failures
Upgraded from Kafka 0.9.0.1 to Kafka 0.10.0. The only required change was to override the `Connector.config()` method, which returns `null` or a `ConfigDef` instance that contains detailed metadata for each of the configuration fields, including supporting recommended values and marking fields as not visible (e.g., if they don't make sense given other configuration field values). This can be used by user interfaces to data-drive the configuration of a connector. Also, the default validation logic of the Connector implementations uses a `Validator` that is pretty restrictive in its functionality.
Debezium already had a fairly decent and simple `Configuration` framework. After several attempts to try and merge these concepts, reconciling the two validation mechanisms was very complicated and involved a lot of changes. It was easier to simply continue Debezium-specific validation and to override the `Connector.validate(...)` method to use Debezium's `Configuration`-based validation. Connector-based validation logic includes determining recommended values, so Debezium's `Field` class (used to define each configuration property) was enhanced with a new `Recommender` class that is similar to Kafka's.
Additional integration tests were added to verify that the `ConfigDef` result is acceptable and that the new connector validation logic works as expected, including getting recommended values for some fields (e.g., database names, table/collection names) from MySQL and MongoDB by connecting and dynamically reading the values. This was done in a way that remains backward compatible with the regular expression formats of these fields, but in a user interface that uses the `ConfigDef` mechanism the user can simply select the databases and table/collection identifiers.
Added an integration test case to diagnose the loss of the fractional seconds from MySQL temporal values. The problem appears to be a bug in the MySQL Binary Log Connector library that we used, and this bug was reported as https://github.com/shyiko/mysql-binlog-connector-java/issues/103. That was fixed in version 0.3.2 of the library, which Stanley was kind enough to release for us.
During testing, though, several issues were discovered in how temporal values are handled and converted from the MySQL events, through the MySQL Binary Log client library, and through the Debezium MySQL connector to conform with Kafka Connect's various temporal logical schema types. Most of the issues involved converting most of the temporal values from local time zone (which is how they are created by the MySQL Binary Log client) into UTC (which is how Kafka Connect expects them). Really, java.util.Date doesn't have time zone information and instead tracks the number of milliseconds past epoch, but the conversion of normal timestamp information to the milliseconds past epoch in UTC depends on the time zone in which that conversion happens.
The MySQL connector now maps TINYINT and SMALLINT columns to INT16 (rather than INT32) because INT16 is smaller and yet still large enough for all TINYINT and SMALLINT values. Note that the range of TINYINT values is either -128 to 127 for signed or 0 to 255 for unsigned, and thus INT8 is not an acceptable choice since it can only handle values in the range 0 to 255. Additionally, the JDBC Specification also suggests the proper Java type for SQL-99's TINYINT is short, which maps to Kafka Connect's INT16.
This change will be backward compatible, although the generated Kafka Connect schema will be different than in previous versions. This shouldn't cause a problem, since clients should expect to handle schema changes, and this schema change does comply with Avro schema evolution rules.
Added a new `debezium-connector-mongodb` module that defines a MongoDB connector. The MongoDB connector can capture and record the changes within a MongoDB replica set, or when seeded with addresses of the configuration server of a MongoDB sharded cluster, the connector captures the changes from the each replica set used as a shard. In the latter case, the connector even discovers the addition of or removal of shards.
The connector monitors each replica set using multiple tasks and, if needed, separate threads within each task. When a replica set is being monitored for the first time, the connector will perform an "initial sync" of that replica set's databases and collections. Once the initial sync has completed, the connector will then begin tailing the oplog of the replica set, starting at the exact point in time at which it started the initial sync. This equivalent to how MongoDB replication works.
The connector always uses the replica set's primary node to tail the oplog. If the replica set undergoes an election and different node becomes primary, the connector will immediately stop tailing the oplog, connect to the new primary, and start tailing the oplog using the new primary node. Likewise, if connector experiences any problems communicating with the replica set members, it will try to reconnect (using exponential backoff so as to not overwhelm the replica set) and continue tailing the oplog from where it last left off. In this way the connector is able to dynamically adjust to changes in replica set membership and to automatically handle communication failures.
The MongoDB oplog contains limited information, and in particular the events describing updates and deletes do not actually have the before or after state of the documents. Instead, the oplog events are all idempotent, so updates contain the effective changes that were made during an update, and deletes merely contain the deleted document identifier. Consequently, the connector is limited in the information it includes in its output events. Create and read events do contain the initial state, but the update contain only the changes (rather than the before and/or after states of the document) and delete events do not have the before state of the deleted document. All connector events, however, do contain the local system timestamp at which the event was processed and _source_ information detailing the origins of the event, including the replica set name, the MongoDB transaction timestamp of the event, and the transactions identifier among other things.
It is possible for MongoDB to lose commits in specific failure situations. For exmaple, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary's oplog before the primary crashed. If one such secondary is then elected as primary, it's oplog is missing the last changes that the old primary had recorded and no longer has those changes. In these cases where MongoDB loses changes recorded in a primary's oplog, it is possible that the MongoDB connector may or may not capture these lost changes.
Binary values read from the MySQL binlog may include strings, in which case they need to be converted to binary values.
Interestingly, work on this uncovered [KAFKA-3803](https://issues.apache.org/jira/browse/KAFKA-3803) whereby Kafka Connect's `Struct.equals` method does not properly handle comparing `byte[]` values. Upon researching the problem and potentially supplying a patch, it was discovered that the Kafka Connect codebase and the Avro converter all use `ByteBuffer` objects rather than `byte[]`. Consequently, the Debezium code that converts JDBC values to Kafka Connect values was changed to return `ByteBuffer` objects rather than `byte[]` objects.
Unfortunately, the JSON converter rehydrates objects with just `byte[]`, so that still means that Debezium's `VerifyRecords` logic cannot rely upon `Struct.equals` for comparison, and instead needs custom logic.
Added a Maven profile to the MySQL connector component with a Docker image that runs MySQL with GTIDs enabled. The same integration tests can be run with it using `-Pgtid-mysql` or `-Dgtid-mysql` in the Maven build.
When the MySQL connector starts up, it now queries the MySQL server to detect whether GTIDs are enabled, and if they are it will also verify that any GTID sets from the most recently recorded offset are still available in the MySQL server (similarly to how it was already doing this for binlog filenames). If the server does not have the correct coordinates/GTIDs, the connector fails with a useful error message.
This commit also tests and adjusts the `GtidSet` class to better deal with comparisons of GTID sets for proper ordering.
It also changes the connector to output MySQL's timestamp for each event using _second_ precision rather than artificially in _millisecond_ precision. To clarify the different, this change renames the field in the event's `source` structure that records the MySQL timestamp from `ts` to `ts_sec`. Similarly, the envelope's field that records the time that the connector processed each record was renamed from `ts` to `ts_ms`.
All unit and integration tests pass with the default profile and with the new GTID-enabled profile.
DatabaseHistory stores the DDL changes with the offset describing the position in the source where those DDL statements were found. When a connector restarts at a specific offset (supplied by Kafka Connect), connectors such as the MySQL connector reconstruct the database schemas by having DatabaseHistory load the history starting from the beginning and stopping at (or just before) the connector's starting offset. This change allows connectors to supply a custom comparison function.
To support GTIDs, the MySQL connector needed to store additional information in the offsets. This means the logic needed to compare offsets with and without GTIDs is non-trivial and unique to the MySQL connector. This commit adds a custom comparison function for offsets.
Per [MySQL documentation](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-failover.html), slaves are always expected to start with the same set of GTIDs as the master, so no matter which the MySQL connector follows it should always have the complete set of GTIDs seen by that server. Therefore:
* Two offsets with GTID sets can be compared using only the GTID sets.
* Any offset with a GTID set is always assumed to be newer than an offset without, since it is assumed once GTIDs are enabled they will remain enabled. (Otherwise, the connector likely needs to be restarted with a snapshot and tied to a specific master or slave with no failover.)
* Two offsets without GTIDs are compared using the binlog coordinates (filename, position, and row number).
* An offsets that is identical to another except for being in snapshot mode is considered earlier than without the snapshot. This is because snapshot mode begins by recording the position of the snapshot, and once complete the offset is recorded without the snapshot flag.
The BinlogClient library our MySQL connector uses already has support for GTIDs. This change makes use of that and adds the GTIDs from the server to the offsets created by the connector and used upon restarts.
The `VerifyRecord` utility class has methods that will verify a `SourceRecord`, and is used in many of our integration tests to check whether records are constructed in a valid manner. The utility already checks whether the records can be serialized and deserialized using the JSON converter (provided with Kafka Connect); this change also checks with the Avro Converter (which produces much smaller records and is more suitable for production).
Note that version 3.0.0 of the Confluent Avro Converter is required; version 2.1.0-alpha1 could not properly handle complex Schema objects with optional fields (see https://github.com/confluentinc/schema-registry/pull/280).
Also, the names of the Kafka Connect schemas used in MySQL source records has changed.
# The record's envelope Schema used to be "<serverName>.<database>.<table>" but is now "<serverName>.<database>.<table>.Envelope".
# The Schema for record keys used to be named "<database>.<table>/pk", but the '/' character is not valid within a Avro name, and has been changed to "<serverName>.<database>.<table>.Key".
# The Schema for record values used to be named "<database>.<table>", but to better fit with the other Schema names it has been changed to "<serverName>.<database>.<table>.Value".
Thus, all of the Schemas for a single database table have the same Avro namespace "<serverName>.<database>.<table>" (or "<topicName>") with Avro schema names of "Envelope", "Key", and "Value".
All unit and integration tests pass.
Changed the MySQL connector to make use of MDC logging contexts, which allow thread-specific parameters that can be written out on every log line by simply changing the logging configuration (e.g., Log4J configuration file).
We adopt a convention for all Debezium connectors with the following MDC properties:
* `dbz.connectorType` - the type of connector, which would be a single well-known value for each connector (e.g., "MySQL" for the MySQL connector)
* `dbz.connectorName` - the name of the connector, which for the MySQL connector is simply the value of the `server.name` property (e.g., the logical name for the MySQL server/cluster). Unfortunately, Kafka Connect does not give us its name for the connector.
* `dbz.connectorContext` - the name of the thread, which is "main" for thread running the connector; the MySQL connector uses "snapshot" for the thread started by the snapshot reader, and "binlog" for the thread started by the binlog reader.
Different logging frameworks have their own way of using MDC properties. In a Log4J configuration, for example, simply use `%X{name}` in the logger's layout, where "name" is one of the properties listed above (or another MDC property).
Refactored the MySQL connector to break out the logic of reading the binlog into a separate class, added a similar class to read a full snapshot, and then updated the MySQL connector task class to use both. Added several test cases and updated the existing tests.
Several of the MySQL connector classes were fairly large and complicated, and to prepare for upcoming changes/enhancements these larger classes were refactored to pull out units of functionality. Currently all unit tests pass with these changes, with additional unit tests for these new components.
Previously, the DDL statements were being filtered and recorded based upon the name of the database that appeared in the binlog. However, that database name is actually the name of the database to which the client submitting the operation is connected, and is not necessarily the database _affected_ by the operation (e.g., when an operation includes a fully-qualified table name not in the connected-to database).
With these changes, the table/database affected by the DDL statements is now being used to filter the recording of the statements. The order of the DDL statements is still maintained, but since each DDL statement can apply to a separate database the DDL statements are batched (in the same original order) based upon the affected database. For example, two statements affecting "db1" will get batched together into one schema change record, followed by one statement affecting "db2" as a second schema change record, followed by another statement affecting "db1" as a third schema record.
Meanwhile, this change does not affect how the database history records the changes: it still records them as submitted using a single record for each separate binlog event/position. This is much safer as each binlog event (with specific position) is written atomically to the history stream. Also, since the database history stream is what the connector uses upon recovery, the database history records are now written _after_ any schema change records to ensure that, upon recovery after failure, no schema change records are lost (and instead have at-least-once delivery guarantees).
The new envelope Struct contains fields for the local time at which the connector processed the event, the kind of operation (e.g., read, insert, update, or delete), the state of the record before and after the change, and the information about the event source. The latter two items are connector-specific. The timestamp is merely the time using the connector's process clock, and no guarantees are provided about accuracy, monotonicity, or relationship to the original source event.
The envelope structure is now used as the value for each event message in the MySQL connector; they keys of the event messages remain unchanged. Note that to facilitate Kafka log compaction (which requires a null value), a delete event containing the envelope with details about the deletion is followed by a "tombstone" event that contains the same key but null value.
An example of a message value with this new envelope is as follows:
{
"schema" : {
"type" : "struct",
"fields" : [ {
"type" : "struct",
"fields" : [ {
"type" : "int32",
"optional" : false,
"name" : "org.apache.kafka.connect.data.Date",
"version" : 1,
"field" : "order_date"
}, {
"type" : "int32",
"optional" : false,
"field" : "purchaser"
}, {
"type" : "int32",
"optional" : false,
"field" : "quantity"
}, {
"type" : "int32",
"optional" : false,
"field" : "product_id"
} ],
"optional" : true,
"name" : "connector_test.orders",
"field" : "before"
}, {
"type" : "struct",
"fields" : [ {
"type" : "int32",
"optional" : false,
"name" : "org.apache.kafka.connect.data.Date",
"version" : 1,
"field" : "order_date"
}, {
"type" : "int32",
"optional" : false,
"field" : "purchaser"
}, {
"type" : "int32",
"optional" : false,
"field" : "quantity"
}, {
"type" : "int32",
"optional" : false,
"field" : "product_id"
} ],
"optional" : true,
"name" : "connector_test.orders",
"field" : "after"
}, {
"type" : "struct",
"fields" : [ {
"type" : "string",
"optional" : false,
"field" : "server"
}, {
"type" : "string",
"optional" : false,
"field" : "file"
}, {
"type" : "int64",
"optional" : false,
"field" : "pos"
}, {
"type" : "int32",
"optional" : false,
"field" : "row"
} ],
"optional" : false,
"name" : "io.debezium.connector.mysql.Source",
"field" : "source"
}, {
"type" : "string",
"optional" : false,
"field" : "op"
}, {
"type" : "int64",
"optional" : true,
"field" : "ts"
} ],
"optional" : false,
"name" : "kafka-connect-2.connector_test.orders",
"version" : 1
},
"payload" : {
"before" : null,
"after" : {
"order_date" : 16852,
"purchaser" : 1003,
"quantity" : 1,
"product_id" : 107
},
"source" : {
"server" : "kafka-connect-2",
"file" : "mysql-bin.000002",
"pos" : 2887680,
"row" : 4
},
"op" : "c",
"ts" : 1463437199134
}
}
Notice how the Schema is significantly larger, since it must describe all of the envelope's fields even when those fields are not used. In this case, the event signifies that a record was created as the 4th record of a single event recorded in the binlog.
Changed the MySQL connector to use comma-separated lists of regular expressions for the database
and table whitelist/blacklists. Literals are still accepted and will match fully-qualified table names,
although the '.' character used as a delimiter is also a special character in regular expressions and
therefore may need to be escaped with a double backslash ('\\') to more carefully match fully-qualified
table names.
Added several new configuration properties for the MySQL connector that instruct it to hide,
truncate, and/or mask certain columns. The properties' values are all lists of regular expressions
or literal fully-qualified column names. For example, the following configuration property:
column.blacklist=server.users.picture,server.users.other
will cause the connector to leave out of change event messages for the `server.users` table those
fields that correspond to the `picture` and `others` columns. This capability can be used to
This capability can be used to prevent dissemination of sensitive information in the change event
stream.
An alternative to blacklisting is masking. The following configuration property:
column.mask.with.10.chars=server\\.users\\.(\\w*email)
will cause the connector to mask in the change event messages for the `server.users` table
all values for columns whose name ends in `email`. The values will be replaced in this case with
a constant string of 10 asterisk ('*') characters, even when the email value is null.
This capability can also be used to prevent dissemination of sensitive information in the change event
stream.
Another option is to truncate string values for specific columns. The following configuration
property:
column.truncate.to.120.chars=server[.]users[.](description|biography)
will cause the connector to truncate to at most 120 characters the values of the `description` and
`biography` columns in the change event messages for the `server.users` table. Although this example
used a limit of 120 characters, any positive length can be specified; separate properties should
be used when different lengths are required. Note how the '.' delimiter in the fully-qualified names
is escaped since that same character is a special character in regular expressions. This capability
can be used to reduce the size of change event messages.
Drop table/view statements that involve more than one table generate one event for each table/view. Previously, each of those statements had the original multi-table/view statement. Now, each event has a statement that applies to only that table (generated from the original with all the same clauses).
The previous change did not correctly capture the statements for a `RENAME TO` that renamed multiple tables, so fixed the code so that it generates a single `RENAME TO` for each table rename.
Refactored the mechanism by which components can listen to the activities of a DDL parser. The new approach
should be significantly more flexible for additional types of DDL events while making it easier to maintain
backward compatibility. It also will enable passing event-specific information on each DDL event.
This utility is only suitable for unit tests and therefore is defined in the test JAR of the `debezium-core` module. It certainly should never be used for production purposes.
Changed the EmbeddedConnector framework to initialize all major components via configuration properties rather than through the public builder. This increases the size of the configurations, but it simplifies what embedding applications must do to obtain an EmbeddedConnector instance.
The DatabaseHistory framework was also changed to be configurable in similar ways to the OffsetBackingStore. Essentially, connectors that want to use it (like the MySqlConnector) will describe it as part of the connector's configuration, allowing more flexibility in which DatabaseHistory implementation is used and how it is configured whether in Kafka Connector or as part of the EmbeddedConnector.
Added a README.md to `debezium-embedded` to provide documentation and sample code showing how to use the EmbeddedConnector.
Adds a small framework for recording the DDL operations on the schema state (e.g., Tables) as they are read and applied from the log, and when restarting the connector task to recover the accumulated schema state. Where and how the DDL operations are recorded is an abstraction called `DatabaseHistory`, with three options: in-memory (primarily for testing purposes), file-based (for embedded cases and perhaps standalone Kafka Connect uses), and Kafka (for normal Kafka Connect deployments).
The `DatabaseHistory` interface methods take several parameters that are used to construct a `SourceRecord`. The `SourceRecord` type was not used, however, since that would result in this interface (and potential extension mechanism) having a dependency on and exposing the Kafka API. Instead, the more general parameters are used to keep the API simple.
The `FileDatabaseHistory` and `MemoryDatabaseHistory` implementations are both fairly simple, but the `FileDatabaseHistory` relies upon representing each recorded change as a JSON document. This is simple, is easily written to files, allows for recovery of data from the raw file, etc. Although this was done initially using Jackson, the code to read and write the JSON documents required a lot of boilerplate. Instead, the `Document` framework developed during Debezium's very early prototype stages was brought back. It provides a very usable API for working with documents, including the ability to compare documents semantically (e.g., numeric values are converted to be able to compare their numeric values rather than just compare representations) and with or without field order.
The `KafkaDatabaseHistory` is a bit more complicated, since it uses a Kafka broker to record all database schema changes on a single topic with single partition, and then upon restart uses it to recover the history from the dedicated topics. This implementation also records the changes as JSON documents, keeping it simple and independent of the Kafka Connect converters.
The connector is in a basic working state, although it is not well tested yet and upon restart does not recover the schema state from the previous run.