DBZ-2621 Make fulfillment the name of the connector.

Tova Cohen 2020-10-12 16:44:15 -04:00 committed by Jiri Pechanec
parent 53031f4dff
commit d03625f01c


@@ -3,8 +3,6 @@
[id="debezium-connector-for-postgresql"]
= {prodname} connector for PostgreSQL
:product: true
ifdef::community[]
:toc:
@@ -1086,7 +1084,11 @@ When a row is deleted, the _delete_ event value still works with log compaction,
[[postgresql-data-types]]
== Data type mappings
The PostgreSQL connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the PostgreSQL data type of the column. This section describes these mappings.
The PostgreSQL connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the PostgreSQL data type of the column. The following sections describe how the connector maps PostgreSQL data types to a _literal type_ and a _semantic type_ in event fields.
* _literal type_ describes how the value is literally represented using Kafka Connect schema types: `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT64`, `BOOLEAN`, `STRING`, `BYTES`, `ARRAY`, `MAP`, and `STRUCT`.
* _semantic type_ describes how the Kafka Connect schema captures the _meaning_ of the field using the name of the Kafka Connect schema for the field.
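
For example, a PostgreSQL `DATE` column is represented, with the default `time.precision.mode`, by a field whose literal type is `INT32` and whose semantic type is `io.debezium.time.Date`. The following sketch shows how such a field might appear in the `schema` portion of a change event; the column name `order_date` is hypothetical:

[source,json]
----
{
  "type": "int32",
  "optional": false,
  "name": "io.debezium.time.Date",
  "version": 1,
  "field": "order_date"
}
----
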
ifdef::product[]
Details are in the following sections:
@@ -1106,11 +1108,7 @@ endif::product[]
[id="postgresql-basic-types"]
=== Basic types
The following table describes how the connector maps basic PostgreSQL data types to a _literal type_ and a _semantic type_ in event fields.
* _literal type_ describes how the value is literally represented using Kafka Connect schema types: `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT64`, `BOOLEAN`, `STRING`, `BYTES`, `ARRAY`, `MAP`, and `STRUCT`.
* _semantic type_ describes how the Kafka Connect schema captures the _meaning_ of the field using the name of the Kafka Connect schema for the field.
The following table describes how the connector maps basic types.
.Mappings for PostgreSQL basic data types
[cols="25%a,20%a,55%a",options="header"]
@@ -1955,7 +1953,11 @@ To deploy a {prodname} PostgreSQL connector, add the connector files to Kafka Co
[[postgresql-deploying-a-connector]]
=== Deploying connectors
To deploy a {prodname} connector, use the provided {prodname} container to build a custom Kafka Connect container image. After pushing the custom image to your container registry, create a `KafkaConnector` custom resource (CR) that configures your connector, and apply this CR to your container.
To deploy a {prodname} PostgreSQL connector, you need to build a custom Kafka Connect container image that contains the {prodname} connector archive and push this container image to a container registry. You then need to create two custom resources (CRs):
* A `KafkaConnect` CR that configures your Kafka Connect instance and that specifies the name of the image that you created to run your {prodname} connector. You apply this CR to the OpenShift Kafka instance.
* A `KafkaConnector` CR that configures your {prodname} PostgreSQL connector. You apply this CR to the OpenShift instance where Red Hat AMQ Streams is deployed.
.Prerequisites
@@ -1963,61 +1965,58 @@ To deploy a {prodname} connector, use the provided {prodname} container to build
* link:https://access.redhat.com/products/red-hat-amq#streams[Red Hat AMQ Streams] was used to set up and start running Apache Kafka and Kafka Connect on OpenShift. AMQ Streams offers operators and images that bring Kafka to OpenShift.
* Podman or Docker is installed and you have sufficient rights to create and manage containers.
* Podman or Docker is installed.
* You have an account and permissions to create and manage containers in the container registry (such as `quay.io` or `docker.io`) to which you plan to add the container that will run your {prodname} connector.
.Procedure
. Add the {prodname} PostgreSQL connector files to Kafka Connect:
. Create the {prodname} PostgreSQL container for Kafka Connect:
.. Download the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[PostgreSQL connector archive].
.. Extract the {prodname} PostgreSQL connector archive to create a directory structure for the connector plug-in, for example:
+
[subs="+macros"]
----
pass:quotes[*tree ./my-plugins/*]
./my-plugins/
├── debezium-connector-postgresql
│ ├── ...
----
.. Add the plug-in's parent directory to your Kafka Connect `plugin.path`. For example, if you extracted the {prodname} PostgreSQL connector to `/kafka/connect/debezium-connector-postgresql`, execute this command:
+
[source]
----
plugin.path=/kafka/connect
----
.. Restart your Kafka Connect process so that it picks up the new files in the plug-in's parent directory.
. Create and publish a custom image for running your {prodname} connector:
.. Create a new `Dockerfile` by using `{DockerKafkaConnect}` as the base image. In the following example, you would replace _my-plugins_ with the name of your plug-ins directory:
.. Create a Docker file that uses `{DockerKafkaConnect}` as the base image.
For example, from a terminal window, enter the following:
+
[subs="+macros,+attributes"]
----
FROM {DockerKafkaConnect}
USER root:root
pass:quotes[COPY _./my-plugins/_ /opt/kafka/plugins/]
USER 1001
pass:quotes[*cat <<EOF >debezium-container-for-postgresql.yaml*] // <1>
pass:quotes[*FROM {DockerKafkaConnect}*]
pass:quotes[*USER root:root*]
pass:quotes[*COPY ./my-plugins/ /opt/kafka/plugins/*] // <2>
pass:quotes[*USER 1001*]
pass:quotes[*EOF*]
----
+
Before Kafka Connect starts running the connector, it loads any third-party plug-ins that are in the `/opt/kafka/plugins` directory.
<1> You can specify any file name that you want.
<2> Replace `my-plugins` with the name of your plug-ins directory.
.. Build the container image. For example, if you saved the `Dockerfile` that you created in the previous step as `debezium-container-for-postgresql`, and if the `Dockerfile` is in the current directory, then you would run the following command:
The command creates a Docker file with the name `debezium-container-for-postgresql.yaml` in the current directory.
.. Build the container image from the `debezium-container-for-postgresql.yaml` Docker file that you created in the previous step. From the directory that contains the file, run the following command:
+
[source,shell,options="nowrap"]
----
podman build -t debezium-container-for-postgresql:latest .
----
+
This command builds a container image with the name `debezium-container-for-postgresql`.
.. Push your custom image to your container registry, for example:
.. Push your custom image to a container registry such as `quay.io` or any internal container registry. Ensure that this registry is reachable from your OpenShift instance. For example:
+
[source,shell,options="nowrap"]
----
podman push debezium-container-for-postgresql:latest
----
.. Update your Kafka environment to point to the new container image. Do this by editing your `KafkaConnect` custom resource to set the `spec.image` property to the name of the new container image. If set, this property overrides the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable in the Cluster Operator. For example:
.. Create a new {prodname} PostgreSQL `KafkaConnect` custom resource (CR). For example, create a `KafkaConnect` CR with the name `dbz-connect.yaml` that specifies `annotations` and `image` properties as shown in the following example:
+
[source,yaml,subs="+attributes"]
----
@@ -2025,40 +2024,49 @@ apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations: strimzi.io/use-connector-resources: "true"
annotations: strimzi.io/use-connector-resources: "true" // <1>
spec:
image: debezium-container-for-postgresql
image: debezium-container-for-postgresql // <2>
----
<1> `metadata.annotations` indicates to the Cluster Operator that `KafkaConnector` resources are used to configure connectors in this Kafka Connect cluster.
<2> `spec.image` specifies the name of the image that you created to run your {prodname} connector. This property overrides the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable in the Cluster Operator.
.. Apply your `KafkaConnect` CR to the OpenShift Kafka instance by running the following command:
+
[source,shell,options="nowrap"]
----
oc create -f dbz-connect.yaml
----
+
The `metadata.annotations` setting indicates to the Cluster Operator that `KafkaConnector` resources are used to configure connectors in this Kafka Connect cluster.
+
The `spec.image` setting specifies the name of the image that you created to run your {prodname} connector.
+
Alternatively, you can edit the `install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml` file to set the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable to point to the new container image. If you edit this file you must apply it to your OpenShift cluster and then reinstall the Cluster Operator.
This updates your Kafka Connect environment in OpenShift to add a Kafka Connect instance that specifies the name of the image that you created to run your {prodname} connector.
. Create a `KafkaConnector` custom resource that configures your {prodname} PostgreSQL connector instance. Typically, you configure a {prodname} PostgreSQL connector in a `.yaml` file that sets connector configuration properties. For example, a connector configuration can produce events for a subset of the schemas and tables, or it can ignore, mask, or truncate columns that are sensitive, too large, or not needed. See the {link-prefix}:{link-postgresql-connector}#postgresql-connector-properties[complete list of PostgreSQL connector properties] that can be specified in these configurations.
. Create a `KafkaConnector` custom resource that configures your {prodname} PostgreSQL connector instance.
+
The following example configures a {prodname} connector that connects to a PostgreSQL server on port 5432 at 192.168.99.100, whose logical name is `fullfillment`.
You configure a {prodname} PostgreSQL connector in a `.yaml` file that sets connector configuration properties. A connector configuration might instruct {prodname} to produce events for a subset of the schemas and tables, or it might set properties so that {prodname} ignores, masks, or truncates values in specified columns that are sensitive, too large, or not needed. See the {link-prefix}:{link-postgresql-connector}#postgresql-connector-properties[complete list of PostgreSQL connector properties] that can be specified in these configurations.
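+
For example, a configuration might include entries like the ones in the following sketch, which limit capture to a single table and mask or truncate values in two of its columns. This is only a sketch: the table and column names (`public.customers`, `ssn`, and `biography`) are hypothetical, and the entries are shown as they might appear under `config:` in the CR.
+
[source,yaml,options="nowrap"]
----
  config:
    # hypothetical table and column names, for illustration only
    table.include.list: public.customers
    column.mask.with.12.chars: public.customers.ssn          # replace values with 12 asterisks
    column.truncate.to.120.chars: public.customers.biography # truncate values longer than 120 characters
----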
+
The following example configures a {prodname} connector that connects to a PostgreSQL server host, `192.168.99.100`, on port `5432`. This host has a database named `sampledb` that contains a schema named `public`, and `fulfillment` is the server's logical name.
+
.`fulfillment-connector.yaml`
[source,yaml,options="nowrap"]
----
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: inventory-connector // <1>
name: fulfillment-connector // <1>
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 1 // <2>
config: // <3>
database.hostname: postgresqldb // <4>
database.hostname: 192.168.99.100 // <4>
database.port: 5432
database.user: debezium
database.password: dbz
database.dbname: postgres
database.server.name: fullfillment // <5>
schema.include.list: public.inventory // <6>
database.dbname: sampledb
database.server.name: fulfillment // <5>
schema.include.list: public // <6>
plugin.name: pgoutput // <7>
----
<1> The name of the connector.
<2> Only one task should operate at any one time.
@@ -2069,20 +2077,21 @@ and it automatically distributes the running tasks across the cluster of Kafka C
If any of the services stop or crash,
those tasks will be redistributed to running services.
<3> The connector's configuration.
<4> The database host, which is the name of the container that is running the PostgreSQL server (`postgresqldb`).
<4> The name of the database host that is running the PostgreSQL server. In this example, the database host name is `192.168.99.100`.
<5> A unique server name.
The server name is the logical identifier for the PostgreSQL server or cluster of servers.
This name is used as the prefix for all Kafka topics that receive change event records.
<6> Changes in only the `public.inventory` schema are captured.
<6> The connector captures changes in only the `public` schema. It is possible to configure the connector to capture changes in only the tables that you choose. See {link-prefix}:{link-postgresql-connector}#postgresql-property-table-include-list[`table.include.list` connector configuration property].
<7> The name of the PostgreSQL {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plug-in] installed on the PostgreSQL server. While the only supported value for PostgreSQL 10 and later is `pgoutput`, you must explicitly set `plugin.name` to `pgoutput`.
. Register your connector instance with Kafka Connect. For example, if you saved your `KafkaConnector` resource in the `inventory-connector.yaml` file, you would run the following command:
. Create your connector instance with Kafka Connect. For example, if you saved your `KafkaConnector` resource in the `fulfillment-connector.yaml` file, you would run the following command:
+
[source,shell,options="nowrap"]
----
oc apply -f inventory-connector.yaml
oc apply -f fulfillment-connector.yaml
----
+
This registers `inventory-connector` and the connector starts to run against the `inventory` database.
This registers `fulfillment-connector` and the connector starts to run against the `sampledb` database as defined in the `KafkaConnector` CR.
. Verify that the connector was created and has started:
.. Display the Kafka Connect log output to verify that the connector was created and has started to capture changes in the specified database:
@@ -2099,6 +2108,15 @@ oc logs $(oc get pods -o name -l strimzi.io/cluster=my-connect-cluster)
... INFO Starting snapshot for ...
... INFO Snapshot is using user 'debezium' ...
----
+
If the connector starts correctly without errors, it creates a topic for each table whose changes the connector is capturing. For the example CR, there would be a topic for each table in the `public` schema. Downstream applications can subscribe to these topics.
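+
Change event topic names take the form _serverName_._schemaName_._tableName_. For example, if the `public` schema contained hypothetical tables named `customers` and `orders`, the `fulfillment` connector would write change events to topics with the following names:
+
----
fulfillment.public.customers
fulfillment.public.orders
----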
.. Verify that the connector created the topics by running the following command:
+
[source,shell,options="nowrap"]
----
oc get kafkatopics
----
.Results
@@ -2114,14 +2132,14 @@ ifdef::community[]
[[postgresql-example-configuration]]
=== Connector configuration example
Following is an example of the configuration for a PostgreSQL connector that connects to a PostgreSQL server on port 5432 at 192.168.99.100, whose logical name is `fullfillment`. Typically, you configure the {prodname} PostgreSQL connector in a `.json` file using the configuration properties available for the connector.
Following is an example of the configuration for a PostgreSQL connector that connects to a PostgreSQL server on port 5432 at 192.168.99.100, whose logical name is `fulfillment`. Typically, you configure the {prodname} PostgreSQL connector in a `.json` file using the configuration properties available for the connector.
You can choose to produce events for a subset of the schemas and tables. Optionally, you can ignore, mask, or truncate columns that are sensitive, too large, or not needed.
[source,json]
----
{
"name": "inventory-connector", // <1>
"name": "fulfillment-connector", // <1>
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", // <2>
"database.hostname": "192.168.99.100", // <3>
@@ -2129,7 +2147,7 @@ You can choose to produce events for a subset of the schemas and tables. Optiona
"database.user": "postgres", // <5>
"database.password": "postgres", // <6>
"database.dbname" : "postgres", // <7>
"database.server.name": "fullfillment", // <8>
"database.server.name": "fulfillment", // <8>
"table.include.list": "public.inventory" // <9>
}
@@ -2496,7 +2514,7 @@ endif::community[]
|
|Controls which table rows are included in snapshots. This property affects snapshots only. It does not affect events that are generated by the logical decoding plug-in. Specify a comma-separated list of fully-qualified table names in the form _databaseName.tableName_. +
+
For each table that you specify, also specify another configuration property: `snapshot.select.statement.overrides._DB_NAME_._TABLE_NAME_`, for example: `snapshot.select.statement.overrides.customers.orders`. Set this property to a `SELECT` statement that obtains only the rows that you want in the snapshot. When the connector performs a snapshot, it executes this `SELECT` statement to retrieve data from that table. +
For each table that you specify, also specify another configuration property: `snapshot.select.statement{zwsp}.overrides._DB_NAME_._TABLE_NAME_`, for example: `snapshot.select.statement{zwsp}.overrides.customers.orders`. Set this property to a `SELECT` statement that obtains only the rows that you want in the snapshot. When the connector performs a snapshot, it executes this `SELECT` statement to retrieve data from that table. +
+
A possible use case for setting these properties is large, append-only tables. You can specify a `SELECT` statement that sets a specific point for where to start a snapshot, or where to resume a snapshot if a previous snapshot was interrupted.
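For example, for a hypothetical append-only table that is identified in these properties as `customers.orders` and that has a `delete_flag` column, you might set `snapshot.select.statement{zwsp}.overrides` to `customers.orders` and set `snapshot.select.statement{zwsp}.overrides.customers.orders` to a statement such as `SELECT * FROM orders WHERE delete_flag = 0`.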
@@ -2552,7 +2570,7 @@ Heartbeat messages are needed when there are many updates in a database that is
+
_<heartbeat.topics.prefix>_._<server.name>_ +
+
For example, if the database server name is `fullfillment`, the default topic name is `__debezium-heartbeat.fulfillment`.
For example, if the database server name is `fulfillment`, the default topic name is `__debezium-heartbeat.fulfillment`.
|[[postgresql-property-heartbeat-action-query]]<<postgresql-property-heartbeat-action-query, `heartbeat.action{zwsp}.query`>>
|
@@ -2662,7 +2680,7 @@ include::{partialsdir}/modules/all-connectors/ref-connector-monitoring-streaming
// Type: assembly
// Type: reference
// ModuleID: how-debezium-postgresql-connectors-handle-faults-and-problems
// Title: How {prodname} PostgreSQL connectors handle faults and problems
[[postgresql-when-things-go-wrong]]