
// Category: debezium-using
// Type: assembly
[id="sending-signals-to-a-debezium-connector"]
= Sending signals to a {prodname} connector
ifdef::community[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
== Overview
endif::community[]
The {prodname} signaling mechanism provides a way to modify the behavior of a connector, or to trigger a one-time action, such as initiating an xref:debezium-signaling-ad-hoc-snapshots[ad hoc snapshot] of a table.
To trigger a connector to perform a specified action, you can use different channels:
* SourceSignalChannel: you can issue a SQL command to add a signal message to a specialized signaling table, also referred to as a signaling data collection.
The signaling table, which you create on the source database, is designated exclusively for communicating with {prodname}.
* KafkaSignalChannel: you can issue a message to a configurable Kafka topic.
* Custom: you can xref:debezium-signaling-enabling-custom-signaling-channel[implement a custom channel] to provide signals to {prodname}.
When {prodname} detects that a new xref:debezium-signaling-example-of-a-logging-record[logging record] or xref:debezium-signaling-example-of-an-ad-hoc-signal-record[ad hoc snapshot record] is added to the channel, it reads the signal, and initiates the requested operation.
Signaling is available for use with the following {prodname} connectors:
* Db2
* MongoDB (Technology Preview)
* MySQL
* Oracle
* PostgreSQL
* SQL Server
You can specify which channels are enabled with the `signal.enabled.channels` configuration property.
The property accepts a list of enabled channel names; the built-in channels are `source` and `kafka`.
The `source` channel is enabled by default since it is required for incremental snapshot signals.
// Type: procedure
// Title: Enabling {prodname} source signaling channel
[id="debezium-signaling-enabling-source-signaling-channel"]
== Enabling source signaling channel
By default, the {prodname} source signaling channel is enabled.
You must explicitly configure signaling for each connector that you want to use it with.
.Procedure
. On the source database, create a signaling data collection table for sending signals to the connector.
For information about the required structure of the signaling data collection, see xref:debezium-signaling-data-collection-structure[Structure of a signaling data collection].
. For source databases such as Db2 or SQL Server that implement a native change data capture (CDC) mechanism, enable CDC for the signaling table.
. Add the name of the signaling data collection to the {prodname} connector configuration. +
In the connector configuration, add the property `signal.data.collection`, and set its value to the fully-qualified name of the signaling data collection that you created in Step 1. +
+
For example, `signal.data.collection = inventory.debezium_signals`. +
+
The format for the fully-qualified name of the signaling collection depends on the connector. +
The following example shows the naming formats to use for each connector:
Db2:: `_<schemaName>_._<tableName>_`
MongoDB:: `_<databaseName>_._<collectionName>_`
MySQL:: `_<databaseName>_._<tableName>_`
Oracle:: `_<databaseName>_._<schemaName>_._<tableName>_`
PostgreSQL:: `_<schemaName>_._<tableName>_`
SQL Server:: `_<databaseName>_._<schemaName>_._<tableName>_` +
+
For more information about setting the `signal.data.collection` property, see the table of configuration properties for your connector.
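As a minimal sketch, the relevant entries in a connector configuration might look like the following (the collection name `inventory.debezium_signals` is illustrative):

[source,properties]
----
signal.enabled.channels=source
signal.data.collection=inventory.debezium_signals
----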
// Type: reference
// ModuleID: debezium-signaling-required-structure-of-a-signaling-data-collection
// Title: Required structure of a {prodname} signaling data collection
[id="debezium-signaling-data-collection-structure"]
=== Structure of a signaling data collection
A signaling data collection, or signaling table, stores signals that you send to a connector to trigger a specified operation.
The structure of the signaling table must conform to the following standard format.
* Contains three fields (columns).
* Fields are arranged in a specific order, as shown in xref:debezium-signaling-description-of-required-structure-of-a-signaling-data-collection[Table 1].
[id="debezium-signaling-description-of-required-structure-of-a-signaling-data-collection"]
.Required structure of a signaling data collection
[cols="1,1,9",options="header"]
|===
|Field | Type | Description
|`id` +
(required)
|`string`
|An arbitrary unique string that identifies a signal instance. +
You assign an `id` to each signal that you submit to the signaling table. +
Typically the ID is a UUID string. +
You can use signal instances for logging, debugging, or de-duplication. +
When a signal triggers {prodname} to perform an incremental snapshot, it generates a signal message with an arbitrary `id` string.
The `id` string that the generated message contains is unrelated to the `id` string in the submitted signal.
|`type` +
(required)
|`string`
|Specifies the type of signal to send. +
You can use some signal types with any connector for which signaling is available, while other signal types are available for specific connectors only.
|`data` +
(optional)
|`string`
|Specifies JSON-formatted parameters to pass to a signal action. +
Each signal type requires a specific set of data.
|===
NOTE: The field names in a data collection are arbitrary.
The preceding table provides suggested names.
If you use a different naming convention, ensure that the values in each field are consistent with the expected content.
// Type: procedure
// Title: Creating a {prodname} signaling data collection
[id="debezium-signaling-creating-a-signal-data-collection"]
=== Creating a signaling data collection
You create a signaling table by submitting a standard SQL DDL query to the source database.
.Prerequisites
* You have sufficient access privileges to create a table on the source database.
.Procedure
* Submit a SQL query to the source database to create a table that is consistent with the xref:debezium-signaling-description-of-required-structure-of-a-signaling-data-collection[required structure], as shown in the following example: +
+
`CREATE TABLE _<tableName>_ (id VARCHAR(_<varcharValue>_) PRIMARY KEY, type VARCHAR(__<varcharValue>__) NOT NULL, data VARCHAR(_<varcharValue>_) NULL);` +
[NOTE]
====
The amount of space that you allocate to the `VARCHAR` parameter of the `id` variable must be sufficient to accommodate the size of the ID strings of signals sent to the signaling table. +
If the size of an ID exceeds the available space, the connector cannot process the signal.
====
The following example shows a `CREATE TABLE` command that creates a three-column `debezium_signal` table:
[source,sql]
----
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
----
// Type: procedure
// Title: Enabling {prodname} kafka signaling channel
[id="debezium-signaling-enabling-kafka-signaling-channel"]
== Enabling Kafka signaling channel
You can enable the Kafka signaling channel by adding `kafka` to the `signal.enabled.channels` configuration property, and configuring the `signal.kafka.topic` property with the name of the topic where you will send signals.
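For example, a sketch of the relevant configuration entries (the topic name and broker address are illustrative):

[source,properties]
----
signal.enabled.channels=source,kafka
signal.kafka.topic=debezium-signals
signal.kafka.bootstrap.servers=kafka:9092
----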
=== Message format
The key of the Kafka message must match the value of the `topic.prefix` connector configuration option.
The value is a JSON object with `type` and `data` fields.
When signal type is `execute-snapshot`, the `data` field must have the following fields:
.Execute snapshot data fields
[cols="2,2,6",options="header"]
|===
|Field | Default | Value
|`type`
|`incremental`
| The type of the snapshot to be executed. Currently only `incremental` is supported. +
See the next section for more details.
|`data-collections`
|_N/A_
| An array of regular expressions that match the fully-qualified names of the tables to be snapshotted. +
The format of the names is the same as for xref:#{context}-property-signal-data-collection[signal.data.collection] configuration option.
|`additional-condition`
|_N/A_
| An optional string, which specifies a condition based on the column(s) of the {data-collection}(s), to capture a
subset of the contents of the {data-collection}(s).
|===
An example of the execute-snapshot Kafka message:
----
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
----
// Type: procedure
// Title: Provide custom signaling channel
[id="debezium-signaling-enabling-custom-signaling-channel"]
== Provide custom signaling channel
Custom signaling channels are Java classes that implement the `io.debezium.pipeline.signal.channels.SignalChannelReader` service provider interface (SPI).
[source,java,indent=0]
----
public interface SignalChannelReader {
String name(); // <1>
void init(CommonConnectorConfig connectorConfig); // <2>
List<SignalRecord> read(); // <3>
void close(); // <4>
}
----
<1> The name of the reader, used in `signal.enabled.channels` to enable the channel.
<2> Initializes any specific configuration, variables, or connections that the channel requires.
<3> Reads signals from the channel. The `SignalProcessor` class calls this method to retrieve the signals to process.
<4> Closes all allocated resources. Called when the connector stops.
// Type: concept
[id="debezium-core-module-dependency"]
=== {prodname} Core module dependency
A custom signaling channel Java project has a compile dependency on the {prodname} Core module.
Include this dependency in your project's `pom.xml`, as shown in the following example:
[source,xml]
----
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version> // <1>
</dependency>
----
<1> `${version.debezium}` represents the version of the {prodname} connector.
Declare your implementation in the `META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader` file.
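For example, if your reader implementation is the illustrative class `io.example.CustomChannelReader`, the service registration file contains a single line with its fully-qualified name:

[source]
----
io.example.CustomChannelReader
----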
// Type: procedure
[id="deploying-a-debezium-custom-signaling-channel"]
=== Deploying a custom signaling channel
.Prerequisites
* You have a custom signaling channel Java program.
.Procedure
* To use a signaling channel with a {prodname} connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each {prodname} connector that you want to use it with. +
+
For example, in a typical deployment, the {prodname} connector files are stored in subdirectories of a Kafka Connect directory (`/kafka/connect`), with each connector JAR in its own subdirectory (`/kafka/connect/debezium-connector-db2`, `/kafka/connect/debezium-connector-mysql`, and so forth).
To use a signaling channel with a connector, add the signaling channel JAR file to the connector's subdirectory.
NOTE: To use a signaling channel with multiple connectors, you must place a copy of the signaling channel JAR file in each connector subdirectory.
// Type: procedure
[id="configuring-a-connectors-to-use-a-custom-signaling-channel"]
=== Configuring a connector to use a custom signaling channel
Add the name of the custom signaling channel to the `signal.enabled.channels` configuration property.
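For example, assuming a custom channel whose `name()` method returns the illustrative value `custom-channel`:

[source,properties]
----
signal.enabled.channels=source,custom-channel
----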
// Type: concept
// ModuleID: debezium-signaling-types-of-signal-actions
// Title: Types of {prodname} signal actions
== Signal actions
You can use signaling to initiate the following actions:
* xref:debezium-signaling-logging[Add messages to the log].
* xref:debezium-signaling-ad-hoc-snapshots[Trigger ad hoc snapshots].
* xref:debezium-signaling-stop-ad-hoc-snapshots[Stop execution of an ad hoc snapshot].
* xref:debezium-signaling-pause-incremental-snapshots[Pause incremental snapshots].
* xref:debezium-signaling-resume-incremental-snapshots[Resume incremental snapshots].
Some signals are not compatible with all connectors.
// Type: concept
[id="debezium-signaling-logging"]
=== Logging signals
You can request a connector to add an entry to the log by creating a signaling table entry with the `log` signal type.
After processing the signal, the connector prints the specified message to the log.
Optionally, you can configure the signal so that the resulting message includes the streaming coordinates.
[id="debezium-signaling-example-of-a-logging-record"]
.Example of a signaling record for adding a log message
[cols="1,9,9",options="header"]
|===
|Column | Value | Description
|id
|`924e3ff8-2245-43ca-ba77-2af9af02fa07`
|
|type
|`log`
|The action type of the signal.
|data
|`{"message": "Signal message at offset {}"}`
| The `message` parameter specifies the string to print to the log. +
If you add a placeholder (`{}`) to the message, it is replaced with streaming coordinates.
|===
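Through the source channel, the preceding record could be submitted with a standard SQL `INSERT`, assuming the `debezium_signal` table from the earlier example:

[source,sql]
----
INSERT INTO debezium_signal (id, type, data)
VALUES (
  '924e3ff8-2245-43ca-ba77-2af9af02fa07',
  'log',
  '{"message": "Signal message at offset {}"}'
);
----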
// Type: concept
[id="debezium-signaling-ad-hoc-snapshots"]
=== Ad hoc snapshot signals
You can request a connector to initiate an ad hoc snapshot by creating a signal with the `execute-snapshot` signal type.
After processing the signal, the connector runs the requested snapshot operation.
Unlike the initial snapshot that a connector runs after it first starts, an ad hoc snapshot occurs during runtime, after the connector has already begun to stream change events from a database.
You can initiate ad hoc snapshots at any time.
Ad hoc snapshots are available for the following {prodname} connectors:
* Db2
ifdef::community[]
* MongoDB
endif::community[]
* MySQL
* Oracle
* PostgreSQL
* SQL Server
[id="debezium-signaling-example-of-an-ad-hoc-signal-record"]
.Example of an ad hoc snapshot signal record
[cols="1,9",options="header"]
|===
|Column | Value
|id
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|type
|`execute-snapshot`
|data
|`{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}`
|===
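Through the source channel, the preceding record could be submitted with a SQL `INSERT`, assuming the `debezium_signal` table from the earlier example:

[source,sql]
----
INSERT INTO debezium_signal (id, type, data)
VALUES (
  'd139b9b7-7777-4547-917d-e1775ea61d41',
  'execute-snapshot',
  '{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}'
);
----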
[id="debezium-signaling-example-of-an-ad-hoc-signal-message"]
.Example of an ad hoc snapshot signal message
[cols="1,9",options="header"]
|===
|Key | Value
|test_connector
|`{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-condition":"color=blue AND brand=foo"}}`
|===
For more information about ad hoc snapshots, see the _Snapshots_ topic in the documentation for your connector.
.Additional resources
* {link-prefix}:{link-db2-connector}#db2-ad-hoc-snapshots[Db2 connector ad hoc snapshots]
ifdef::community[]
* {link-prefix}:{link-mongodb-connector}#mongodb-ad-hoc-snapshots[MongoDB connector ad hoc snapshots]
endif::community[]
* {link-prefix}:{link-mysql-connector}#mysql-ad-hoc-snapshots[MySQL connector ad hoc snapshots]
* {link-prefix}:{link-oracle-connector}#oracle-ad-hoc-snapshots[Oracle connector ad hoc snapshots]
* {link-prefix}:{link-postgresql-connector}#postgresql-ad-hoc-snapshots[PostgreSQL connector ad hoc snapshots]
* {link-prefix}:{link-sqlserver-connector}#sqlserver-ad-hoc-snapshots[SQL Server connector ad hoc snapshots]
[id="debezium-signaling-stop-ad-hoc-snapshots"]
=== Ad hoc snapshot stop signals
You can request a connector to stop an in-progress ad hoc snapshot by creating a signal table entry with the `stop-snapshot` signal type.
After processing the signal, the connector will stop the current in-progress snapshot operation.
You can stop ad hoc snapshots for the following {prodname} connectors:
* Db2
ifdef::community[]
* MongoDB
endif::community[]
* MySQL
* Oracle
* PostgreSQL
* SQL Server
[id="debezium-signaling-example-of-a-stop-ad-hoc-signal-record"]
.Example of a stop ad hoc snapshot signal record
[cols="1,9",options="header"]
|===
|Column | Value
|id
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|type
|`stop-snapshot`
|data
|`{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}`
|===
You must specify the `type` of the signal.
The `data-collections` field is optional.
Leave the `data-collections` field blank to request the connector to stop all activity in the current snapshot.
If you want the incremental snapshot to proceed, but you want to exclude specific collections from the snapshot, provide a comma-separated list of the names of the collections or regular expressions to exclude.
After the connector processes the signal, the incremental snapshot proceeds, but it excludes data from the collections that you specify.
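Assuming the `debezium_signal` table from the earlier example, the preceding stop signal could be submitted with a SQL `INSERT` like the following:

[source,sql]
----
INSERT INTO debezium_signal (id, type, data)
VALUES (
  'd139b9b7-7777-4547-917d-e1775ea61d41',
  'stop-snapshot',
  '{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}'
);
----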
// Type: concept
[id="debezium-signaling-incremental-snapshots"]
=== Incremental snapshots
Incremental snapshots are a specific type of ad hoc snapshot.
In an incremental snapshot, the connector captures the baseline state of the tables that you specify, similar to an initial snapshot.
However, unlike an initial snapshot, an incremental snapshot captures tables in chunks, rather than all at once.
The connector uses a watermarking method to track the progress of the snapshot.
By capturing the initial state of the specified tables in chunks rather than in a single monolithic operation, incremental snapshots provide the following advantages over the initial snapshot process:
* While the connector captures the baseline state of the specified tables, streaming of near real-time events from the transaction log continues uninterrupted.
* If the incremental snapshot process is interrupted, it can be resumed from the point at which it stopped.
* You can initiate an incremental snapshot at any time.
[id="debezium-signaling-pause-incremental-snapshots"]
==== Incremental snapshot pause signals
You can request a connector to pause an in-progress incremental snapshot by creating a signal table entry with the `pause-snapshot` signal type.
After processing the signal, the connector pauses the current in-progress snapshot operation.
It is not possible to specify a data collection, because the snapshot is paused at whatever position it has reached when the connector processes the signal.
You can pause incremental snapshots for the following {prodname} connectors:
* Db2
ifdef::community[]
* MongoDB
endif::community[]
* MySQL
* Oracle
* PostgreSQL
* SQL Server
[id="debezium-signaling-example-of-a-pause-incremental-signal-record"]
.Example of a pause incremental snapshot signal record
[cols="1,9",options="header"]
|===
|Column | Value
|id
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|type
|`pause-snapshot`
|===
You must specify the `type` of the signal.
The `data` field is ignored.
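Assuming the `debezium_signal` table from the earlier example, a pause signal could be submitted with a SQL `INSERT` like the following (the `data` column is omitted because it is ignored):

[source,sql]
----
INSERT INTO debezium_signal (id, type)
VALUES ('d139b9b7-7777-4547-917d-e1775ea61d41', 'pause-snapshot');
----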
[id="debezium-signaling-resume-incremental-snapshots"]
==== Incremental snapshot resume signals
You can request a connector to resume a paused incremental snapshot by creating a signal table entry with the `resume-snapshot` signal type.
After processing the signal, the connector resumes the previously paused snapshot operation.
You can resume incremental snapshots for the following {prodname} connectors:
* Db2
ifdef::community[]
* MongoDB
endif::community[]
* MySQL
* Oracle
* PostgreSQL
* SQL Server
[id="debezium-signaling-example-of-a-resume-incremental-signal-record"]
.Example of a resume incremental snapshot signal record
[cols="1,9",options="header"]
|===
|Column | Value
|id
|`d139b9b7-7777-4547-917d-e1775ea61d41`
|type
|`resume-snapshot`
|===
You must specify the `type` of the signal.
The `data` field is ignored.
For more information about incremental snapshots, see the _Snapshots_ topic in the documentation for your connector.
.Additional resources
* {link-prefix}:{link-db2-connector}#db2-incremental-snapshots[Db2 connector incremental snapshots]
ifdef::community[]
* {link-prefix}:{link-mongodb-connector}#mongodb-incremental-snapshots[MongoDB connector incremental snapshots]
endif::community[]
* {link-prefix}:{link-mysql-connector}#mysql-incremental-snapshots[MySQL connector incremental snapshots]
* {link-prefix}:{link-oracle-connector}#oracle-incremental-snapshots[Oracle connector incremental snapshots]
* {link-prefix}:{link-postgresql-connector}#postgresql-incremental-snapshots[PostgreSQL connector incremental snapshots]
* {link-prefix}:{link-sqlserver-connector}#sqlserver-incremental-snapshots[SQL Server connector incremental snapshots]