DBZ-3141 Documentation for signalling

This commit is contained in:
Jiri Pechanec 2021-03-15 08:11:20 +01:00 committed by Gunnar Morling
parent a5c1194145
commit eb0c5ccb45
3 changed files with 292 additions and 1 deletions

View File

@ -13,6 +13,7 @@
** xref:configuration/filtering.adoc[Message Filtering] ** xref:configuration/filtering.adoc[Message Filtering]
** xref:configuration/content-based-routing.adoc[Content-Based Routing] ** xref:configuration/content-based-routing.adoc[Content-Based Routing]
** xref:configuration/topic-auto-create-config.adoc[Customizing Topic Auto-Creation] ** xref:configuration/topic-auto-create-config.adoc[Customizing Topic Auto-Creation]
** xref:configuration/signalling.adoc[Sending signals to Debezium]
* Connectors * Connectors
** xref:connectors/index.adoc[Overview] ** xref:connectors/index.adoc[Overview]
** xref:connectors/mysql.adoc[MySQL] ** xref:connectors/mysql.adoc[MySQL]

View File

@ -0,0 +1,124 @@
// Category: debezium-using
// Type: assembly
// ModuleID: customization-of-kafka-connect-automatic-topic-creation
// Title: Sending signals to a Debezium connector
[id="sending-signals-to-a-debezium-connector"]
= Sending signals to a Debezium connector
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
[NOTE]
====
This feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this extension.
====
== Overview
Sometimes it is necessary for the user to modify a connector behaviour or trigger a one-time action (e.g. snapshot of a single table).
To fullwfill such needs {prodname} provides a mechanism how to send a signal to a {prodname} connector to trigger an action.
As it might be necessary to synchronize such an action with the dataflow the signal is implemented as a write to a data collection.
Signalling is disabled by default.
The name of the signalling data collection must be set via connector configuration parameter to enable the function.
The signalling data collection must be *explictly* added among captured data collections.
.Connector configuration
[cols="3,9",options="header"]
|===
|Parameter | Description
|`signal.data.collection`
|Fully-qualified name of data cloection. +
The name format is connector specific.
|===
=== Data collection structure
The signalling data collection must conform to a required format.
it must contain three fields.
The naming of the fields is arbitrary and only order of the field is important.
It is recommended but not mandatory to use the field names as defined in the table below.
.Signalling data collection structure
[cols="1,1,9",options="header"]
|===
|Column | Type | Description
|`id`
|`string`
|A unique identifier of this signal instance. +
It can be used for logging, debugging or deduplication.
Usually an UUID string.
|`type`
|`string`
|The type of the signal to be sent. +
There are signals common for all connectors or specific to a subset of connectors.
|`data`
|`string`
|JSON formatted parameters that are passed to a signal action. +
Each signal has its own expected set of data.
|===
.Example of a signal record
[cols="1,9",options="header"]
|===
|Column | Value
|id
|`924e3ff8-2245-43ca-ba77-2af9af02fa07`
|type
|`log`
|data
|`{"message": "Signal message at offset {}"}`
|===
== Signal Actions
These signals are common to all connectors
=== Logging
The type of the logging action is `log`.
It will print a provided message to the log optionally including streaming position.
.Action parameters
[cols="1,9",options="header"]
|===
|Name | Description
|message
|The string printed to the log. +
If a placeholder `{}` is added to the message it will be replaced with streaming coordinates.
|===
.Example of a logging record
[cols="1,9",options="header"]
|===
|Column | Value
|id
|`924e3ff8-2245-43ca-ba77-2af9af02fa07`
|type
|`log`
|data
|`{"message": "Signal message at offset {}"}`
|===

View File

@ -15,10 +15,11 @@ This connector is currently in incubating state, i.e. exact semantics, configura
==== ====
{prodname}'s Oracle Connector can monitor and record all of the row-level changes in the databases on an Oracle server. {prodname}'s Oracle Connector can monitor and record all of the row-level changes in the databases on an Oracle server.
Most notably, the connector does not yet support changes to the structure of captured tables (e.g. `ALTER TABLE...`) after the initial snapshot has been completed Most notably, the connector does not yet provide full-blown support of changes to the structure of captured tables (e.g. `ALTER TABLE...`) after the initial snapshot has been completed
(see {jira-url}/browse/DBZ-718[DBZ-718]). (see {jira-url}/browse/DBZ-718[DBZ-718]).
It is supported though to capture tables newly added while the connector is running It is supported though to capture tables newly added while the connector is running
(provided the new table's name matches the connector's filter configuration). (provided the new table's name matches the connector's filter configuration).
For table schema changes an xref:surrogate-schema-evolution[auxilliary solution] could be used.
[[oracle-overview]] [[oracle-overview]]
== Overview == Overview
@ -1779,6 +1780,171 @@ The *MBean* is `debezium.mysql:type=connector-metrics,context=schema-history,ser
include::{partialsdir}/modules/all-connectors/ref-connector-monitoring-schema-history-metrics.adoc[leveloffset=+1] include::{partialsdir}/modules/all-connectors/ref-connector-monitoring-schema-history-metrics.adoc[leveloffset=+1]
[[surrogate-schema-evolution]]
== Surrogate schema evolution
Oracle connector does not yet support updates in table schemas parsing DDL `ALTER TABLE` statements.
It is possible to use {prodname} link:/documentation/reference/configuration/signalling[signalling] to trigger the update of the database schema.
The type of the schema update action is `schema-changes`.
It will update the schema of all tables enumerated in the signal parameters.
The message contain not the update to the schema but the complete new schema structure.
.Action parameters
[cols="3,9",options="header"]
|===
|Name | Description
|`database`
|The name of the Oracle database.
|`schema`
|The name of the schema where changes are applied.
|`changes`
|An array containing the requested schema updates.
|`changes.type`
|Type of the schema change, usually `ALTER`
|`changes.id`
|The fully-qualified name of the table
|`changes.table`
|The fully-qualified name of the table
|`changes.table.defaultCharsetName`
|The character set name used for the table if different from database default
|`changes.table.primaryKeyColumnNames`
|Array with the name of columns composing the primary key
|`changes.table.columns`
|Array with the column metadata
|`...columns.name`
|The name of the column
|`...columns.jdbcType`
|The JDBC type of the column as defined at link:https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html[JDBC API]
|`...columns.typeName`
|The name of the column type
|`...columns.typeExpression`
|The full column type definition
|`...columns.charsetName`
|The column character set if different from the default
|`...columns.length`
|The length/size constraint of the column
|`...columns.scale`
|The scale of numeric column
|`...columns.position`
|The position of the column in the table starting with `1`
|`...columns.optional`
|Boolean `true` if column value is not mandatory
|`...columns.autoIncremented`
|Boolean `true` if column value is automatically calculated from a sequence
|`...columns.generated`
|Boolean `true` if column value is automatically calculated
|===
.Example of a logging record
[cols="1,9a",options="header"]
|===
|Column | Value
|id
|`924e3ff8-2245-43ca-ba77-2af9af02fa07`
|type
|`schema-changes`
|data
|[source,json,indent=0,subs="attributes"]
----
{
"database":"ORCLPDB1",
"schema":"DEBEZIUM",
"changes":[
{
"type":"ALTER",
"id":"\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMER\"",
"table":{
"defaultCharsetName":null,
"primaryKeyColumnNames":[
"ID",
"NAME"
],
"columns":[
{
"name":"ID",
"jdbcType":2,
"typeName":"NUMBER",
"typeExpression":"NUMBER",
"charsetName":null,
"length":9,
"scale":0,
"position":1,
"optional":false,
"autoIncremented":false,
"generated":false
},
{
"name":"NAME",
"jdbcType":12,
"typeName":"VARCHAR2",
"typeExpression":"VARCHAR2",
"charsetName":null,
"length":1000,
"position":2,
"optional":true,
"autoIncremented":false,
"generated":false
},
{
"name":"SCORE",
"jdbcType":2,
"typeName":"NUMBER",
"typeExpression":"NUMBER",
"charsetName":null,
"length":6,
"scale":2,
"position":3,
"optional":true,
"autoIncremented":false,
"generated":false
},
{
"name":"REGISTERED",
"jdbcType":93,
"typeName":"TIMESTAMP(6)",
"typeExpression":"TIMESTAMP(6)",
"charsetName":null,
"length":6,
"position":4,
"optional":true,
"autoIncremented":false,
"generated":false
}
]
}
}
]
}
----
|===
[[oracle-when-things-go-wrong]] [[oracle-when-things-go-wrong]]
== Behavior when things go wrong == Behavior when things go wrong