89f51ce632
Co-authored-by: roldanbob <broldan@redhat.com>
// Category: debezium-using
|
|
// Type: assembly
|
|
[id="sending-signals-to-a-debezium-connector"]
|
|
= Sending signals to a {prodname} connector
|
|
ifdef::community[]
|
|
:toc:
|
|
:toc-placement: macro
|
|
:linkattrs:
|
|
:icons: font
|
|
:source-highlighter: highlight.js
|
|
|
|
toc::[]
|
|
|
|
[id="debezium-signaling-overview"]
|
|
== Overview
|
|
endif::community[]
|
|
|
|
The {prodname} signaling mechanism provides a way to modify the behavior of a connector, or to trigger a one-time action, such as initiating an xref:debezium-signaling-ad-hoc-incremental-snapshots[ad hoc snapshot] of a table.
|
|
To use signals to trigger a connector to perform a specified action, you can configure the connector to use one or more of the following channels:
|
|
|
|
SourceSignalChannel:: You can issue a SQL command to add a signal message to a specialized signaling data collection.
|
|
The signaling data collection, which you create on the source database, is designated exclusively for communicating with {prodname}.
|
|
KafkaSignalChannel:: You submit signal messages to a configurable Kafka topic.
|
|
JmxSignalChannel:: You submit signals through the JMX `signal` operation.
|
|
FileSignalChannel:: You can use a file to send signals.
|
|
Custom:: You submit signals to a xref:debezium-custom-signaling-channel[custom channel] that you implement.
|
|
|
|
When {prodname} detects that a new xref:debezium-signaling-example-of-a-logging-record[logging record] or xref:debezium-signaling-example-of-an-ad-hoc-signal-record[ad hoc snapshot record] is added to the channel, it reads the signal and initiates the requested operation.
|
|
|
|
Signaling is available for use with the following {prodname} connectors:
|
|
|
|
* Db2
|
|
* MongoDB (Technology Preview)
|
|
* MySQL
|
|
* Oracle
|
|
* PostgreSQL
|
|
* SQL Server
|
|
|
|
You can specify which channels are enabled by setting the `signal.enabled.channels` configuration property. The property lists the names of the enabled channels. By default, {prodname} provides the following channels: `source` and `kafka`.
|
|
The `source` channel is enabled by default, because it is required for incremental snapshot signals.
|
|
|
|
|
|
// Type: procedure
|
|
// Title: Enabling {prodname} source signaling channel
|
|
[id="debezium-signaling-enabling-source-signaling-channel"]
|
|
== Enabling source signaling channel
|
|
|
|
By default, the {prodname} source signaling channel is enabled.
|
|
|
|
You must explicitly configure signaling for each connector that you want to use it with.
|
|
|
|
.Procedure
|
|
|
|
. On the source database, create a signaling data collection table for sending signals to the connector.
|
|
For information about the required structure of the signaling data collection, see xref:debezium-signaling-data-collection-structure[Structure of a signaling data collection].
|
|
|
|
. For source databases such as Db2 or SQL Server that implement a native change data capture (CDC) mechanism, enable CDC for the signaling table.
|
|
|
|
. Add the name of the signaling data collection to the {prodname} connector configuration. +
|
|
In the connector configuration, add the property `signal.data.collection`, and set its value to the fully-qualified name of the signaling data collection that you created in Step 1. +
|
|
+
|
|
For example, `signal.data.collection = inventory.debezium_signals`. +
|
|
+
|
|
The format for the fully-qualified name of the signaling collection depends on the connector. +
|
|
The following example shows the naming formats to use for each connector:
|
|
|
|
Db2:: `_<schemaName>_._<tableName>_`
|
|
MongoDB:: `_<databaseName>_._<collectionName>_`
|
|
MySQL:: `_<databaseName>_._<tableName>_`
|
|
Oracle:: `_<databaseName>_._<schemaName>_._<tableName>_`
|
|
PostgreSQL:: `_<schemaName>_._<tableName>_`
|
|
SQL Server:: `_<databaseName>_._<schemaName>_._<tableName>_` +
|
|
+
|
|
For more information about setting the `signal.data.collection` property, see the table of configuration properties for your connector.
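
As a rough illustration of Step 3, the signaling-related properties can be collected as follows. This is a minimal sketch that uses only `java.util`; the class name, the method name, and the `inventory.debezium_signals` table are illustrative assumptions, and a real connector configuration contains many more properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SignalingConfigSketch {

    // Returns only the signaling-related properties. A real connector
    // configuration also needs connection settings, topic.prefix, and so on.
    public static Map<String, String> signalingProperties(String signalTable) {
        Map<String, String> props = new LinkedHashMap<>();
        // The source channel reads signals from the signaling data collection.
        props.put("signal.enabled.channels", "source");
        // Fully-qualified name, in the format that the connector expects.
        props.put("signal.data.collection", signalTable);
        return props;
    }

    public static void main(String[] args) {
        signalingProperties("inventory.debezium_signals")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

How you submit these properties (for example, through the Kafka Connect REST API or a properties file) depends on your deployment.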
|
|
|
|
// Type: reference
|
|
// ModuleID: debezium-signaling-required-structure-of-a-signaling-data-collection
|
|
// Title: Required structure of a {prodname} signaling data collection
|
|
[id="debezium-signaling-data-collection-structure"]
|
|
=== Structure of a signaling data collection
|
|
|
|
A signaling data collection, or signaling table, stores signals that you send to a connector to trigger a specified operation.
|
|
The structure of the signaling table must conform to the following standard format.
|
|
|
|
* Contains three fields (columns).
|
|
* Fields are arranged in a specific order, as shown in xref:debezium-signaling-description-of-required-structure-of-a-signaling-data-collection[Table 1].
|
|
|
|
.Structure of a signaling data collection
|
|
[id="debezium-signaling-description-of-required-structure-of-a-signaling-data-collection"]
|
|
.Required structure of a signaling data collection
|
|
[cols="1,1,9",options="header"]
|
|
|===
|
|
|Field | Type | Description
|
|
|
|
|`id` +
|
|
(required)
|
|
|`string`
|
|
|
|
|An arbitrary unique string that identifies a signal instance. +
|
|
You assign an `id` to each signal that you submit to the signaling table. +
|
|
Typically, the ID is a UUID string. +
|
|
You can use signal instances for logging, debugging, or de-duplication. +
|
|
When a signal triggers {prodname} to perform an incremental snapshot, it generates a signal message with an arbitrary `id` string.
|
|
The `id` string that the generated message contains is unrelated to the `id` string in the submitted signal.
|
|
|
|
|`type` +
|
|
(required)
|
|
|`string`
|
|
|
|
|Specifies the type of signal to send. +
|
|
You can use some signal types with any connector for which signaling is available, while other signal types are available for specific connectors only.
|
|
|
|
|`data` +
|
|
(optional)
|
|
|`string`
|
|
|
|
|Specifies JSON-formatted parameters to pass to a signal action. +
|
|
Each signal type requires a specific set of data.
|
|
|
|
|===
|
|
|
|
NOTE: The field names in a data collection are arbitrary.
|
|
The preceding table provides suggested names.
|
|
If you use a different naming convention, ensure that the values in each field are consistent with the expected content.
|
|
|
|
// Type: procedure
|
|
// Title: Creating a {prodname} signaling data collection
|
|
[id="debezium-signaling-creating-a-signal-data-collection"]
|
|
=== Creating a signaling data collection
|
|
|
|
You create a signaling table by submitting a standard SQL DDL query to the source database.
|
|
|
|
.Prerequisites
|
|
|
|
* You have sufficient access privileges to create a table on the source database.
|
|
|
|
.Procedure
|
|
|
|
* Submit a SQL query to the source database to create a table that is consistent with the xref:debezium-signaling-description-of-required-structure-of-a-signaling-data-collection[required structure], as shown in the following example: +
|
|
+
|
|
`CREATE TABLE _<tableName>_ (id VARCHAR(_<varcharValue>_) PRIMARY KEY, type VARCHAR(_<varcharValue>_) NOT NULL, data VARCHAR(_<varcharValue>_) NULL);` +
|
|
|
|
[NOTE]
|
|
====
|
|
The amount of space that you allocate to the `VARCHAR` parameter of the `id` column must be sufficient to accommodate the size of the ID strings of signals sent to the signaling table. +
|
|
If the size of an ID exceeds the available space, the connector cannot process the signal.
|
|
====
|
|
|
|
The following example shows a `CREATE TABLE` command that creates a three-column `debezium_signal` table:
|
|
|
|
[source,sql]
|
|
----
|
|
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
|
|
----
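
After the table exists, an application can send a signal by inserting a row. The following is a minimal JDBC sketch, not a definitive implementation: the class and method names are hypothetical, the table is assumed to use the `id`, `type`, and `data` column names from the preceding example, and the table name is assumed to come from trusted configuration because it is concatenated into the statement.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.UUID;

public class SignalTableWriter {

    // Builds the parameterized INSERT for a table that has the
    // id, type, and data columns.
    public static String insertSql(String tableName) {
        return "INSERT INTO " + tableName + " (id, type, data) VALUES (?, ?, ?)";
    }

    // Writes one signal row and returns the generated id.
    public static String sendSignal(Connection connection, String tableName,
                                    String type, String dataJson) throws SQLException {
        // A UUID string is 36 characters long, so it fits in VARCHAR(42).
        String id = UUID.randomUUID().toString();
        try (PreparedStatement statement = connection.prepareStatement(insertSql(tableName))) {
            statement.setString(1, id);
            statement.setString(2, type);
            if (dataJson == null) {
                // The data column is nullable for signals that need no parameters.
                statement.setNull(3, Types.VARCHAR);
            } else {
                statement.setString(3, dataJson);
            }
            statement.executeUpdate();
        }
        return id;
    }

    public static void main(String[] args) {
        // Without a database connection, the sketch only prints the statement.
        System.out.println(insertSql("debezium_signal"));
    }
}
```

To call `sendSignal`, obtain a `Connection` to the source database with the JDBC driver for that database.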
|
|
|
|
// Type: procedure
|
|
// Title: Enabling the {prodname} Kafka signaling channel
|
|
[id="debezium-signaling-enabling-kafka-signaling-channel"]
|
|
== Enabling Kafka signaling channel
|
|
|
|
You can enable the Kafka signaling channel by adding it to the `signal.enabled.channels` configuration property, and then adding the name of the topic that receives signals to the `signal.kafka.topic` property.
|
|
After you enable the signaling channel, a Kafka consumer is created to consume signals that are sent to the configured signal topic.
|
|
|
|
.Additional configuration available for the consumer
|
|
|
|
* {link-prefix}:{link-db2-connector}#debezium-db2-connector-kafka-signals-configuration-properties[Db2 connector Kafka signal configuration properties]
|
|
ifdef::community[]
|
|
* {link-prefix}:{link-mongodb-connector}#debezium-mongodb-connector-kafka-signals-configuration-properties[MongoDB connector Kafka signal configuration properties]
|
|
endif::community[]
|
|
* {link-prefix}:{link-mysql-connector}#debezium-mysql-connector-kafka-signals-configuration-properties[MySQL connector Kafka signal configuration properties]
|
|
* {link-prefix}:{link-oracle-connector}#debezium-oracle-connector-kafka-signals-configuration-properties[Oracle connector Kafka signal configuration properties]
|
|
* {link-prefix}:{link-postgresql-connector}#debezium-postgresql-connector-kafka-signals-configuration-properties[PostgreSQL connector Kafka signal configuration properties]
|
|
* {link-prefix}:{link-sqlserver-connector}#debezium-sqlserver-connector-kafka-signals-configuration-properties[SQL Server connector Kafka signal configuration properties]
|
|
|
|
[NOTE]
|
|
====
|
|
To use Kafka signaling to trigger ad hoc incremental snapshots for most connectors, you must first xref:debezium-signaling-enabling-source-signaling-channel[enable a `source` signaling channel] in the connector configuration.
|
|
The source channel implements a watermarking mechanism to deduplicate events that might be captured by an incremental snapshot and then captured again after streaming resumes.
|
|
Enabling the source channel is not required when using a signaling channel to trigger an incremental snapshot of a read-only MySQL database that has {link-prefix}:{link-mysql-connector}#enable-mysql-gtids[GTIDs enabled].
|
|
For more information, see {link-prefix}:{link-mysql-connector}#mysql-read-only-incremental-snapshots[MySQL read-only incremental snapshot].
|
|
====
|
|
|
|
=== Message format
|
|
|
|
The key of the Kafka message must match the value of the `topic.prefix` connector configuration option.
|
|
|
|
The value is a JSON object with `type` and `data` fields.
|
|
|
|
When the signal type is set to `execute-snapshot`, the `data` field must include the fields that are listed in the following table:
|
|
|
|
.Execute snapshot data fields
|
|
[cols="2,2,6a",options="header"]
|
|
|===
|
|
|Field | Default | Value
|
|
|
|
|`type`
|
|
|`incremental`
|
|
| The type of the snapshot to run.
|
|
Currently {prodname} supports the `incremental` and `blocking` types.
|
|
|
|
|`data-collections`
|
|
|_N/A_
|
|
| An array of comma-separated regular expressions that match the fully-qualified names of the data collections to include in the snapshot. +
|
|
Specify the names by using the same format as is required for the `signal.data.collection` configuration option.
|
|
|
|
|`[.line-through]#additional-condition#`
|
|
|_N/A_
|
|
| An optional string that specifies a condition that the connector evaluates to designate a subset of records to include in a snapshot. +
|
|
+
|
|
[NOTE]
|
|
====
|
|
This property is deprecated and should be replaced by the `additional-conditions` property.
|
|
====
|
|
|
|
|`additional-conditions`
|
|
|_N/A_
|
|
| An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. +
|
|
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures.
|
|
You can set the following properties for each additional condition:
|
|
|
|
`data-collection`:: The fully-qualified name of the +{data-collection}+ that the filter applies to.
|
|
You can apply different filters to each +{data-collection}+.
|
|
`filter`:: Specifies column values that must be present in a database record for the snapshot to include it, for example, `"color='blue'"`. +
|
|
The snapshot process evaluates records in the +{data-collection}+ against the `filter` value and captures only records that contain matching values. +
|
|
+
|
|
The specific values that you assign to the `filter` property depend on the type of ad hoc snapshot:
|
|
|
|
* For incremental snapshots, you specify a search condition fragment, such as `"color='blue'"`, that the snapshot appends to the condition clause of a query.
|
|
* For blocking snapshots, you specify a full `SELECT` statement, such as the one that you might set in the `snapshot.select.statement.overrides` property.
|
|
|===
|
|
|
|
The following example shows a typical `execute-snapshot` Kafka message:
|
|
|
|
----
|
|
Key = `test_connector`
|
|
|
|
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
|
|
----
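
The message value in the preceding example can also be assembled programmatically. The following sketch builds the JSON string with plain string handling so that it needs no JSON library; the class and method names are hypothetical, and the escaping covers only backslashes and double quotes. Sending the key and value to the signal topic requires a Kafka producer, which is not shown here.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ExecuteSnapshotMessage {

    // Escapes backslashes and double quotes so that a name or regular
    // expression can sit inside a JSON string literal.
    public static String quote(String text) {
        return "\"" + text.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the message value for an execute-snapshot signal.
    public static String value(String snapshotType, List<String> dataCollections) {
        String collections = dataCollections.stream()
                .map(ExecuteSnapshotMessage::quote)
                .collect(Collectors.joining(", ", "[", "]"));
        return "{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": "
                + collections + ", \"type\": " + quote(snapshotType) + "}}";
    }

    public static void main(String[] args) {
        // The key must match the topic.prefix of the target connector.
        String key = "test_connector";
        String value = value("INCREMENTAL", List.of("schema1.table1", "schema1.table2"));
        System.out.println("Key = " + key);
        System.out.println("Value = " + value);
    }
}
```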
|
|
|
|
// Type: procedure
|
|
// Title: Enabling the {prodname} JMX signaling channel
|
|
[id="debezium-signaling-enabling-jmx-signaling-channel"]
|
|
== Enabling a JMX signaling channel
|
|
|
|
You can enable the JMX signaling channel by adding `jmx` to the `signal.enabled.channels` property in the connector configuration, and then {link-prefix}:{link-debezium-monitoring}#monitoring-debezium[enabling the JMX MBean Server] to expose the signaling bean.
|
|
|
|
// Title: Using a JMX signaling channel to send signals to {prodname}
|
|
[id="debezium-signaling-using-a-jmx-signaling-channel-to-sends-signals"]
|
|
=== Sending JMX signals
|
|
|
|
.Procedure
|
|
1. Use your preferred JMX client (for example, JConsole or JDK Mission Control) to connect to the MBean server.
|
|
2. Search for the MBean `debezium.__<connector-type>__.management.signals.__<server>__`.
|
|
The MBean exposes `signal` operations that accept the following input parameters:
|
|
|
|
p0:: The id of the signal.
|
|
p1:: The type of the signal, for example, `execute-snapshot`.
|
|
p2:: A JSON data field that contains additional information about the specified signal type.
|
|
3. Send an `execute-snapshot` signal by providing values for the input parameters. +
|
|
In the JSON data field, include the information that is listed in the following table:
|
|
+
|
|
.Execute snapshot data fields
|
|
[cols="2,2,6a",options="header"]
|
|
|===
|
|
|Field | Default | Value
|
|
|
|
|`type`
|
|
|`incremental`
|
|
| The type of the snapshot to run.
|
|
Currently {prodname} supports the `incremental` and `blocking` types.
|
|
|
|
|`data-collections`
|
|
|_N/A_
|
|
| An array of comma-separated regular expressions that match the fully-qualified names of the tables to include in the snapshot. +
|
|
Specify the names by using the same format as is required for the xref:{context}-property-signal-data-collection[signal.data.collection] configuration option.
|
|
|
|
|`[.line-through]#additional-condition#`
|
|
|_N/A_
|
|
| An optional string that specifies a condition that the connector evaluates to designate a subset of records to include in a snapshot. +
|
|
|
|
[NOTE]
|
|
====
|
|
This property is deprecated and should be replaced by the `additional-conditions` property.
|
|
====
|
|
|
|
|`additional-conditions`
|
|
|_N/A_
|
|
|An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. +
|
|
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures.
|
|
You can set the following properties for each additional condition:
|
|
|
|
`data-collection`:: The fully-qualified name of the +{data-collection}+ that the filter applies to.
|
|
You can apply different filters to each +{data-collection}+.
|
|
|
|
`filter`:: Specifies column values that must be present in a database record for the snapshot to include it, for example, `"color='blue'"`. +
|
|
The snapshot process evaluates records in the +{data-collection}+ against the `filter` value and captures only records that contain matching values. +
|
|
+
|
|
The specific values that you assign to the `filter` property depend on the type of ad hoc snapshot:
|
|
|
|
* For incremental snapshots, you specify a search condition fragment, such as `"color='blue'"`, that the snapshot appends to the condition clause of a query.
|
|
* For blocking snapshots, you specify a full `SELECT` statement, such as the one that you might set in the `snapshot.select.statement.overrides` property.
|
|
|===
|
|
|
|
The following image shows an example of how to use JConsole to send a signal:
|
|
|
|
image::jmx-signal-operation.png[Using JConsole to send an `execute-snapshot` signal]
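
A JMX client can also invoke the `signal` operation programmatically. The following sketch shows the shape of such a call with the standard `javax.management` API. To keep the example runnable on its own, it registers a stand-in bean in the local platform MBean server; the `SignalOperations` interface, the `RecordingSignals` class, and the `example.signals:type=demo` object name are hypothetical and are not part of {prodname}. Against a real connector, you would connect to the remote MBean server and use the name of the signaling MBean from Step 2.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public class JmxSignalSketch {

    // Hypothetical stand-in for the connector's signaling bean.
    public interface SignalOperations {
        void signal(String id, String type, String data);
    }

    // Records the last call so that the effect of the invocation is visible.
    public static class RecordingSignals implements SignalOperations {
        public volatile String last;

        @Override
        public void signal(String id, String type, String data) {
            last = id + "|" + type + "|" + data;
        }
    }

    // Invokes the signal operation with its three String parameters (p0, p1, p2).
    public static void sendSignal(MBeanServerConnection server, ObjectName bean,
                                  String id, String type, String data) throws Exception {
        String stringType = String.class.getName();
        server.invoke(bean, "signal",
                new Object[] { id, type, data },
                new String[] { stringType, stringType, stringType });
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("example.signals:type=demo"); // hypothetical name
        RecordingSignals target = new RecordingSignals();
        server.registerMBean(new StandardMBean(target, SignalOperations.class), name);

        sendSignal(server, name, "ad-hoc-1", "execute-snapshot",
                "{\"data-collections\": [\"schema1.table1\"], \"type\": \"incremental\"}");
        System.out.println(target.last);
    }
}
```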
|
|
|
|
// Type: concept
|
|
[id="debezium-custom-signaling-channel"]
|
|
== Custom signaling channel
|
|
The signaling mechanism is designed to be extensible.
|
|
You can implement channels as needed to send signals to {prodname} in a manner that works best in your environment.
|
|
|
|
Adding a signaling channel involves several steps:
|
|
|
|
1. xref:debezium-signaling-enabling-custom-signaling-channel[Create a Java project] to implement the channel, and xref:debezium-signaling-core-module-dependencies[add `{prodname} Core` as a dependency].
|
|
2. xref:deploying-a-debezium-custom-signaling-channel[Deploy the custom signaling channel].
|
|
3. xref:configuring-connectors-to-use-a-custom-signaling-channel[Enable connectors to use the custom signaling channel by modifying the connector configuration].
|
|
|
|
// Type: procedure
|
|
// Title: Providing a custom {prodname} signaling channel
|
|
// ModuleID: debezium-signaling-providing-a-custom-signaling-channel
|
|
[id="debezium-signaling-enabling-custom-signaling-channel"]
|
|
=== Providing a custom signaling channel
|
|
|
|
Custom signaling channels are Java classes that implement the `io.debezium.pipeline.signal.channels.SignalChannelReader` service provider interface (SPI).
|
|
The interface declares the following methods:
|
|
|
|
[source,java,indent=0]
|
|
----
|
|
public interface SignalChannelReader {
|
|
|
|
String name(); // <1>
|
|
|
|
void init(CommonConnectorConfig connectorConfig); // <2>
|
|
|
|
List<SignalRecord> read(); // <3>
|
|
|
|
void close(); // <4>
|
|
}
|
|
----
|
|
<1> The name of the reader.
|
|
To enable {prodname} to use the channel, specify this name in the connector's `signal.enabled.channels` property.
|
|
<2> Initializes specific configuration, variables, or connections that the channel requires.
|
|
<3> Reads signals from the channel.
|
|
The `SignalProcessor` class calls this method to retrieve the signals to process.
|
|
<4> Closes all allocated resources.
|
|
{prodname} calls this method when the connector is stopped.
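
To show how the four methods fit together, the following sketch implements a channel that serves signals from an in-memory queue. It is an illustration under stated assumptions, not the {prodname} implementation: so that the code compiles without `debezium-core`, it declares local stand-ins for `SignalChannelReader` and `SignalRecord`, and the `init` parameter is typed as `Object` in place of `CommonConnectorConfig`. The channel name `in-memory` and the `submit` method are hypothetical. In a real project, you would import the {prodname} types rather than declare them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class InMemoryChannelSketch {

    // Local stand-in for the Debezium signal record type (assumption).
    public static final class SignalRecord {
        public final String id;
        public final String type;
        public final String data;

        public SignalRecord(String id, String type, String data) {
            this.id = id;
            this.type = type;
            this.data = data;
        }
    }

    // Local stand-in that mirrors the method names of the SPI shown above.
    public interface SignalChannelReader {
        String name();
        void init(Object connectorConfig); // CommonConnectorConfig in Debezium
        List<SignalRecord> read();
        void close();
    }

    public static final class InMemoryChannel implements SignalChannelReader {
        private final ConcurrentLinkedQueue<SignalRecord> pending = new ConcurrentLinkedQueue<>();

        @Override
        public String name() {
            // The value to list in signal.enabled.channels (hypothetical name).
            return "in-memory";
        }

        @Override
        public void init(Object connectorConfig) {
            pending.clear(); // nothing to connect to in this sketch
        }

        // Hypothetical entry point that other code uses to queue a signal.
        public void submit(SignalRecord record) {
            pending.add(record);
        }

        @Override
        public List<SignalRecord> read() {
            // Drain everything that arrived since the previous call.
            List<SignalRecord> batch = new ArrayList<>();
            SignalRecord next;
            while ((next = pending.poll()) != null) {
                batch.add(next);
            }
            return batch;
        }

        @Override
        public void close() {
            pending.clear();
        }
    }

    public static void main(String[] args) {
        InMemoryChannel channel = new InMemoryChannel();
        channel.init(null);
        channel.submit(new SignalRecord("id-1", "log", "{\"message\": \"hello\"}"));
        System.out.println(channel.read().size());
        channel.close();
    }
}
```

Returning an empty list when no signals are pending keeps `read()` non-blocking, which is a design choice of this sketch rather than a documented requirement.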
|
|
|
|
// Type: concept
|
|
[id="debezium-signaling-core-module-dependencies"]
|
|
=== {prodname} core module dependencies
|
|
|
|
A custom signaling channel Java project has compile dependencies on the {prodname} core module.
|
|
You must include these compile dependencies in your project's `pom.xml` file, as shown in the following example:
|
|
|
|
[source,xml]
|
|
----
|
|
<dependency>
|
|
<groupId>io.debezium</groupId>
|
|
<artifactId>debezium-core</artifactId>
|
|
<version>${version.debezium}</version> // <1>
|
|
</dependency>
|
|
----
|
|
<1> `${version.debezium}` represents the version of the {prodname} connector.
|
|
|
|
Declare your implementation in the `META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader` file.
|
|
|
|
// Type: procedure
|
|
[id="deploying-a-debezium-custom-signaling-channel"]
|
|
=== Deploying a custom signaling channel
|
|
|
|
.Prerequisites
|
|
* You have a custom signaling channel Java program.
|
|
|
|
.Procedure
|
|
* To use a custom signaling channel with a {prodname} connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each {prodname} connector that you want to use it with. +
|
|
+
|
|
For example, in a typical deployment, the {prodname} connector files are stored in subdirectories of a Kafka Connect directory (`/kafka/connect`), with each connector JAR in its own subdirectory (`/kafka/connect/debezium-connector-db2`, `/kafka/connect/debezium-connector-mysql`, and so forth).
|
|
|
|
NOTE: To use a custom signaling channel with multiple connectors, you must place a copy of the custom signaling channel JAR file in the subdirectory for each connector.
|
|
|
|
// Type: procedure
|
|
[id="configuring-connectors-to-use-a-custom-signaling-channel"]
|
|
=== Configuring connectors to use a custom signaling channel
|
|
|
|
Add the name of the custom signaling channel to the `signal.enabled.channels` configuration property.
|
|
|
|
// Type: concept
|
|
// ModuleID: debezium-signaling-types-of-signal-actions
|
|
// Title: Types of {prodname} signal actions
|
|
== Signal actions
|
|
|
|
You can use signaling to initiate the following actions:
|
|
|
|
* xref:debezium-signaling-logging[Add messages to the log].
|
|
* xref:debezium-signaling-ad-hoc-incremental-snapshots[Trigger ad hoc incremental snapshots].
|
|
* xref:debezium-signaling-stop-ad-hoc-snapshots[Stop execution of an ad hoc snapshot].
|
|
* xref:debezium-signaling-pause-incremental-snapshots[Pause incremental snapshots].
|
|
* xref:debezium-signaling-resume-incremental-snapshots[Resume incremental snapshots].
|
|
* xref:debezium-signaling-ad-hoc-blocking-snapshots[Trigger ad hoc blocking snapshot].
|
|
* xref:debezium-signaling-custom-action[Custom action].
|
|
|
|
Some signals are not compatible with all connectors.
|
|
|
|
// Type: concept
|
|
[id="debezium-signaling-logging"]
|
|
=== Logging signals
|
|
|
|
You can request a connector to add an entry to the log by creating a signaling table entry with the `log` signal type.
|
|
After processing the signal, the connector prints the specified message to the log.
|
|
Optionally, you can configure the signal so that the resulting message includes the streaming coordinates.
|
|
|
|
[id="debezium-signaling-example-of-a-logging-record"]
|
|
.Example of a signaling record for adding a log message
|
|
[cols="1,9,9",options="header"]
|
|
|===
|
|
|Column | Value | Description
|
|
|
|
|id
|
|
|`924e3ff8-2245-43ca-ba77-2af9af02fa07`
|
|
|
|
|
|
|
|type
|
|
|`log`
|
|
|The action type of the signal.
|
|
|
|
|data
|
|
a|
|
|
[source,json]
|
|
----
|
|
{"message": "Signal message at offset {}"}
|
|
----
|
|
| The `message` parameter specifies the string to print to the log. +
|
|
If you add a placeholder (`{}`) to the message, it is replaced with streaming coordinates.
|
|
|===
|
|
|
|
// Type: concept
|
|
[id="debezium-signaling-ad-hoc-incremental-snapshots"]
|
|
=== Ad hoc snapshot signals
|
|
|
|
You can request a connector to initiate an ad hoc snapshot by creating a signal with the `execute-snapshot` signal type.
|
|
After processing the signal, the connector runs the requested snapshot operation.
|
|
|
|
Unlike the initial snapshot that a connector runs after it first starts, an ad hoc snapshot occurs during runtime, after the connector has already begun to stream change events from a database.
|
|
You can initiate ad hoc snapshots at any time.
|
|
|
|
Ad hoc snapshots are available for the following {prodname} connectors:
|
|
|
|
* Db2
|
|
ifdef::community[]
|
|
* MongoDB
|
|
endif::community[]
|
|
* MySQL
|
|
* Oracle
|
|
* PostgreSQL
|
|
* SQL Server
|
|
|
|
[id="debezium-signaling-example-of-an-ad-hoc-signal-record"]
|
|
.Example of an ad hoc snapshot signal record
|
|
[cols="1,9",options="header"]
|
|
|===
|
|
|Column | Value
|
|
|
|
|id
|
|
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|
|
|
|
|type
|
|
|`execute-snapshot`
|
|
|
|
|data
|
|
a|
|
|
[source,json]
|
|
----
|
|
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
|
|
----
|
|
|
|
|===
|
|
|
|
[id="debezium-signaling-example-of-an-ad-hoc-signal-message"]
|
|
.Example of an ad hoc snapshot signal message
|
|
[cols="1,9",options="header"]
|
|
|===
|
|
|Key | Value
|
|
|
|
|test_connector
|
|
a|
|
|
[source,json]
|
|
----
|
|
{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}}
|
|
----
|
|
|
|
|===
|
|
|
|
|
|
For more information about ad hoc snapshots, see the _Snapshots_ topic in the documentation for your connector.
|
|
|
|
|
|
.Additional resources
|
|
|
|
* {link-prefix}:{link-db2-connector}#db2-ad-hoc-snapshots[Db2 connector ad hoc snapshots]
|
|
ifdef::community[]
|
|
* {link-prefix}:{link-mongodb-connector}#mongodb-ad-hoc-snapshots[MongoDB connector ad hoc snapshots]
|
|
endif::community[]
|
|
* {link-prefix}:{link-mysql-connector}#mysql-ad-hoc-snapshots[MySQL connector ad hoc snapshots]
|
|
* {link-prefix}:{link-oracle-connector}#oracle-ad-hoc-snapshots[Oracle connector ad hoc snapshots]
|
|
* {link-prefix}:{link-postgresql-connector}#postgresql-ad-hoc-snapshots[PostgreSQL connector ad hoc snapshots]
|
|
* {link-prefix}:{link-sqlserver-connector}#sqlserver-ad-hoc-snapshots[SQL Server connector ad hoc snapshots]
|
|
|
|
[id="debezium-signaling-stop-ad-hoc-snapshots"]
|
|
=== Ad hoc snapshot stop signals
|
|
|
|
You can request a connector to stop an in-progress ad hoc snapshot by creating a signal table entry with the `stop-snapshot` signal type.
|
|
After processing the signal, the connector stops the in-progress snapshot operation.
|
|
|
|
You can stop ad hoc snapshots for the following {prodname} connectors:
|
|
|
|
* Db2
|
|
ifdef::community[]
|
|
* MongoDB
|
|
endif::community[]
|
|
* MySQL
|
|
* Oracle
|
|
* PostgreSQL
|
|
* SQL Server
|
|
|
|
[id="debezium-signaling-example-of-a-stop-ad-hoc-signal-record"]
|
|
.Example of a stop ad hoc snapshot signal record
|
|
[cols="1,9",options="header"]
|
|
|===
|
|
|Column | Value
|
|
|
|
|id
|
|
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|
|
|
|
|type
|
|
|`stop-snapshot`
|
|
|
|
|data
|
|
a|
|
|
[source,json]
|
|
----
|
|
{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}
|
|
----
|
|
|===
|
|
|
|
You must specify the `type` of the signal.
|
|
The `data-collections` field is optional.
|
|
Leave the `data-collections` field blank to request the connector to stop all activity in the current snapshot.
|
|
If you want the incremental snapshot to proceed, but you want to exclude specific collections from the snapshot, provide a comma-separated list of the names of the collections or regular expressions to exclude.
|
|
After the connector processes the signal, the incremental snapshot proceeds, but it excludes data from the collections that you specify.
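
The two variants of the `data` value can be sketched as follows. The class and method names are hypothetical, and the sketch assumes that omitting the `data-collections` field is equivalent to leaving it blank.

```java
import java.util.List;
import java.util.stream.Collectors;

public class StopSnapshotData {

    // Escapes backslashes and double quotes for use inside a JSON string literal.
    public static String quote(String text) {
        return "\"" + text.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the data value of a stop-snapshot signal for an incremental snapshot.
    public static String data(List<String> collectionsToExclude) {
        if (collectionsToExclude.isEmpty()) {
            // No collections named: request that the whole snapshot stops.
            return "{\"type\":\"INCREMENTAL\"}";
        }
        // Collections named: the snapshot continues without them.
        String collections = collectionsToExclude.stream()
                .map(StopSnapshotData::quote)
                .collect(Collectors.joining(", ", "[", "]"));
        return "{\"type\":\"INCREMENTAL\", \"data-collections\": " + collections + "}";
    }

    public static void main(String[] args) {
        System.out.println(data(List.of()));
        System.out.println(data(List.of("public.MyFirstTable")));
    }
}
```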
|
|
|
|
// Type: concept
|
|
[id="debezium-signaling-incremental-snapshots"]
|
|
=== Incremental snapshots
|
|
|
|
Incremental snapshots are a specific type of ad hoc snapshot.
|
|
In an incremental snapshot, the connector captures the baseline state of the tables that you specify, similar to an initial snapshot.
|
|
However, unlike an initial snapshot, an incremental snapshot captures tables in chunks, rather than all at once.
|
|
The connector uses a watermarking method to track the progress of the snapshot.
|
|
|
|
By capturing the initial state of the specified tables in chunks rather than in a single monolithic operation, incremental snapshots provide the following advantages over the initial snapshot process:
|
|
|
|
* While the connector captures the baseline state of the specified tables, streaming of near real-time events from the transaction log continues uninterrupted.
|
|
* If the incremental snapshot process is interrupted, it can be resumed from the point at which it stopped.
|
|
* You can initiate an incremental snapshot at any time.
|
|
|
|
[id="debezium-signaling-pause-incremental-snapshots"]
|
|
==== Incremental snapshot pause signals
|
|
|
|
You can request a connector to pause an in-progress incremental snapshot by creating a signal table entry with the `pause-snapshot` signal type.
|
|
After processing the signal, the connector pauses the current in-progress snapshot operation.
|
|
Because the snapshot pauses at the position that it has reached when the connector processes the signal, you cannot specify a data collection in the signal.
|
|
|
|
You can pause incremental snapshots for the following {prodname} connectors:
|
|
|
|
* Db2
|
|
ifdef::community[]
|
|
* MongoDB
|
|
endif::community[]
|
|
* MySQL
|
|
* Oracle
|
|
* PostgreSQL
|
|
* SQL Server
|
|
|
|
[id="debezium-signaling-example-of-a-pause-incremental-signal-record"]
|
|
.Example of a pause incremental snapshot signal record
|
|
[cols="1,9",options="header"]
|
|
|===
|
|
|Column | Value
|
|
|
|
|id
|
|
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|
|
|
|
|type
|
|
|`pause-snapshot`
|
|
|
|
|===
|
|
|
|
You must specify the `type` of the signal.
|
|
The `data` field is ignored.
|
|
|
|
[id="debezium-signaling-resume-incremental-snapshots"]
|
|
==== Incremental snapshot resume signals
|
|
|
|
You can request a connector to resume a paused incremental snapshot by creating a signal table entry with the `resume-snapshot` signal type.
|
|
After processing the signal, the connector resumes the previously paused snapshot operation.
|
|
|
|
You can resume incremental snapshots for the following {prodname} connectors:
|
|
|
|
* Db2
|
|
ifdef::community[]
|
|
* MongoDB
|
|
endif::community[]
|
|
* MySQL
|
|
* Oracle
|
|
* PostgreSQL
|
|
* SQL Server
|
|
|
|
[id="debezium-signaling-example-of-a-resume-incremental-signal-record"]
|
|
.Example of a resume incremental snapshot signal record
|
|
[cols="1,9",options="header"]
|
|
|===
|
|
|Column | Value
|
|
|
|
|id
|
|
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|
|
|
|
|type
|
|
|`resume-snapshot`
|
|
|
|
|===
|
|
|
|
You must specify the `type` of the signal.
|
|
The `data` field is ignored.
|
|
|
|
For more information about incremental snapshots, see the _Snapshots_ topic in the documentation for your connector.
|
|
|
|
.Additional resources
|
|
|
|
* {link-prefix}:{link-db2-connector}#db2-incremental-snapshots[Db2 connector incremental snapshots]
|
|
ifdef::community[]
|
|
* {link-prefix}:{link-mongodb-connector}#mongodb-incremental-snapshots[MongoDB connector incremental snapshots]
|
|
endif::community[]
|
|
* {link-prefix}:{link-mysql-connector}#mysql-incremental-snapshots[MySQL connector incremental snapshots]
|
|
* {link-prefix}:{link-oracle-connector}#oracle-incremental-snapshots[Oracle connector incremental snapshots]
|
|
* {link-prefix}:{link-postgresql-connector}#postgresql-incremental-snapshots[PostgreSQL connector incremental snapshots]
|
|
* {link-prefix}:{link-sqlserver-connector}#sqlserver-incremental-snapshots[SQL Server connector incremental snapshots]
|
|
|
|
|
|
// Type: concept
|
|
[id="debezium-signaling-ad-hoc-blocking-snapshots"]
|
|
=== Blocking snapshot signals
|
|
|
|
You can request a connector to initiate an ad hoc blocking snapshot by creating a signal with the `execute-snapshot` signal type and setting `data.type` to `blocking`.
|
|
After processing the signal, the connector runs the requested snapshot operation.
|
|
|
|
Unlike the initial snapshot that a connector runs after it first starts, an ad hoc blocking snapshot occurs during runtime, after the connector temporarily stops streaming change events from a database.
|
|
You can initiate ad hoc blocking snapshots at any time.
|
|
|
|
Blocking snapshots are available for the following {prodname} connectors:
|
|
|
|
* Db2
|
|
ifdef::community[]
|
|
* MongoDB
|
|
endif::community[]
|
|
* MySQL
|
|
* Oracle
|
|
* PostgreSQL
|
|
* SQL Server
|
|
|
|
[id="debezium-signaling-example-of-an-ad-hoc-blocking-snapshot-signal-record"]
.Example of a blocking snapshot signal record
[cols="1,9",options="header"]
|===
|Column | Value

|id
|`d139b9b7-7777-4547-917d-e1775ea61d41`

|type
|`execute-snapshot`

|data
a|
[source,json]
----
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
----

|===

[id="debezium-signaling-example-of-an-ad-hoc-blocking-snapshot-signal-message"]
.Example of a blocking snapshot signal message
[cols="1,9",options="header"]
|===
|Key | Value

|test_connector
a|
[source,json]
----
{"type":"execute-snapshot","data": {"type": "blocking"}}
----
|===

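
The value of a blocking snapshot signal message can also be assembled in code before you submit it to a signaling channel.
The following Java sketch is one possible way to build that value; the class name `BlockingSnapshotSignal` and its `build` method are hypothetical helpers, not part of {prodname}, and the sketch assumes that data collection names need only basic JSON escaping.
Passing an empty list yields a message with only `"type": "blocking"` in its `data` field, as in the preceding message example.

[source,java]
----
import java.util.List;
import java.util.stream.Collectors;

public class BlockingSnapshotSignal {

    // Escapes backslashes and double quotes so that a name can be embedded in a JSON string.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the value of an execute-snapshot signal that requests a blocking snapshot.
    // The data-collections array is included only when at least one name is given.
    public static String build(List<String> dataCollections) {
        StringBuilder data = new StringBuilder("{\"type\": \"blocking\"");
        if (!dataCollections.isEmpty()) {
            String names = dataCollections.stream()
                    .map(BlockingSnapshotSignal::quote)
                    .collect(Collectors.joining(", "));
            data.append(", \"data-collections\": [").append(names).append("]");
        }
        data.append("}");
        return "{\"type\": \"execute-snapshot\", \"data\": " + data + "}";
    }

    public static void main(String[] args) {
        // Hypothetical table names, matching the style of the examples in this section.
        System.out.println(build(List.of("schema1.table1", "schema1.table2")));
    }
}
----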
For more information about blocking snapshots, see the _Snapshots_ topic in the documentation for your connector.

.Additional resources

* {link-prefix}:{link-db2-connector}#db2-blocking-snapshots[Db2 connector ad hoc blocking snapshots]
ifdef::community[]
* {link-prefix}:{link-mongodb-connector}#mongodb-blocking-snapshots[MongoDB connector ad hoc blocking snapshots]
endif::community[]
* {link-prefix}:{link-mysql-connector}#mysql-blocking-snapshots[MySQL connector ad hoc blocking snapshots]
* {link-prefix}:{link-oracle-connector}#oracle-blocking-snapshots[Oracle connector ad hoc blocking snapshots]
* {link-prefix}:{link-postgresql-connector}#postgresql-blocking-snapshots[PostgreSQL connector ad hoc blocking snapshots]
* {link-prefix}:{link-sqlserver-connector}#sqlserver-blocking-snapshots[SQL Server connector ad hoc blocking snapshots]

// Type: procedure
// ModuleID: debezium-signaling-defining-a-custom-action
// Title: Defining a custom signal action
[id="debezium-signaling-custom-action"]
=== Defining a custom action

Custom actions enable you to extend the {prodname} signaling framework to trigger actions that are not available in the default implementation.
You can use a custom action with multiple connectors.

To define a custom signal action, you must implement the following interface:

[source,java,indent=0]
----
@FunctionalInterface
public interface SignalAction<P extends Partition> {

    /**
     * @param signalPayload the content of the signal
     * @return true if the signal was processed
     */
    boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}
----

The `io.debezium.pipeline.signal.actions.SignalAction` interface exposes a single method with one parameter, which represents the message payload that is sent through the signaling channel.

After you define a custom signaling action, use the following SPI interface to make the custom action available to the signaling mechanism: `io.debezium.pipeline.signal.actions.SignalActionProvider`.

[source,java,indent=0]
----
public interface SignalActionProvider {

    /**
     * Create a map of signal action where the key is the name of the action.
     *
     * @param dispatcher the event dispatcher instance
     * @param connectorConfig the connector config
     * @return a concrete action
     */
    <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}
----

Your implementation must return a map of signal actions.
Set each map key to the name of the corresponding action.
The key is used as the xref:debezium-signaling-description-of-required-structure-of-a-signaling-data-collection[`type`] of the signal.

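
The relationship between the action name, the map key, and the signal `type` can be illustrated with a small, self-contained Java sketch.
To keep the sketch runnable without {prodname} on the classpath, it declares simplified stand-ins for `SignalPayload` and `SignalAction` (no `Partition` type parameter, and only three payload fields); the `AuditAction` class and the `audit` action name are hypothetical.
In a real project, you would implement the {prodname} interfaces shown earlier instead of these stand-ins.

[source,java]
----
import java.util.Map;

public class CustomActionSketch {

    // Simplified stand-in for the Debezium signal payload (assumption: only
    // the fields that this sketch uses are modeled).
    public static final class SignalPayload {
        public final String id;
        public final String type;
        public final String data;

        public SignalPayload(String id, String type, String data) {
            this.id = id;
            this.type = type;
            this.data = data;
        }
    }

    // Simplified stand-in for the SignalAction interface.
    @FunctionalInterface
    public interface SignalAction {
        boolean arrived(SignalPayload signalPayload) throws InterruptedException;
    }

    // Hypothetical action that prints the payload and reports that it handled the signal.
    public static final class AuditAction implements SignalAction {
        @Override
        public boolean arrived(SignalPayload signalPayload) {
            System.out.println("audit signal " + signalPayload.id + ": " + signalPayload.data);
            return true;
        }
    }

    // Mirrors the map that a provider implementation returns: action name -> action.
    public static Map<String, SignalAction> createActions() {
        return Map.of("audit", new AuditAction());
    }

    public static void main(String[] args) throws InterruptedException {
        SignalPayload payload = new SignalPayload("signal-1", "audit", "{\"reason\": \"manual check\"}");
        // The type of the signal selects the action by its map key.
        SignalAction action = createActions().get(payload.type);
        System.out.println("processed: " + action.arrived(payload));
    }
}
----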
// Type: concept
[id="debezium-signaling-custom-action-core-module-dependencies"]
=== {prodname} core module dependencies

A custom action Java project has compile dependencies on the {prodname} core module.
Include the following compile dependencies in your project's `pom.xml` file:

[source,xml]
----
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> // <1>
</dependency>
----
<1> `${version.debezium}` represents the version of the {prodname} connector.

Declare your provider implementation by listing its fully qualified class name in the `META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider` file.

// Type: procedure
// ModuleID: debezium-signaling-deploying-a-custom-action
// Title: Deploying a custom signal action
[id="deploying-a-debezium-custom-action"]
=== Deploying a custom action

.Prerequisites
* You have a custom action Java program.

.Procedure
* To use a custom action with a {prodname} connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each {prodname} connector that you want to use it with.
+
For example, in a typical deployment, the {prodname} connector files are stored in subdirectories of a Kafka Connect directory (`/kafka/connect`), with each connector JAR in its own subdirectory (`/kafka/connect/debezium-connector-db2`, `/kafka/connect/debezium-connector-mysql`, and so forth).

NOTE: To use a custom action with multiple connectors, you must place a copy of the custom action JAR file in the subdirectory for each connector.

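
The copy step can be scripted.
The following Java sketch is one way to do so, under the assumption that connector directories follow the `debezium-connector-*` naming shown in the example paths above; the `DeployCustomAction` class and its `copyToConnectors` method are hypothetical helpers.
The sketch copies the JAR into every matching subdirectory, so narrow the glob pattern if you want to use the action with only some connectors.

[source,java]
----
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

public class DeployCustomAction {

    // Copies the JAR into every subdirectory of connectRoot whose name starts
    // with "debezium-connector-" and returns the paths of the copies.
    public static List<Path> copyToConnectors(Path jar, Path connectRoot) {
        List<Path> copies = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(connectRoot, "debezium-connector-*")) {
            for (Path dir : dirs) {
                if (Files.isDirectory(dir)) {
                    Path target = dir.resolve(jar.getFileName());
                    Files.copy(jar, target, StandardCopyOption.REPLACE_EXISTING);
                    copies.add(target);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copies;
    }

    public static void main(String[] args) {
        // Assumed arguments: the JAR path, then optionally the Kafka Connect directory.
        Path jar = Paths.get(args[0]);
        Path connectRoot = Paths.get(args.length > 1 ? args[1] : "/kafka/connect");
        for (Path copy : copyToConnectors(jar, connectRoot)) {
            System.out.println("Copied to " + copy);
        }
    }
}
----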