DBZ-2105 Started to replace contractions, instances of "we/our", spelling of behavior. Not exhaustive.

TovaCohen 2020-05-26 14:47:15 -04:00 committed by Chris Cranford
parent 6c13e8265e
commit 2e6e81543b
17 changed files with 118 additions and 118 deletions


@ -30,7 +30,7 @@ ifdef::community[]
== Embedded Engine
An alternative way for using the {prodname} connectors is the xref:operations/embedded.adoc[embedded engine].
In this case, {prodname} won't be run via Kafka Connect, but as a library embedded into your custom Java applications.
In this case, {prodname} will not be run via Kafka Connect, but as a library embedded into your custom Java applications.
This can be useful for either consuming change events within your application itself,
without the need for deploying complete Kafka and Kafka Connect clusters,
or for streaming changes to alternative messaging brokers such as Amazon Kinesis.
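For illustration, a minimal embedded-engine configuration sketch follows; the choice of the MySQL connector, the file locations, and the connection details are assumptions for the example, and the engine would be created and run from your own Java code rather than by Kafka Connect.

[source,properties]
----
# Engine-level settings: keep offsets in a local file instead of Kafka
name=embedded-engine-example
connector.class=io.debezium.connector.mysql.MySqlConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/offsets.dat
offset.flush.interval.ms=60000

# Connector settings (placeholder connection details)
database.hostname=localhost
database.port=3306
database.user=debezium
database.password=dbz
database.server.id=5400
database.server.name=example
database.history=io.debezium.relational.history.FileDatabaseHistory
database.history.file.filename=/tmp/dbhistory.dat
----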


@ -45,7 +45,7 @@ You can install them into any Kafka distribution and use them with Kafka Connect
[NOTE]
====
The Apicurio project also provides a JSON converter that can be used with the Apicurio registry.
This combines the advantage of less verbose messages (as messages don't contain the schema information themselves, but only a schema id)
This combines the advantage of less verbose messages (as messages do not contain the schema information themselves, but only a schema id)
with human-readable JSON.
====
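As a sketch of how such a converter might be wired in (the converter class and property names follow the Apicurio converter artifacts but may differ between Apicurio versions, and the registry URL is a placeholder):

[source,properties]
----
# Messages carry only a schema id; the schema itself is stored in the Apicurio registry
key.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter.apicurio.registry.url=http://apicurio:8080/api
----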


@ -42,7 +42,7 @@ E.g. the general message structure for an insert event looks like this:
More details about the message structure are provided in {link-prefix}:{link-mongodb-connector}[the documentation] of the MongoDB connector.
While this structure is a good fit to represent changes to MongoDB's schemaless collections,
it's not understood by existing sink connectors as for instance the Confluent JDBC sink connector.
it is not understood by existing sink connectors such as the Confluent JDBC sink connector.
Therefore {prodname} provides {link-kafka-docs}/#connect_transforms[a single message transformation] (SMT)
which converts the `after`/`patch` information from the MongoDB CDC events into a structure suitable for consumption by existing sink connectors.
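For illustration, registering this SMT in a sink connector configuration might look like the following sketch; the transform alias `unwrap` is an arbitrary choice for the example.

[source,properties]
----
# Flatten MongoDB CDC events before they reach the sink connector
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
----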
@ -206,7 +206,7 @@ Note that other mongo operations might cause an `$unset` internally, `$rename` i
=== Determine original operation
When a message is flattened the final result won't show whether it was an insert, update or first read (Deletions can be detected via tombstones or rewrites, see {link-prefix}:{link-mongodb-event-flattening}#mongodb-extract-new-record-state-configuration-options[Configuration options]).
When a message is flattened the final result does not show whether it was an insert, update or first read. (Deletions can be detected via tombstones or rewrites, see {link-prefix}:{link-mongodb-event-flattening}#mongodb-extract-new-record-state-configuration-options[Configuration options].)
To solve this problem {prodname} offers an option to propagate the original operation via a header added to the message.
To enable this feature the option `operation.header` must be set to `true`.
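Continuing the hypothetical SMT registration sketched earlier, enabling the header would add one property:

[source,properties]
----
# Propagate the original operation (insert/update/read) as a message header
transforms.unwrap.operation.header=true
----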


@ -103,7 +103,7 @@ This result was achieved with the {link-prefix}:{link-outbox-event-router}#outbo
|[[outbox-event-router-property-table-field-event-timestamp]]<<outbox-event-router-property-table-field-event-timestamp, `table.field.event.timestamp`>>
|
|Table
|Optionally you can override the Kafka message timestamp with a value from a chosen field, otherwise it'll be the {prodname} event processed timestamp.
|Optionally, you can override the Kafka message timestamp with a value from a chosen field; otherwise, it is the {prodname} event processed timestamp.
|[[outbox-event-router-property-table-field-event-payload]]<<outbox-event-router-property-table-field-event-payload, `table.field.event.payload`>>
|`payload`
@ -123,7 +123,7 @@ This result was achieved with the {link-prefix}:{link-outbox-event-router}#outbo
|[[outbox-event-router-property-table-field-event-schema-version]]<<outbox-event-router-property-table-field-event-schema-version, `table.field.event.schema.version`>>
|
|Table, Schema
|When set, it'll be used as schema version as in the https://kafka.apache.org/20/javadoc/org/apache/kafka/connect/data/ConnectSchema.html#version--[Kafka Connect Schema] javadoc
|When set, this value is used as the schema version, as described in the https://kafka.apache.org/20/javadoc/org/apache/kafka/connect/data/ConnectSchema.html#version--[Kafka Connect Schema] javadoc.
|[[outbox-event-router-property-route-by-field]]<<outbox-event-router-property-route-by-field, `route.by.field`>>
|`aggregatetype`
@ -148,7 +148,7 @@ This result was achieved with the {link-prefix}:{link-outbox-event-router}#outbo
|[[outbox-event-router-property-debezium-op-invalid-behavior]]<<outbox-event-router-property-debezium-op-invalid-behavior, `debezium.op.invalid.behavior`>>
|`warn`
|{prodname}
|While {prodname} is monitoring the table, it's not expecting to see 'update' row events, in case it happens, this transform can log it as warning, error or stop the process. Options are `warn`, `error` and `fatal`
|While {prodname} is monitoring the table, it does not expect to see 'update' row events. If one occurs, this transform can log it as a warning or an error, or stop the process. Options are `warn`, `error` and `fatal`
|===
=== Default table columns
@ -198,7 +198,7 @@ transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
=== Using Avro as the payload format
The outbox routing SMT supports arbitrary payload formats, as the payload column value is passed on transparently.
As an alternative to working with JSON as shown above it's therefore also possible to use Avro.
As an alternative to working with JSON as shown above, it is therefore also possible to use Avro.
This can be beneficial for the purposes of message format governance and making sure outbox event schemas evolve in a backwards-compatible way.
How a source application produces Avro messages as an outbox event payload is out of the scope of this documentation.
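For illustration, the options described in the table above could be combined in a configuration fragment such as the following; `createdat` is a hypothetical column of the outbox table.

[source,properties]
----
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Use a column of the outbox table as the Kafka message timestamp (hypothetical column name)
transforms.outbox.table.field.event.timestamp=createdat
# Route events to topics based on the aggregate type column (default value shown)
transforms.outbox.route.by.field=aggregatetype
# Log unexpected 'update' row events as warnings
transforms.outbox.debezium.op.invalid.behavior=warn
----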


@ -172,7 +172,7 @@ DDLs are not recorded in commit logs. When the schema of a table change, this ch
When sending a message to a topic, the Kafka Connect schema for the key and the value will be automatically registered in the Confluent Schema Registry under the subject t-key and t-value, respectively, if the compatibility test passes. Although it is possible to replay a history of all table schemas via the Schema Registry, only the latest schema of each table is used to generate CDC events.
**TODO**: look into if it's possible to leverage schema history to rebuild schema that exist at the specific position in the commit log, rather than the current schema, when restarting the connector. I don't think it's possible right now, because writes to Cassandra node are not received in order.
**TODO**: look into whether it is possible to leverage schema history to rebuild the schema that existed at a specific position in the commit log, rather than the current schema, when restarting the connector. I do not think it is possible right now, because writes to a Cassandra node are not received in order.
[[cassandra-events]]
=== Events
@ -622,7 +622,7 @@ When we compare this to the value in the _insert_ and _update_ event, we see a c
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the commit log.
* The `ts_ms` field shows the timestamp, in milliseconds, at which the connector processed this event.
**TODO**: given TTL is not currently support, would it be better to remove delete_ts? would it also be okay to derive whether a field is set or not by looking at the each column to see if it's null?
**TODO**: given that TTL is not currently supported, would it be better to remove delete_ts? Would it also be okay to derive whether a field is set by looking at each column to see if it is null?
**TODO**: discuss tombstone events in Cassandra connector
@ -996,7 +996,7 @@ refreshing the cached Cassandra table schemas.
|[[cassandra-property-tombstones-on-delete]]<<cassandra-property-tombstones-on-delete, `tombstones.on.delete`>>
|`false`
|Whether deletion events should have a subsequent tombstone event (true) or not (false). It's important to note that in Cassandra, two events with the same key may be updating different columns of a given table. So this could potentially result in records being lost during compaction if they haven't been consumed by the consumer yet. In other words, do NOT set this to true if you have kafka compaction turned on.
|Whether deletion events should have a subsequent tombstone event (true) or not (false). It is important to note that in Cassandra, two events with the same key may be updating different columns of a given table. So this could potentially result in records being lost during compaction if they have not yet been consumed. In other words, do NOT set this to true if you have Kafka compaction turned on.
|[[cassandra-property-field-blacklist]]<<cassandra-property-field-blacklist, `field.blacklist`>>
|


@ -40,7 +40,7 @@ The connector has been tested with Db2/Linux 11.5.0.0, but our expectation is th
The database administrator must put the tables to be monitored into capture mode.
For convenience and for automating testing we have written a User Defined Function (UDF) in C that can be compiled and then used to control the ASN agent, create the ASN schemas/tables and add/remove tables to capture.
These utilities are described in debezium-connector-db2/src/test/docker/db2-cdc-docker/
These utilities are described in `debezium-connector-db2/src/test/docker/db2-cdc-docker/`
but these are only for convenience and manually performing the same commands using
_db2_ control commands would have the same effect.
@ -49,7 +49,7 @@ The connector then produces a _change event_ for every row-level insert, update,
The client applications read the Kafka topics that correspond to the database tables they are interested in following, and react to every row-level event they see in those topics.
The database administrator normally enables _CDC_ in the middle of the life of a table.
This means that the connector won't have the complete history of all changes that have been made to the table.
This means that the connector does not have the complete history of all changes that have been made to the table.
Therefore, when the Db2 connector first connects to a particular Db2 database, it starts by performing a _consistent snapshot_ of each of the whitelisted tables.
After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made for tables in capture mode.
This way, we start with a consistent view of all of the data, yet continue reading without having lost any of the changes made while the snapshot was taking place.
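For orientation, a Db2 connector registration might use properties along these lines; hostnames, credentials, and table names are placeholders, and the exact property set depends on the connector version.

[source,properties]
----
name=db2-inventory-connector
connector.class=io.debezium.connector.db2.Db2Connector
# Placeholder connection details
database.hostname=db2server
database.port=50000
database.user=db2inst1
database.password=secret
database.dbname=mydatabase
database.server.name=mydatabase
# Capture only whitelisted tables that have been put into capture mode
table.whitelist=MYSCHEMA.CUSTOMERS
# Database history topic used to track schema changes
database.history.kafka.bootstrap.servers=kafka:9092
database.history.kafka.topic=schema-changes.mydatabase
----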
@ -264,7 +264,7 @@ This assures that the changes are replayed by {prodname} in the same order as we
After a restart, the connector will resume from the offset (commit and change LSNs) where it left off before.
The connector is able to detect whether the CDC is enabled or disabled for whitelisted source table during the runtime and modify its behaviour.
The connector is able to detect whether CDC is enabled or disabled for a whitelisted source table at runtime and to modify its behavior accordingly.
[[db2-topic-names]]
=== Topic names
@ -490,10 +490,10 @@ Every change event for the `customers` table while it has this definition will f
}
----
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion, and in our case that means that the `payload` value is not optional, is a structure defined by a schema named `mydatabase.MYSCHEMA.CUSTOMERS.Key`, and has one required field named `id` of type `int32`.
If we look at the value of the key's `payload` field, we'll see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1004`.
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion. In this case, it means that the `payload` value is not optional, is a structure defined by a schema named `mydatabase.MYSCHEMA.CUSTOMERS.Key`, and has one required field named `id` of type `int32`.
If you look at the value of the key's `payload` field, you can see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1004`.
Therefore, we interpret this key as describing the row in the `MYSCHEMA.CUSTOMERS` table (output from the connector reading from database `mydatabase`) whose `id` primary key column had a value of `1004`.
Therefore, you can interpret this key as describing the row in the `MYSCHEMA.CUSTOMERS` table (output from the connector reading from database `mydatabase`) whose `id` primary key column had a value of `1004`.
////
[NOTE]
@ -1243,8 +1243,8 @@ The `connect.decimal.precision` schema parameter contains an integer representin
== Deploying a connector
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using {prodname}'s Db2 connector is easy.
First download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-db2/{debezium-version}/debezium-connector-db2-{debezium-version}-plugin.tar.gz[connector's plugin archive],
extract it, and add the contained directory to Kafka Connect's plugin path using the {link-kafka-docs}/#connectconfigs[plugin.path] configuration property.
First download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-db2/{debezium-version}/debezium-connector-db2-{debezium-version}-plugin.tar.gz[connector's plug-in archive],
extract it, and add the contained directory to Kafka Connect's `plugin.path` by using the {link-kafka-docs}/#connectconfigs[`plugin.path`] configuration property.
In addition, due to licensing reasons you need to separately obtain the https://www.ibm.com/support/pages/db2-jdbc-driver-versions-and-downloads[JDBC driver for Db2].
Add the JDBC driver JAR to the directory with the {prodname} Db2 connector JARs.
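As a sketch, the extracted connector directory (with the Db2 JDBC driver JAR copied into it) is picked up through the Kafka Connect worker configuration; the path below is a placeholder.

[source,properties]
----
# Kafka Connect worker configuration (for example, connect-distributed.properties)
plugin.path=/opt/kafka/connect-plugins
----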


@ -257,7 +257,7 @@ Every change event for the `customers` collection will feature the same key stru
}
----
The `schema` portion of the key contains a Kafka Connect schema describing what is in the payload portion, and in our case that means that the `payload` value is not optional, is a structure defined by a schema named `fulfillment.inventory.customers.Key`, and has one required field named `id` of type `string`. If we look at the value of the key's `payload` field, we'll see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is a string containing the integer `1004`.
The `schema` portion of the key contains a Kafka Connect schema describing what is in the payload portion. In this case, it means that the `payload` value is not optional, is a structure defined by a schema named `fulfillment.inventory.customers.Key`, and has one required field named `id` of type `string`. If you look at the value of the key's `payload` field, you can see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is a string containing the integer `1004`.
This example used a document with an integer identifier, but any valid MongoDB document identifier (including documents) will work. The value of the `id` field in the payload will simply be a string representing a MongoDB extended JSON serialization (strict mode) of the original document's `_id` field. Find below a few examples showing how `_id` fields of
different types will get encoded as the event key's payload:
@ -477,13 +477,13 @@ All examples in this document were obtained from MongoDB 3.4 and might differ if
[NOTE]
====
Update events in MongoDB's oplog don't have the _before_ or _after_ states of the changed document, so there's no way for the connector to provide this information.
Update events in MongoDB's oplog do not have the _before_ or _after_ states of the changed document, so there is no way for the connector to provide this information.
However, because _create_ or _read_ events _do_ contain the starting state, downstream consumers of the stream can actually fully reconstruct the state by keeping the latest state for each document and applying each event to that state. {prodname} connectors are not able to keep such state, so they are not able to do this.
====
So far we've seen samples of _create_/_read_ and _update_ events. Now, let's look at the value of a _delete_ event for the same collection.
The value of an _delete_ event on this collection will also have the exact same _schema_, and its payload is structured the same but will hold different values.
In particular, a delete event will not have an `after` value or a `patch` value:
So far, you have seen samples of _create_/_read_ and _update_ events. The following sample shows the value of a _delete_ event for the same collection.
The value of a _delete_ event on this collection has the exact same _schema_, and its payload is structured the same but it holds different values.
In particular, a delete event has neither an `after` value nor a `patch` value:
[source,json,indent=0,subs="attributes"]
----
@ -611,20 +611,20 @@ An example of messages looks like
== Deploying the MongoDB connector
ifdef::community[]
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using {prodname}'s MongoDB connector is easy.
If you have already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using {prodname}'s MongoDB connector is easy.
Simply download the
ifeval::['{page-version}' == 'master']
{link-mongodb-plugin-snapshot}[connector's plugin archive],
{link-mongodb-plugin-snapshot}[connector's plug-in archive],
endif::[]
ifeval::['{page-version}' != 'master']
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/{debezium-version}/debezium-connector-mongodb-{debezium-version}-plugin.tar.gz[connector's plugin archive],
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/{debezium-version}/debezium-connector-mongodb-{debezium-version}-plugin.tar.gz[connector's plug-in archive],
endif::[]
extract the JARs into your Kafka Connect environment, and add the directory with the JARs to Kafka Connect's plugin path using the {link-kafka-docs}/#connectconfigs[plugin.path] configuration property.
extract the JARs into your Kafka Connect environment, and add the directory with the JARs to Kafka Connect's `plugin.path` by using the {link-kafka-docs}/#connectconfigs[plugin.path] configuration property.
Restart your Kafka Connect process to pick up the new JARs.
endif::community[]
ifdef::product[]
Installing the MongoDB connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plugin's parent directory is specified in your Kafka Connect environment.
Installing the MongoDB connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plug-in's parent directory is specified in your Kafka Connect environment.
.Prerequisites
@ -635,7 +635,7 @@ Installing the MongoDB connector is a simple process whereby you only need to do
. Download the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[MongoDB connector].
. Extract the files into your Kafka Connect environment.
. Add the plugin's parent directory to your Kafka Connect plugin path:
. Add the plug-in's parent directory to your Kafka Connect `plugin.path`:
+
[source]
----
@ -1029,7 +1029,7 @@ Because there is a chance that some events may be duplicated during a recovery f
=== Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you've specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency that you have specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.
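The flush frequency mentioned above is a standard Kafka Connect worker setting; for example (the value shown is illustrative):

[source,properties]
----
# Kafka Connect worker configuration: how often source offsets are committed
offset.flush.interval.ms=60000
----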
=== Connector is stopped for a duration
@ -1047,6 +1047,6 @@ In this case, when the connector configured with _initial_ snapshot mode (the de
=== MongoDB loses writes
It is possible for MongoDB to lose commits in specific failure situations. For example, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary's oplog before the primary crashed. If one such secondary is then elected as primary, it's oplog is missing the last changes that the old primary had recorded and no longer has those changes.
It is possible for MongoDB to lose commits in specific failure situations. For example, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary's oplog before the primary crashed. If one such secondary is then elected as primary, its oplog is missing the last changes that the old primary had recorded and no longer has those changes.
In these cases where MongoDB loses changes recorded in a primary's oplog, the MongoDB connector may or may not capture these lost changes. At this time, there is no way to prevent this side effect of MongoDB.


@ -24,7 +24,7 @@ It is supported though to capture tables newly added while the connector is runn
{prodname} ingests change events from Oracle using the https://docs.oracle.com/database/121/XSTRM/xstrm_intro.htm#XSTRM72647[XStream API].
In order to use this API and hence this connector, you need to have a license for the GoldenGate product
(though it's not required that GoldenGate itself is installed).
(though it is not required that GoldenGate itself is installed).
We are currently exploring alternatives to using XStream for a future {prodname} release, e.g. based on LogMiner and/or alternative solutions.
Please track the {jira-url}/browse/DBZ-137[DBZ-137] JIRA issue and join the discussion if you are aware of potential other ways for ingesting change events from Oracle.
@ -229,9 +229,9 @@ ensuring that it does not miss any updates that occurred while the snapshot was
If the connector stops again for any reason, upon restart it will simply continue streaming changes from where it previously left off.
A second snapshotting mode is *schema_only*.
In this case step 6 from the snapshotting routine described above won't be applied.
I.e. the connector will still capture the structure of the relevant tables, but it won't create any `READ` events representing the complete dataset at the point of connector start-up.
This can be useful if you're only interested in any data changes from now onwards but not the complete current state of all records.
In this case, step 6 from the snapshotting routine described above is not applied.
In other words, the connector still captures the structure of the relevant tables, but it does not create any `READ` events representing the complete dataset at the point of connector start-up.
This can be useful if you are interested in data changes only from now onwards but not the complete current state of all records.
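For illustration, the snapshotting mode is selected through the connector configuration; `schema_only` is shown here, while omitting the property requests the full snapshot described first.

[source,properties]
----
connector.class=io.debezium.connector.oracle.OracleConnector
# Capture table structure only; do not emit READ events for the existing data
snapshot.mode=schema_only
----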
[[oracle-reading-the-log]]
=== Reading the Redo Log
@ -457,9 +457,9 @@ every change event for the `customers` table while it has this definition will f
----
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion, and in our case that means that the `payload` value is not optional, is a structure defined by a schema named `server1.DEBEZIUM.CUSTOMERS.Key`, and has one required field named `id` of type `int32`.
If we look at the value of the key's `payload` field, we'll see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1004`.
If you look at the value of the key's `payload` field, you can see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1004`.
Therefore, we interpret this key as describing the row in the `inventory.customers` table (output from the connector named `server1`) whose `id` primary key column had a value of `1004`.
Therefore, you can interpret this key as describing the row in the `inventory.customers` table (output from the connector named `server1`) whose `id` primary key column had a value of `1004`.
////
[NOTE]
@ -488,7 +488,7 @@ Whether or not this field and its elements are available is highly dependent on
====
* `after` is an optional field that if present contains the state of the row _after_ the event occurred. The structure is described by the same `server1.INVENTORY.CUSTOMERS.Value` Kafka Connect schema used in `before`.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of Oracle contains these fields: the {prodname} version, the connector name, whether the event is part of an ongoing snapshot or not, the transaction id (not while snapshotting), the SCN of the change, and a timestamp representing the point in time when the record was changed in the source database (during snapshotting, it'll be the point in time of snapshotting)
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of Oracle contains these fields: the {prodname} version, the connector name, whether the event is part of an ongoing snapshot or not, the transaction id (not while snapshotting), the SCN of the change, and a timestamp representing the point in time when the record was changed in the source database (during snapshotting, this is the point in time of snapshotting)
* `ts_ms` is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
And of course, the _schema_ portion of the event message's value contains a schema that describes this envelope structure and the nested fields within it.


@ -20,34 +20,34 @@ The first time it connects to a PostgreSQL server/cluster, it reads a consistent
[[postgresql-overview]]
== Overview
PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html[_logical decoding_] feature was first introduced in version 9.4 and is a mechanism which allows the extraction of the changes which were committed to the transaction log and the processing of these changes in a user-friendly manner via the help of an https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_output plugin_]. This output plugin must be installed prior to running the PostgreSQL server and enabled together with a replication slot in order for clients to be able to consume the changes.
PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html[_logical decoding_] feature was first introduced in version 9.4 and is a mechanism which allows the extraction of the changes which were committed to the transaction log and the processing of these changes in a user-friendly manner via the help of an https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_output plug-in_]. This output plug-in must be installed prior to running the PostgreSQL server and enabled together with a replication slot in order for clients to be able to consume the changes.
The PostgreSQL connector consists of two different parts that work together in order to read and process server changes:
ifdef::product[]
* A logical decoding output plugin, which has to be installed and configured in the PostgreSQL server.
* Java code (the actual Kafka Connect connector) which reads the changes produced by the plugin, using PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], via the PostgreSQL https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
* A logical decoding output plug-in, which has to be installed and configured in the PostgreSQL server.
* Java code (the actual Kafka Connect connector) which reads the changes produced by the plug-in, using PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], via the PostgreSQL https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
endif::product[]
ifdef::community[]
* a logical decoding output plugin which has to be installed and configured in the PostgreSQL server, one of
* a logical decoding output plug-in which has to be installed and configured in the PostgreSQL server, one of
** https://github.com/debezium/postgres-decoderbufs[decoderbufs] (maintained by the {prodname} community, based on ProtoBuf)
** https://github.com/eulerto/wal2json[wal2json] (maintained by the wal2json community, based on JSON)
** pgoutput, the standard logical decoding plug-in in PostgreSQL 10+ (maintained by the Postgres community, used by Postgres itself for https://www.postgresql.org/docs/current/logical-replication-architecture.html[logical replication]);
this plug-in is always present, meaning that no additional libraries must be installed,
and the {prodname} connector will interpret the raw replication event stream into change events directly.
* Java code (the actual Kafka Connect connector) which reads the changes produced by the chosen plugin, using PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], via the PostgreSQL https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
* Java code (the actual Kafka Connect connector) which reads the changes produced by the chosen plug-in, using PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], via the PostgreSQL https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
endif::community[]
The connector then produces a _change event_ for every row-level insert, update, and delete operation that was received, recording all the change events for each table in a separate Kafka topic. Your client applications read the Kafka topics that correspond to the database tables they are interested in following, and react to every row-level event they see in those topics.
PostgreSQL normally purges WAL segments after some period of time. This means that the connector won't have the complete history of all changes that have been made to the database. Therefore, when the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing a _consistent snapshot_ of each of the database schemas. After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made. This way, we start with a consistent view of all of the data, yet continue reading without having lost any of the changes made while the snapshot was taking place.
PostgreSQL normally purges WAL segments after some period of time. This means that the connector does not have the complete history of all changes that have been made to the database. Therefore, when the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing a _consistent snapshot_ of each of the database schemas. After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made. This way, we start with a consistent view of all of the data, yet continue reading without having lost any of the changes made while the snapshot was taking place.
The connector is also tolerant of failures. As the connector reads changes and produces events, it records the position in the write-ahead log with each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart it simply continues reading the WAL where it last left off. This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it will begin a new snapshot.
ifdef::product[]
[[postgresql-output-plugin]]
=== Logical decoding output plugin
=== Logical decoding output plug-in
The `pgoutput` logical decoder is the only supported logical decoder in the Technology Preview release of {prodname}.
`pgoutput`, the standard logical decoding plug-in in PostgreSQL 10+, is maintained by the Postgres community, and is also used by Postgres for https://www.postgresql.org/docs/current/logical-replication-architecture.html[logical replication].
@ -111,12 +111,12 @@ ifdef::community[]
Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server,
first decide which logical decoder method you intend to use.
If you plan *not* to use the native pgoutput logical replication stream support,
then you will need to install the logical decoding plugin into the PostgreSQL server.
then you will need to install the logical decoding plug-in into the PostgreSQL server.
Afterward enable a replication slot, and configure a user with sufficient privileges to perform the replication.
Note that if your database is hosted by a service such as https://www.heroku.com/postgres[Heroku Postgres] you may be unable to install the plugin.
Note that if your database is hosted by a service such as https://www.heroku.com/postgres[Heroku Postgres] you may be unable to install the plug-in.
If so, and if you're using PostgreSQL 10+, you can use the pgoutput decoder support to monitor your database.
If that isn't an option, you'll be unable to monitor your database with {prodname}.
If that is not an option, you will be unable to monitor your database with {prodname}.
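For illustration, choosing the built-in pgoutput decoder is done through the connector's `plugin.name` property:

[source,properties]
----
connector.class=io.debezium.connector.postgresql.PostgresConnector
# Use the native pgoutput replication stream (PostgreSQL 10+); no extra plug-in required
plugin.name=pgoutput
----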
[[postgresql-on-amazon-rds]]
=== PostgreSQL on Amazon RDS
@ -125,8 +125,8 @@ It is possible to monitor PostgreSQL database running in https://aws.amazon.com/
* The instance parameter `rds.logical_replication` is set to `1`.
* Verify that `wal_level` parameter is set to `logical` by running the query `SHOW wal_level` as DB master user; this might not be the case in multi-zone replication setups.
You cannot set this option manually, it's (https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_WorkingWithParamGroups.html[automatically changed]) when the `rds.logical_replication` is set to `1`.
If the `wal_level` is not `logical` after the change above, it's probably because the instance has to be restarted due to the parameter group change, it'll happen accordingly to your maintenance window or can be done manually.
You cannot set this option manually; it is (https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_WorkingWithParamGroups.html[automatically changed]) when the `rds.logical_replication` is set to `1`.
If the `wal_level` is not `logical` after the change above, it is probably because the instance has to be restarted due to the parameter group change. This happens according to your maintenance window, or it can be done manually.
* Set `plugin.name` {prodname} parameter to `wal2json`. You can skip this on PostgreSQL 10+ if you wish to use pgoutput logical replication stream support.
* Use database master account for replication as RDS currently does not support setting of `REPLICATION` privilege for another account.
@ -160,31 +160,31 @@ As of {prodname} 0.10, the connector supports PostgreSQL 10+ logical replication
This means that a logical decoding output plug-in is no longer necessary and changes can be emitted directly from the replication stream by the connector.
====
As of PostgreSQL 9.4, the only way to read changes to the write-ahead-log is to first install a logical decoding output plugin. Plugins are written in C, compiled, and installed on the machine which runs the PostgreSQL server. Plugins use a number of PostgreSQL specific APIs, as described by the https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_PostgreSQL documentation_].
As of PostgreSQL 9.4, the only way to read changes to the write-ahead log is to first install a logical decoding output plug-in. Plug-ins are written in C, compiled, and installed on the machine which runs the PostgreSQL server. Plug-ins use a number of PostgreSQL-specific APIs, as described by the https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_PostgreSQL documentation_].
PostgreSQL connector works with one of {prodname}'s supported logical decoding plugin to encode the changes in either https://github.com/google/protobuf[_Protobuf format_] or http://www.json.org/[_JSON_] format.
See the documentation of your chosen plugin (https://github.com/debezium/postgres-decoderbufs/blob/master/README.md[_protobuf_], https://github.com/eulerto/wal2json/blob/master/README.md[_wal2json_]) to learn more about the plugin's requirements, limitations, and how to compile it.
The PostgreSQL connector works with one of {prodname}'s supported logical decoding plug-ins to encode the changes in either https://github.com/google/protobuf[_Protobuf format_] or http://www.json.org/[_JSON_] format.
See the documentation of your chosen plug-in (https://github.com/debezium/postgres-decoderbufs/blob/master/README.md[_protobuf_], https://github.com/eulerto/wal2json/blob/master/README.md[_wal2json_]) to learn more about the plug-in's requirements, limitations, and how to compile it.
For simplicity, {prodname} also provides a Docker image based on a vanilla PostgreSQL server image on top of which it compiles and installs the plugins. We recommend https://github.com/debezium/docker-images/tree/master/postgres/9.6[_using this image_] as an example of the detailed steps required for the installation.
For simplicity, {prodname} also provides a Docker image based on a vanilla PostgreSQL server image on top of which it compiles and installs the plug-ins. We recommend https://github.com/debezium/docker-images/tree/master/postgres/9.6[_using this image_] as an example of the detailed steps required for the installation.
[WARNING]
====
The {prodname} logical decoding plugins have only been installed and tested on _Linux_ machines.
The {prodname} logical decoding plug-ins have only been installed and tested on _Linux_ machines.
For Windows and other operating systems, different installation steps may be required.
====
[[postgresql-differences-between-plugins]]
==== Differences Between Plug-ins
The plugins' behaviour is not completely same for all cases.
The plug-ins' behavior is not completely the same in all cases.
So far these differences have been identified:
* wal2json plug-in is not able to process quoted identifiers (https://github.com/eulerto/wal2json/issues/35[issue])
* wal2json and decoderbufs plug-ins emit events for tables without primary keys
* wal2json plug-in does not support special values (`NaN` or `infinity`) for floating point types
* wal2json should be used with setting the `schema.refresh.mode` connector option to `columns_diff_exclude_unchanged_toast`;
otherwise, when receiving a change event for a row containing an unchanged TOAST column, no field for that column will be contained in the emitted change event's `after` structure.
This is because wal2json's messages won't contain a field for such column.
otherwise, when receiving a change event for a row containing an unchanged TOAST column, no field for that column is contained in the emitted change event's `after` structure.
This is because wal2json's messages do not contain a field for such a column.
The requirement for adding this is tracked under the wal2json https://github.com/eulerto/wal2json/issues/98[issue 98].
See the documentation of `columns_diff_exclude_unchanged_toast` further below for implications of using it.
@ -198,7 +198,7 @@ All up-to-date differences are tracked in a test suite https://github.com/debezi
=== Configuring the PostgreSQL Server
If you are using one of the supported {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plug-ins] (i.e. not pgoutput) and it has been installed,
configure the server to load the plugin at startup:
configure the server to load the plug-in at startup:
.postgresql.conf
[source,properties]
@ -206,7 +206,7 @@ configure the server to load the plugin at startup:
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json' // <1>
----
<1> tells the server that it should load at startup the `decoderbufs` and `wal2json` logical decoding plugins (the names of the plugins are set in https://github.com/debezium/postgres-decoderbufs/blob/v0.3.0/Makefile[_Protobuf_] and https://github.com/eulerto/wal2json/blob/master/Makefile[_wal2json_] Makefiles)
<1> tells the server that it should load at startup the `decoderbufs` and `wal2json` logical decoding plug-ins (the names of the plug-ins are set in https://github.com/debezium/postgres-decoderbufs/blob/v0.3.0/Makefile[_Protobuf_] and https://github.com/eulerto/wal2json/blob/master/Makefile[_wal2json_] Makefiles)
Next, configure the replication slot, regardless of the decoder being used:
@ -227,9 +227,9 @@ Replication slots are guaranteed to retain all WAL required for {prodname} even
For this reason, it is important to closely monitor replication slots to avoid excessive disk consumption and other conditions that can happen, such as catalog bloat if a replication slot stays unused for too long.
For more information, please see the official Postgres docs on https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS[this subject].
In case you working with a `synchronous_commit` setting other than `on`,
it's recommended to set `wal_writer_delay` to a value such as 10 ms in order to achieve a low latency of change events.
Otherwise, its default value will be applied which adds a latency of about 200 ms.
If you are working with a `synchronous_commit` setting other than `on`,
the recommendation is to set `wal_writer_delay` to a value such as 10 ms to achieve a low latency of change events.
Otherwise, its default value is applied, which adds a latency of about 200 ms.
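As a small sketch, this recommendation maps to settings such as the following in `postgresql.conf`; the values are examples only.

[source,properties]
----
# Only relevant when synchronous_commit is not 'on'
synchronous_commit = off
# Lower the WAL writer delay to reduce change event latency (default is 200ms)
wal_writer_delay = 10ms
----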
[TIP]
====
@ -280,7 +280,7 @@ ifdef::community[]
The PostgreSQL connector can be used with a standalone PostgreSQL server or with a cluster of PostgreSQL servers.
As mentioned {link-prefix}:{link-postgresql-connector}#postgresql-limitations[in the beginning], PostgreSQL (for all versions <= 12) only supports logical replication slots on `primary` servers. This means that a replica in a PostgreSQL cluster cannot be configured for logical replication, and consequently that the {prodname} PostgreSQL Connector can only connect and communicate with the primary server. Should this server fail, the connector will stop. When the cluster is repaired, if the original primary server is once again promoted to `primary`, the connector can simply be restarted. However, if a different PostgreSQL server _with the plugin and proper configuration_ is promoted to `primary`, the connector configuration must be changed to point to the new `primary` server and then can be restarted.
As mentioned {link-prefix}:{link-postgresql-connector}#postgresql-limitations[in the beginning], PostgreSQL (for all versions <= 12) only supports logical replication slots on `primary` servers. This means that a replica in a PostgreSQL cluster cannot be configured for logical replication, and consequently that the {prodname} PostgreSQL Connector can only connect and communicate with the primary server. Should this server fail, the connector will stop. When the cluster is repaired, if the original primary server is once again promoted to `primary`, the connector can simply be restarted. However, if a different PostgreSQL server _with the plug-in and proper configuration_ is promoted to `primary`, the connector configuration must be changed to point to the new `primary` server and then can be restarted.
endif::community[]
[[postgresql-wal-disk-space]]
@ -292,7 +292,7 @@ There are three potential reasons that explain the situation:
This is visible as `confirmed_flush_lsn` in the `pg_replication_slots` slots table.
The database is responsible for reclaiming the disk space and the WAL size can be calculated from `restart_lsn` of the same table.
So if the `confirmed_flush_lsn` is regularly increasing and `restart_lsn` lags then the database does need to reclaim the space.
Disk space is usually reclaimed in batch blocks so this is expected behaviour and no action on a user's side is necessary.
Disk space is usually reclaimed in batch blocks so this is expected behavior and no action on a user's side is necessary.
* There are many updates in a monitored database but only a minuscule amount relates to the monitored table(s) and/or schema(s).
This situation can be easily solved by enabling periodic heartbeat events using the `heartbeat.interval.ms` configuration option.
* The PostgreSQL instance contains multiple databases where one of them is a high-traffic database.
@ -305,7 +305,7 @@ To overcome the third cause it is necessary to
* enable periodic heartbeat record generation using the `heartbeat.interval.ms` configuration option
* regularly emit change events from the database tracked by {prodname}
ifdef::community[]
** In the case of `wal2json` decoder plugin, it is sufficient to generate empty events.
** In the case of `wal2json` decoder plug-in, it is sufficient to generate empty events.
This can be achieved for example by truncating an empty temporary table.
** For other decoder plug-ins, it is recommended to create a supplementary table that is not monitored by {prodname}.
endif::community[]
@ -338,7 +338,7 @@ Most PostgreSQL servers are configured to not retain the complete history of the
If the connector fails, is rebalanced, or stops after Step 1 begins but before Step 6 completes, upon restart the connector will begin a new snapshot. Once the connector does complete its initial snapshot, the PostgreSQL connector then continues streaming from the position read during step 3, ensuring that it does not miss any updates. If the connector stops again for any reason, upon restart it will simply continue streaming changes from where it previously left off.
A second snapshot mode allows the connector to perform snapshots *always*. This behavior tells the connector to _always_ perform a snapshot when it starts up, and after the snapshot completes to continue streaming changes from step 3 in the above sequence. This mode can be used in cases when it's known that some WAL segments have been deleted and are no longer available, or in case of a cluster failure after a new primary has been promoted so that the connector doesn't miss out on any potential changes that could've taken place after the new primary had been promoted but before the connector was restarted on the new primary.
A second snapshot mode allows the connector to perform snapshots *always*. This behavior tells the connector to _always_ perform a snapshot when it starts up, and after the snapshot completes to continue streaming changes from step 3 in the above sequence. This mode can be used in cases when it is known that some WAL segments have been deleted and are no longer available, or in case of a cluster failure after a new primary has been promoted so that the connector does not miss any potential changes that could have taken place after the new primary had been promoted but before the connector was restarted on the new primary.
The third snapshot mode instructs the connector to *never* perform snapshots. When a new connector is configured this way, it will either continue streaming changes from a previously stored offset or it will start from the point in time when the PostgreSQL logical replication slot was first created on the server. Note that this mode is useful only when you know all data of interest is still reflected in the WAL.
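For illustration, the snapshot behavior discussed in these paragraphs is selected via the connector's `snapshot.mode` property; for example:

[source,properties]
----
# 'always' re-snapshots on every start; 'never' relies entirely on the replication slot/WAL
snapshot.mode=always
----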
@ -427,7 +427,7 @@ public interface Snapshotter {
statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator);
// we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot
// this does not prevent writes to the table, but prevents changes to the table's schema....
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
// DBZ-298 Quoting name in case it has been quoted originally; it does not do harm if it has not been quoted
tableIds.forEach(tableId -> statements.append("LOCK TABLE ")
.append(tableId.toDoubleQuotedString())
.append(" IN ACCESS SHARE MODE;")
@ -445,7 +445,7 @@ endif::community[]
The PostgreSQL connector will typically spend the vast majority of its time streaming changes from the PostgreSQL server to which it is connected. This mechanism relies on https://www.postgresql.org/docs/current/static/protocol-replication.html[_PostgreSQL's replication protocol_] where the client can receive changes from the server as they are committed in the server's transaction log at certain positions (also known as `Log Sequence Numbers` or in short LSNs).
Whenever the server commits a transaction, a separate server process invokes a callback function from the {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plugin]. This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of {prodname} plugin) and writes them on an output stream which can then be consumed by clients.
Whenever the server commits a transaction, a separate server process invokes a callback function from the {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plug-in]. This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of {prodname} plug-in) and writes them on an output stream which can then be consumed by clients.
The PostgreSQL connector acts as a PostgreSQL client, and when it receives these changes it transforms the events into {prodname} _create_, _update_, or _delete_ events that include the LSN position of the event. The PostgreSQL connector forwards these change events to the Kafka Connect framework (running in the same process), which then asynchronously writes them in the same order to the appropriate Kafka topic. Kafka Connect uses the term _offset_ for the source-specific position information that {prodname} includes with each event, and Kafka Connect periodically records the most recent offset in another Kafka topic.
@ -474,7 +474,7 @@ To prevent the issue completely it is recommended to synchronize updates to the
As of PostgreSQL 10+, a new logical replication stream mode was introduced, called _pgoutput_. This logical replication stream mode is natively supported by PostgreSQL,
which means that this connector can consume that replication stream
without the need for additional plug-ins being installed.
This is particularly valuable for environments where installation of plug-ins isn't supported or allowed.
This is particularly valuable for environments where installation of plug-ins is not supported or allowed.
See {link-prefix}:{link-postgresql-connector}#setting-up-postgresql[Setting up PostgreSQL] for more details.
@ -586,7 +586,7 @@ If the `database.server.name` configuration property has the value `PostgreSQL_s
}
----
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion, and in our case that means that the `payload` value is not optional, is a structure defined by a schema named `PostgreSQL_server.public.customers.Key`, and has one required field named `id` of type `int32`. If we look at the value of the key's `payload` field, we'll see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1`.
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion. In this case, it means that the `payload` value is not optional, is a structure defined by a schema named `PostgreSQL_server.public.customers.Key`, and has one required field named `id` of type `int32`. If you look at the value of the key's `payload` field, you see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1`.
Therefore, we interpret this key as describing the row in the `public.customers` table (output from the connector named `PostgreSQL_server`) whose `id` primary key column had a value of `1`.
@ -1534,24 +1534,24 @@ As {prodname} cannot safely provide the column value in this case, it returns a
[IMPORTANT]
====
There is a specific problem related to Amazon RDS instances.
`wal2json` plugin has evolved over the time and there were releases that provided out-of-band toasted values.
Amazon supports different versions of the plugin for different PostgreSQL versions.
The `wal2json` plug-in has evolved over time and there were releases that provided out-of-band toasted values.
Amazon supports different versions of the plug-in for different PostgreSQL versions.
Please consult Amazon's https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_PostgreSQL.html[documentation] to obtain version to version mapping.
For consistent handling of toasted values, we recommend that you:
* use `pgoutput` plugin for PostgreSQL 10+ instances
* set `include-unchanged-toast=0` for older versions of `wal2json` plugin using `slot.stream.params` configuration option
* use `pgoutput` plug-in for PostgreSQL 10+ instances
* set `include-unchanged-toast=0` for older versions of the `wal2json` plug-in by using the `slot.stream.params` configuration option
====
[[postgresql-deploying-a-connector]]
== Deploying the PostgreSQL Connector
ifdef::community[]
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using {prodname}'s PostgreSQL connector is easy. Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/{debezium-version}/debezium-connector-postgres-{debezium-version}-plugin.tar.gz[connector's plugin archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to {link-kafka-docs}/#connectconfigs[Kafka Connect's plugin.path]. Restart your Kafka Connect process to pick up the new JARs.
If you have already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using {prodname}'s PostgreSQL connector is easy. Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/{debezium-version}/debezium-connector-postgres-{debezium-version}-plugin.tar.gz[connector's plug-in archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to {link-kafka-docs}/#connectconfigs[Kafka Connect's `plugin.path`]. Restart your Kafka Connect process to pick up the new JARs.
endif::community[]
ifdef::product[]
Installing the PostgreSQL connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plugin's parent directory is specified in your Kafka Connect environment.
Installing the PostgreSQL connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plug-in's parent directory is specified in your Kafka Connect environment.
.Prerequisites
@ -1562,7 +1562,7 @@ Installing the PostgreSQL connector is a simple process whereby you only need to
. Download the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[PostgreSQL connector].
. Extract the files into your Kafka Connect environment.
. Add the plugin's parent directory to your Kafka Connect plugin path:
. Add the plug-in's parent directory to your Kafka Connect `plugin.path`:
+
[source]
----
@ -1591,7 +1591,7 @@ endif::community[]
To use the connector to produce change events for a particular PostgreSQL server or cluster:
. Install the {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plugin]
. Install the {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plug-in]
. Configure the {link-prefix}:{link-postgresql-connector}#postgresql-server-configuration[PostgreSQL server] to support logical replication
. Create a configuration file for the PostgreSQL connector.
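For the last step, a minimal configuration might look like the following sketch. The host, credentials, logical server name, and slot name are placeholders, and the full set of options is described in the connector configuration properties:

[source,json]
----
{
  "name": "fulfillment-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.99.100",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "fulfillment",
    "plugin.name": "pgoutput",
    "slot.name": "debezium"
  }
}
----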
@ -1726,7 +1726,7 @@ The following configuration properties are _required_ unless a default value is
|[[postgresql-property-plugin-name]]<<postgresql-property-plugin-name, `plugin.name`>>
|`decoderbufs`
|The name of the Postgres {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plugin] installed on the server.
|The name of the Postgres {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plug-in] installed on the server.
ifdef::product[]
The only supported value is `pgoutput`.
endif::product[]
@ -1739,7 +1739,7 @@ In such cases it is possible to switch to so-called *streaming* mode when every
|[[postgresql-property-slot-name]]<<postgresql-property-slot-name, `slot.name`>>
|`debezium`
|The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to link:https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION[Postgres replication slot naming rules] which state: _"Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."_
|The name of the Postgres logical decoding slot created for streaming changes from a plug-in and database instance. Values must conform to link:https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION[Postgres replication slot naming rules] which state: _"Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."_
|[[postgresql-property-slot-drop-on-stop]]<<postgresql-property-slot-drop-on-stop, `slot.drop.on.stop`>>
|`false`
@ -2081,7 +2081,7 @@ The PostgreSQL connector stores externally the last processed offset (in the for
As of `12`, PostgreSQL allows logical replication slots _only on primary servers_, which means that a PostgreSQL connector can only be pointed to the active primary of a database cluster.
Also, replication slots themselves are not propagated to replicas.
If the primary node goes down, only after a new primary has been promoted (with the {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plugin] installed) and a replication slot has been created there, the connector can be restarted and pointed to the new server.
If the primary node goes down, the connector can be restarted and pointed to the new server only after a new primary has been promoted (with the {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plug-in] installed) and a replication slot has been created there.
There are some important caveats to failovers, and you should pause {prodname} until you can verify that you have an intact replication slot that has not lost data. After a failover, you will miss change events unless your administration of failovers includes a process to recreate the {prodname} replication slot before the application is allowed to write to the *new* primary. In a failover situation, you may also need to verify that {prodname} was able to read all changes in the slot **before the old primary failed**.

View File

@ -43,7 +43,7 @@ The connector then produces a _change event_ for every row-level insert, update,
The client applications read the Kafka topics that correspond to the database tables they are interested in following, and react to every row-level event they see in those topics.
The database operator normally enables _CDC_ in the mid-life of a database and/or table.
This means that the connector won't have the complete history of all changes that have been made to the database.
This means that the connector does not have the complete history of all changes that have been made to the database.
Therefore, when the SQL Server connector first connects to a particular SQL Server database, it starts by performing a _consistent snapshot_ of each of the database schemas.
After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made.
This way, the connector starts with a consistent view of all of the data, yet continues reading without having lost any of the changes made while the snapshot was taking place.
@ -160,7 +160,7 @@ This ensures that the changes are replayed by {prodname} in the same order as we
After a restart, the connector will resume from the offset (commit and change LSNs) where it left off before.
The connector is able to detect whether the CDC is enabled or disabled for whitelisted source table during the runtime and modify its behaviour.
The connector is able to detect at runtime whether CDC is enabled or disabled for a whitelisted source table and to modify its behavior accordingly.
[[sqlserver-topic-names]]
=== Topic names
@ -382,10 +382,10 @@ every change event for the `customers` table while it has this definition will f
}
----
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion, and in our case that means that the `payload` value is not optional, is a structure defined by a schema named `server1.dbo.customers.Key`, and has one required field named `id` of type `int32`.
If we look at the value of the key's `payload` field, we'll see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1004`.
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion. In this case, it means that the `payload` value is not optional, is a structure defined by a schema named `server1.dbo.customers.Key`, and has one required field named `id` of type `int32`.
If you look at the value of the key's `payload` field, you can see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1004`.
Therefore, we interpret this key as describing the row in the `dbo.customers` table (output from the connector named `server1`) whose `id` primary key column had a value of `1004`.
Therefore, you can interpret this key as describing the row in the `dbo.customers` table (output from the connector named `server1`) whose `id` primary key column had a value of `1004`.
ifdef::community[]
[NOTE]
@ -409,7 +409,7 @@ The payload section of every change event value produced by the SQL Server conne
* `before` is an optional field that, if present, contains the state of the row _before_ the event occurred. The structure is described by the `server1.dbo.customers.Value` Kafka Connect schema, which the `server1` connector uses for all rows in the `dbo.customers` table.
* `after` is an optional field that, if present, contains the state of the row _after_ the event occurred. The structure is described by the same `server1.dbo.customers.Value` Kafka Connect schema used in `before`.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of SQL Server contains these fields: the {prodname} version, the connector name, whether the event is part of an ongoing snapshot or not, the commit LSN (not while snapshotting), the LSN of the change, database, schema and table where the change happened, and a timestamp representing the point in time when the record was changed in the source database (during snapshotting, it'll be the point in time of snapshotting).
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of SQL Server contains these fields: the {prodname} version, the connector name, whether the event is part of an ongoing snapshot or not, the commit LSN (not while snapshotting), the LSN of the change, database, schema and table where the change happened, and a timestamp representing the point in time when the record was changed in the source database (during snapshotting, this is the point in time of snapshotting).
+
Also, the field `event_serial_no` is present during streaming.
It is used to differentiate among events that have the same commit and change LSN. A sketch of a possible `source` block follows.
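All values in this sketch, including the version, LSNs, and timestamp, are illustrative only and are not taken from a real capture:

[source,json]
----
{
  "source": {
    "version": "1.2.0.Final",
    "connector": "sqlserver",
    "name": "server1",
    "ts_ms": 1588252618953,
    "snapshot": "false",
    "db": "testDB",
    "schema": "dbo",
    "table": "customers",
    "change_lsn": "00000027:00000758:0003",
    "commit_lsn": "00000027:00000758:0005",
    "event_serial_no": 2
  }
}
----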
@ -676,8 +676,8 @@ When the columns for a row's primary/unique key are updated, the value of the ro
[[sqlserver-delete-events]]
===== Delete events
So far we've seen samples of _create_ and _update_ events.
Now, let's look at the value of a _delete_ event for the same table. Once again, the `schema` portion of the value is exactly the same as with the _create_ and _update_ events:
So far, you have seen samples of _create_ and _update_ events.
The following sample shows the value of a _delete_ event for the same table. Once again, the `schema` portion of the value is exactly the same as with the _create_ and _update_ events:
[source,json,indent=0,subs="attributes"]
----
@ -1183,13 +1183,13 @@ The `connect.decimal.precision` schema parameter contains an integer representin
== Deploying the SQL Server connector
ifdef::community[]
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using {prodname}'s SQL Server` connector is easy.
Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/{debezium-version}/debezium-connector-sqlserver-{debezium-version}-plugin.tar.gz[connector's plugin archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to {link-kafka-docs}/#connectconfigs[Kafka Connect's plugin.path].
If you have already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using the {prodname} SQL Server connector is easy.
Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/{debezium-version}/debezium-connector-sqlserver-{debezium-version}-plugin.tar.gz[connector's plug-in archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to {link-kafka-docs}/#connectconfigs[Kafka Connect's `plugin.path`].
Restart your Kafka Connect process to pick up the new JARs.
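After the restart, the connector can be registered with a configuration similar to the following minimal sketch. The host, credentials, whitelisted table, and history topic values are placeholders; see the connector configuration properties for the complete list of options:

[source,json]
----
{
  "name": "server1-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "sqlserver",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "Password!",
    "database.dbname": "testDB",
    "database.server.name": "server1",
    "table.whitelist": "dbo.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.server1"
  }
}
----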
endif::community[]
ifdef::product[]
Installing the SQL Server connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plugin's parent directory is specified in your Kafka Connect environment.
Installing the SQL Server connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plug-in's parent directory is specified in your Kafka Connect environment.
.Prerequisites
@ -1200,7 +1200,7 @@ Installing the SQL Server connector is a simple process whereby you only need to
. Download the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[SQL Server connector].
. Extract the files into your Kafka Connect environment.
. Add the plugin's parent directory to your Kafka Connect plugin path:
. Add the plug-in's parent directory to your Kafka Connect `plugin.path`:
+
[source]
----

View File

@ -16,7 +16,7 @@ If you're using these installations, there is no need to read the rest of the do
[[docker-toolbox]]
== Docker Toolbox on Windows and OS X
The older approach for running Docker on Windows and OS X is more complicated, and it requires using https://www.docker.com/toolbox[Docker Toolbox] and its https://docs.docker.com/machine/get-started/[Docker Machine] to run the Docker host in a _virtual machine_. Make sure you follow the instructions to install or upgrade Docker Toolbox. Docker Machine supports running multiple virtual machines, but from this point on we'll assume that you're going to run the virtual machine named "default".
The older approach for running Docker on Windows and OS X is more complicated, and it requires using https://www.docker.com/toolbox[Docker Toolbox] and its https://docs.docker.com/machine/get-started/[Docker Machine] to run the Docker host in a _virtual machine_. Make sure you follow the instructions to install or upgrade Docker Toolbox. Docker Machine supports running multiple virtual machines, but from this point on it is assumed that you are going to run the virtual machine named "default".
The rest of this document explains how to use Docker Machine.
@ -115,7 +115,7 @@ If you're using Docker Machine, you may sometimes get the following error when r
Cannot connect to the Docker daemon. Is the docker daemon running on this host?
This means that the commands running in your terminal can't communicate with the Docker virtual machine (i.e., the Docker daemon), either because it is not running or because the required environment variables in the terminal are not set properly. So first, verify that your Docker machine is indeed running:
This means that the commands running in your terminal cannot communicate with the Docker virtual machine (i.e., the Docker daemon), either because it is not running or because the required environment variables in the terminal are not set properly. So first, verify that your Docker machine is indeed running:
[source,bash,indent=0]
----

View File

@ -14,7 +14,7 @@ There are several ways to install and use {prodname} connectors, so we've docume
== Installing a {prodname} Connector
If you have already installed https://zookeeper.apache.org[Zookeeper], https://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect], then using one of {prodname}'s connectors is easy.
Simply download one or more connector plugin archives (see below), extract their files into your Kafka Connect environment, and add the parent directory of the extracted plugin(s) to Kafka Connect's plugin path.
Simply download one or more connector plug-in archives (see below), extract their files into your Kafka Connect environment, and add the parent directory of the extracted plug-in(s) to Kafka Connect's plugin path.
If you have not already done so, specify the plugin path in your worker configuration (e.g. _connect-distributed.properties_) by using the {link-kafka-docs}/#connectconfigs[plugin.path] configuration property.
For example, assume that you have downloaded the {prodname} MySQL connector archive and extracted its contents to _/kafka/connect/debezium-connector-mysql_.
You would then specify the following in the worker configuration:
@ -26,7 +26,7 @@ plugin.path=/kafka/connect
Restart your Kafka Connect process to pick up the new JARs.
The connector plugins are available from Maven:
The connector plug-ins are available from Maven:
ifeval::['{page-version}' == 'master']
* {link-mysql-plugin-snapshot}[MySQL Connector plugin archive]

View File

@ -112,7 +112,7 @@ Alternatively, Avro can be used as content type for the `data` attribute:
<2> URI of the schema to which the Avro data adheres
<3> The `data` attribute, base64-encoded Avro binary data
Finally, it's also possible to use Avro for the entire envelope as well as the `data` attribute.
Finally, it is also possible to use Avro for the entire envelope as well as the `data` attribute.
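As a rough sketch of that combination, a connector configuration might set the converter options as follows. The option names (`serializer.type`, `data.serializer.type`) and the registry URL property are assumptions based on the converter's general usage and should be verified against the configuration options listed below:

[source,json]
----
{
  "value.converter": "io.debezium.converters.CloudEventsConverter",
  "value.converter.serializer.type": "avro",
  "value.converter.data.serializer.type": "avro",
  "value.converter.avro.schema.registry.url": "http://schema-registry:8081"
}
----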
[[cloud-events-converter-configuration-options]]
=== Configuration Options

View File

@ -23,7 +23,7 @@ It starts an OpenShift cluster just for you, allowing you to take your first ste
To set up Kafka and Kafka Connect on OpenShift, you can use a set of images provided by the https://strimzi.io/[Strimzi] project, which offers "Kafka as a Service".
It consists of enterprise-grade configuration files and images that bring Kafka to OpenShift.
First we install the operators and templates for the Kafka broker and Kafka Connect into our OpenShift project:
First, install the operators and templates for the Kafka broker and Kafka Connect into your OpenShift project:
[source,shell,subs="attributes",options="nowrap"]
----
@ -36,7 +36,7 @@ oc login -u system:admin
oc create -f install/cluster-operator && oc create -f examples/templates/cluster-operator
----
Next we will deploy a Kafka broker cluster and a Kafka Connect cluster and then create a Kafka Connect image with the {prodname} connectors installed:
Next, deploy a Kafka broker cluster and a Kafka Connect cluster and then create a Kafka Connect image with the {prodname} connectors installed:
[source,shell,subs="attributes",options="nowrap"]
----
@ -76,9 +76,9 @@ image::/images/openshift_pods.png[width=771,align="center"]
== Verifying the Deployment
Next we are going to verify whether the deployment is correct by emulating the xref:tutorial.adoc[{prodname} Tutorial] in the OpenShift environment.
Verify whether the deployment is correct by emulating the xref:tutorial.adoc[{prodname} Tutorial] in the OpenShift environment.
First we need to start a MySQL server instance containing some example tables:
Begin by starting a MySQL server instance that contains some example tables:
[source%nowrap,bash,subs="attributes"]
----
@ -101,7 +101,7 @@ mysql-1-deploy 1/1 Running 0 4s
...
----
Then we are going to register the {prodname} MySQL connector to run against the deployed MySQL instance:
Register the {prodname} MySQL connector to run against the deployed MySQL instance:
[source%nowrap,bash]
----
@ -129,14 +129,14 @@ oc exec -i -c kafka broker-kafka-0 -- curl -X POST \
EOF
----
Kafka Connect's log file should contain messages regarding execution of initial snapshot:
Kafka Connect's log file should contain messages regarding execution of the initial snapshot:
[source%nowrap,bash]
----
oc logs $(oc get pods -o name -l strimzi.io/name=debezium-connect)
----
Now we can read change events for the `customers` table from the corresponding Kafka topic:
Now you can read change events for the `customers` table from the corresponding Kafka topic:
[source%nowrap,bash]
----
@ -215,17 +215,17 @@ You should see an output like the following (formatted for the sake of readabili
...
----
Finally, let's modify some records in the `customers` table of the database:
Finally, the next example modifies some records in the `customers` table of the database:
[source%nowrap,bash]
----
oc exec -it $(oc get pods -o custom-columns=NAME:.metadata.name --no-headers -l app=mysql) \
-- bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
# E.g. run UPDATE customers SET email="sally.thomas@example.com" WHERE ID = 1001;
# For example, run UPDATE customers SET email="sally.thomas@example.com" WHERE ID = 1001;
----
You should now see additional change messages in the consumer that you started earlier.
If you got any questions or requests related to running {prodname} on OpenShift,
please let us know via our https://groups.google.com/forum/#!forum/debezium[user group] or in the {prodname} https://gitter.im/debezium/dev[developer's chat].
If you have any questions or requests related to running {prodname} on OpenShift,
let us know via our https://groups.google.com/forum/#!forum/debezium[user group] or in the {prodname} https://gitter.im/debezium/dev[developer's chat].

View File

@ -142,7 +142,7 @@ To use the RPM in question just issue the standard Fedora installation command:
----
$ sudo dnf -y install postgres-decoderbufs
----
The rest of the configuration is same as described below for `wal2json` plugin.
The rest of the configuration is the same as described below for the `wal2json` plug-in.
[[postgresql-server-configuration]]
=== PostgreSQL Server Configuration

View File

@ -4,7 +4,7 @@
[id="install-the-mysql-connector_{context}"]
= Installing the MySQL connector
Installing the {prodname} MySQL connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plugin's parent directory is specified in your Kafka Connect environment.
Installing the {prodname} MySQL connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plug-in's parent directory is specified in your Kafka Connect environment.
.Prerequisites
@ -18,14 +18,14 @@ ifdef::product[]
endif::product[]
ifdef::community[]
ifeval::['{page-version}' == 'master']
. Download the {prodname} link:{link-mysql-plugin-snapshot}[MySQL connector plugin].
. Download the {prodname} link:{link-mysql-plugin-snapshot}[MySQL connector plug-in].
endif::[]
ifeval::['{page-version}' != 'master']
. Download the {prodname} link:https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz[MySQL connector plugin].
. Download the {prodname} link:https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz[MySQL connector plug-in].
endif::[]
endif::community[]
. Extract the files into your Kafka Connect environment.
. Add the plugin's parent directory to your Kafka Connect plugin path:
. Add the plug-in's parent directory to your Kafka Connect `plugin.path`:
[source]
----
plugin.path=/kafka/connect

View File

@ -131,7 +131,7 @@ or `connect` always represents time and timestamp values using Kafka Connect's b
|[[connector-property-bigint-unsigned-handling-mode]]<<connector-property-bigint-unsigned-handling-mode, `bigint.unsigned.handling.mode`>>
|`long`
| Specifies how BIGINT UNSIGNED columns should be represented in change events, including: `precise` uses `java.math.BigDecimal` to represent values, which are encoded in the change events using a binary representation and Kafka Connect's `org.apache.kafka.connect.data.Decimal` type; `long` (the default) represents values using Java's `long`, which may not offer the precision but will be far easier to use in consumers. `long` is usually the preferable setting. Only when working with values larger than 2^63, the `precise` setting should be used as those values can't be conveyed using `long`.
| Specifies how BIGINT UNSIGNED columns should be represented in change events, including: `precise` uses `java.math.BigDecimal` to represent values, which are encoded in the change events by using a binary representation and Kafka Connect's `org.apache.kafka.connect.data.Decimal` type; `long` (the default) represents values by using Java's `long`, which may not offer the precision but is far easier to use in consumers. `long` is usually the preferable setting. Use the `precise` setting only when working with values larger than 2^63, because those values cannot be conveyed by using `long`.
|[[connector-property-include-schema-changes]]<<connector-property-include-schema-changes, `include.schema.changes`>>
|`true`
@ -275,7 +275,7 @@ _Note:_ This feature should be considered an incubating one. We need a feedback
`extended` In some cases where clients are submitting operations that MySQL excludes from REPEATABLE READ semantics, it may be desirable to block all writes for the entire duration of the snapshot. For such cases, use this option. +
`none` Will prevent the connector from acquiring any table locks during the snapshot process. This value can be used with all snapshot modes but it is safe to use if and _only_ if no schema changes are happening while the snapshot is taken. Note that for tables defined with MyISAM engine, the tables would still be locked despite this property being set as MyISAM acquires a table lock. This behaviour is unlike InnoDB engine which acquires row level locks.
`none` Prevents the connector from acquiring any table locks during the snapshot process. This value can be used with all snapshot modes, but it is safe to use if and _only_ if no schema changes are happening while the snapshot is taken. Note that tables defined with the MyISAM engine would still be locked despite this property being set, because MyISAM acquires a table lock. This behavior differs from the InnoDB engine, which acquires row-level locks.
|[[connector-property-snapshot-select-statement-overrides]]<<connector-property-snapshot-select-statement-overrides, `snapshot.select.statement.overrides`>>
|