debezium server
1. Support for string and binary serialization formats on debezium api.
2. Allow configuring separate key and value formats on embedded engine.
This change fixes the following issue using outbox event router on
embedded engine:
Outbox event router supports arbitrary payload formats with
BinaryDataConverter as the value.converter which passes payload
transparently. However this is currently not supported with the
embedded engine which handles message conversion using value.format to
specify the format.
In addition, when we want to pass payload transparently, it makes
sense to also pass aggregateid i.e. the event key transparently. The
default outbox table configuration specifies aggregateid as a
varchar which is also not supported by embedded engine.
It turns out that the existing code for chunking a table when taking
an incremental snapshot was buggy and did not correctly handle NULL
values when building the chunk query. An example of such a situation
would be when the user has specified "message.key.columns" to reference
a column that is part of a PostgreSQL UNIQUE INDEX that was created with
the NULLS NOT DISTINCT option.
This commit updates the new AbstractChunkQueryBuilder so that it checks
whether a key column is optional. If it is, then additional will
appropriately consider NULL values when generating a chunk query using
"IS [NOT] NULL" clauses.
One complication is that different database engines have different
sorting behavior of ORDER BY. It is apparently not well-defined by the
SQL standard. Some databases consider NULL values to be higher than any
non-NULL values, and others consider them to be lower.
To handle this situation, a new nullsSortLast() function is added to the
JdbcConnection class. By default, it returns an empty value, indicating
that the behavior of the database engine is unknown. When an optional
field is encountered by AbstractChunkQueryBuilder in this situation, we
throw an error because we don't actually know how to correctly chunk the
query: there's no safe assumption that can be made here.
Derived JdbcConnection classes can then override the nullsSortLast
function, and return a value indicating the actual behavior of that
database engine. When this is done, the AbstractChunkQueryBuilder then
knows how to correctly build a chunk query that can handle NULL values.
To help test this, new tests have been added to
AbstractIncrementalSnapshotTest. First, the existing insertsWithoutPks
test has been moved and deduplicated from MySQL and PostgreSQL so that
the test case can be reused on other engines. Second, a new
insertsWithoutPksAndNull test is run, which inserts data with NULL
values in the message key columns. To demonstrate that chunk queries
are being correctly generated for practically every case, the
INCREMENTAL_SNAPSHOT_CHUNK_SIZE is set to 1 so that NULL values are not
returned in the middle of a chunk, which can cause us to skip testing
the code we need to test.
Mostly to stabilize the testsuite and find the right value for the
default task start/stop timeout. This info may be inaccurate for
mutiple tasks as we eventually call `ConnectorCallback::taskStopped`
callback and this time is added to the next task stop time.
Engine is typically run in a different thread when one from which the
`close()` method si called. During the call of `close()` method, we stop
task polling and `run()` method may move to `finally` block, calling
completetion callback before we return from `close()` method and thus
e.g. even before calling stop of the connector.
Make sure engine state is moved to `STOPPED` and completion callback is
called after engine is really stopped and `close()` method has finished.
If the number of threads is provided as a number, limit it to 16 threads
to avoid possible overhead with context switching on a beefy machines
where the default value using all available cores may result in many
threads, which would be waiting most of the time anyway, as such machine
may run probably many other tasks, not only Debezium.
If the user really wants to use all available cores, it can be specified
using `AVAILABLE-CORES` placeholder.
Some polling tasks may be stuck and we need to interrupt polling during
the shutdown not have to wait for TASK_MANAGEMENT_TIMEOUT_MS to timeout.
Also, when we start to interrput polling, we have to remove interruption
of the main thread in the `catch` part. It was a bug anyway as it
interrputed the main thread what we definitelly don't want to happen in
any case.
Increase task management timeout to two minutes and make this option
internal. This timeout will be hopefully sufficient for most of the
deployments. If not, we will increase the timeout it make this option
public.
Default implmentation of `RecordCommitter`, the `SourceRecordCommitter`,
is always created for each task and withing given task is called
sequentially, always in the same thread. There's no need to aquire locks
for each method call.
Make `SourceRecordCommitter` thread unsafe.
Based on feedback in https://github.com/debezium/debezium-server/pull/33 this
commit adds the partition() method to the ChangeEvent interface and implements
it in the EmbeddedEngineChangeEvent. This allows reading the assigned partition
for an event in downstream processors, for example in custom Sinks that need
the assigned partition for routing purposes.
Link: https://issues.redhat.com/browse/DBZ-6723
In 7b4cf1901 deprecated `io.debezium.embedded.spi.OffsetCommitPolicy`,
which provided also constructor for `Configuration`, was removed.
This constructor was actually used for creating `OffsetCommitPolicy`.
`Configuration` provides default values for options which are not
explicitly set, while `Properties` based constructor cannot do that
and therefore with this switch the default value for
`offset.flush.interval.ms` is now missing.
As debezium-api package has no knowledge about `Configuration`
interface, which is part of debezium-core, and thus about the default
values, specify default value direcly in the `PeriodicCommitOffsetPolicy`
class.
Postgres `RecordsStreamProducerIT` reliaes on
EmebeddedEngine.runWithTask(). As this method effectively expose
engine's internal task and tests do the asserts against the state
of the task, it's hard to replace it. If we want to keep the tests,
the most simple approach seems to expose engine task in similar way
how EmbeddedEngine does that.
Add interface for testing Debezium engine, which would define minimal
set of methods which needs to be exposed by the implementing classes to
be able to run the testsuite against the Debezium engine. The number of
such methods should be as low as possible. Implementing classes would
typically act as proxies to actual `DebeziumEngine` implementations.
Add `TestingDebeziumEngine` implementation for `EmbeddedEngine` and
switch to `TestingDebeziumEngine` in `AbstractConnectorTest`.
This config will be re-used by possible other implementations of
DebeiumEngine API in the embedded package. As DebeziumEngine API
can have completely different implementations and thus also config,
the class is called `EmbeddedEngineConfig` as it's assumed to be used
only by embedded engine "family" of implementations.
To keep backward compatibility, the config options are extracted into
an interface and `EmbeddedEngine` implements this interface, thus
allowing to use these options in custom classes without any need for the
code changes.