= Tutorial
include::_attributes.adoc[]
:toc:
:toc-placement: macro
:sectanchors:
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
This tutorial walks you through running Debezium {debezium-version} for change data capture (CDC). You will use Docker (1.9 or later) to start the Debezium services, run a MySQL database server with a simple example database, use Debezium to monitor the database, and see the resulting event streams respond as the data in the database changes.
[NOTE]
====
Have you already completed the tutorial?
Try link:#docker-compose[the fast track] using Docker Compose, including example set-ups for all the databases supported by Debezium (MySQL, Postgres, MongoDB, SQL Server and Oracle).
====
== What is Debezium?
Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases. Debezium is built on top of http://kafka.apache.org[Apache Kafka] and provides http://kafka.apache.org/documentation.html#connect[Kafka Connect] compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, from where your application consumes them. This makes it possible for your application to easily consume all of the events correctly and completely. Even if your application stops (or crashes), upon restart it will start consuming the events where it left off so it misses nothing.
Debezium {debezium-version} includes connectors for several different database servers; in this demonstration we'll use its xref:connectors/mysql.adoc[MySQL connector] to monitor a MySQL database server.
== Running Debezium with Docker
Running Debezium involves three major services: http://zookeeper.apache.org[ZooKeeper], Kafka, and Debezium's connector service. This tutorial walks you through starting a single instance of these services using http://docker.com[Docker] and https://hub.docker.com/u/debezium/[Debezium's Docker images]. Production environments, on the other hand, require running multiple instances of each service to provide performance, reliability, replication, and fault tolerance. This can be done with a platform like https://www.openshift.com[OpenShift] or http://kubernetes.io[Kubernetes] that manages multiple Docker containers running on multiple hosts and machines, but often you'll want to xref:install.adoc[install on dedicated hardware].
== Starting Docker
Make sure that Docker is https://docs.docker.com/engine/installation/[installed and running] on Linux, OS X, or Windows. We highly recommend using the latest version of Docker on these platforms, and we've written these instructions with this in mind. (Running Docker in a virtual machine via Docker Machine is no longer the preferred approach, and Docker recommends you upgrade.)
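Before continuing, a quick check that the Docker client can actually reach a running daemon:
[source,bash,indent=0]
----
# prints client and server versions; fails if the daemon isn't running
$ docker version
----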
== Starting simple with Debezium
For simple evaluation and experimentation, this tutorial will walk you through starting a single instance of each service in a separate container on your local machine. ZooKeeper and Kafka both store data locally inside the container, and normal usage requires mounting directories on the host machines as volumes so that when the containers stop the persisted data will remain. We're skipping that in this tutorial, although the documentation for our https://hub.docker.com/r/debezium/[Docker images] describes how to do that. This means that when a container is removed, all persisted data will be lost. That's actually ideal for our experiment, since nothing will be left on your computer when we're finished, and you can run this experiment many times without having to clean anything up in between.
Running multiple services locally can be confusing, so we're going to use a separate terminal to run each container in the foreground. This way all of the output of a container will be displayed in the terminal used to run it.
[NOTE]
====
This is not the only way to run Docker containers. Rather than running a container in the foreground (with `-it`), Docker lets you run a container in _detached_ mode (with `-d`), where the container is started and the Docker command returns immediately. Detached mode containers don't display their output in the terminal, though you can always see the output by using `docker logs --follow <container-name>`. This is one reason we name each of the containers we run. See the Docker documentation for more detail.
====
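For example, a sketch of starting the ZooKeeper container from the next section in detached mode instead, and then tailing its log:
[source,bash,subs="attributes"]
----
# start detached; the prompt returns immediately
$ docker run -d --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:{debezium-docker-label}
# follow its output at any time
$ docker logs --follow zookeeper
----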
[[start-zookeeper]]
=== Start ZooKeeper
Of all the different services/processes that make up Debezium, the first one to start is ZooKeeper. Start a new terminal and start a container with ZooKeeper by running:
[source,bash,subs="attributes"]
----
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:{debezium-docker-label}
----
This runs a new container using version {debezium-docker-label} of the `debezium/zookeeper` image, and assigns the name `zookeeper` to this container. The `-it` flag makes the container interactive, meaning it attaches the terminal's standard input and output to the container so that you can see what is going on in the container. The `--rm` flag instructs Docker to remove the container when it is stopped. The three `-p` options map three of the container's ports (2181, 2888, and 3888) to the same ports on the Docker host so that other containers (and software outside the container) can talk with ZooKeeper.
You should see in your terminal the typical output of ZooKeeper:
[listing,indent=0,options="nowrap"]
----
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
2017-09-21 07:15:55,420 - WARN [main:QuorumPeerMain@113] - Either no config or no quorum defined in config, running in standalone mode
2017-09-21 07:15:55,420 - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2017-09-21 07:15:55,425 - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
2017-09-21 07:15:55,427 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,427 - INFO [main:ZooKeeperServerMain@96] - Starting server
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:host.name=51b46dd211d0
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.version=1.8.0_131
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.vendor=Oracle Corporation
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-3.b12.el7_3.x86_64/jre
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.class.path=/zookeeper/bin/../build/classes:/zookeeper/bin/../build/lib/*.jar:/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/zookeeper/bin/../lib/log4j-1.2.16.jar:/zookeeper/bin/../lib/jline-0.9.94.jar:/zookeeper/bin/../zookeeper-3.4.10.jar:/zookeeper/bin/../src/java/lib/*.jar:/zookeeper/conf:
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2017-09-21 07:15:55,432 - INFO [main:Environment@100] - Server environment:java.io.tmpdir=/tmp
2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:java.compiler=<NA>
2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:os.name=Linux
2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:os.arch=amd64
2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:os.version=4.4.0-93-generic
2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:user.name=zookeeper
2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:user.home=/zookeeper
2017-09-21 07:15:55,433 - INFO [main:Environment@100] - Server environment:user.dir=/zookeeper
2017-09-21 07:15:55,435 - INFO [main:ZooKeeperServer@829] - tickTime set to 2000
2017-09-21 07:15:55,435 - INFO [main:ZooKeeperServer@838] - minSessionTimeout set to -1
2017-09-21 07:15:55,435 - INFO [main:ZooKeeperServer@847] - maxSessionTimeout set to -1
2017-09-21 07:15:55,440 - INFO [main:NIOServerCnxnFactory@89] - binding to port 0.0.0.0/0.0.0.0:2181
----
The last line is important and reports that ZooKeeper is ready and listening on port 2181. The terminal will continue to show additional output as ZooKeeper generates it.
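If you'd like an extra confirmation from the host, ZooKeeper answers the `ruok` four-letter command on that port (this assumes `nc` is installed on your machine and that the command is enabled, as it is by default in this image's ZooKeeper version):
[source,bash,indent=0]
----
$ echo ruok | nc localhost 2181
imok
----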
[[start-kafka]]
=== Start Kafka
Open a new terminal, and use it to start Kafka in a new container by running:
[source,bash,subs="attributes"]
----
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:{debezium-docker-label}
----
[NOTE]
====
In this tutorial we're always connecting to Kafka from within a Docker container, and containers can always see and communicate with the `kafka` container as long as they link to it. If we wanted to connect to Kafka from _outside_ of a Docker container, then we'd want Kafka to _advertise_ its address via the Docker host, which we could do by adding `-e ADVERTISED_HOST_NAME=` followed by the IP address or resolvable hostname of the Docker host. On Linux and Docker for Mac, this is the IP address of the host computer (not `localhost`).
====
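For example, if the Docker host's address were `192.168.99.100` (a made-up value; substitute your own), the command would become:
[source,bash,subs="attributes"]
----
$ docker run -it --rm --name kafka -p 9092:9092 -e ADVERTISED_HOST_NAME=192.168.99.100 --link zookeeper:zookeeper debezium/kafka:{debezium-docker-label}
----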
This runs a new container using version {debezium-docker-label} of the `debezium/kafka` image, and assigns the name `kafka` to this container. The `-it` flag makes the container interactive, meaning it attaches the terminal's standard input and output to the container so that you can see what is going on in the container. The `--rm` flag instructs Docker to remove the container when it is stopped. The command maps port 9092 in the container to the same port on the Docker host so that software outside of the container can talk with Kafka. Finally, the command uses the `--link zookeeper:zookeeper` argument to tell the container that it can find ZooKeeper in the container named `zookeeper` running on the same Docker host.
You should see in your terminal the typical output of Kafka, ending with:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
2017-09-21 07:16:59,221 - WARN [main:Logging$class@85] - No meta.properties file under dir /kafka/data/1/meta.properties
2017-09-21 07:16:59,247 - INFO [ThrottledRequestReaper-Fetch:Logging$class@70] - [ThrottledRequestReaper-Fetch]: Starting
2017-09-21 07:16:59,247 - INFO [ThrottledRequestReaper-Produce:Logging$class@70] - [ThrottledRequestReaper-Produce]: Starting
2017-09-21 07:16:59,248 - INFO [ThrottledRequestReaper-Request:Logging$class@70] - [ThrottledRequestReaper-Request]: Starting
2017-09-21 07:16:59,308 - INFO [main:Logging$class@70] - Loading logs.
2017-09-21 07:16:59,312 - INFO [main:Logging$class@70] - Logs loading complete in 4 ms.
2017-09-21 07:16:59,349 - INFO [main:Logging$class@70] - Starting log cleanup with a period of 300000 ms.
2017-09-21 07:16:59,353 - INFO [main:Logging$class@70] - Starting log flusher with a default period of 9223372036854775807 ms.
2017-09-21 07:16:59,385 - INFO [main:Logging$class@70] - Awaiting socket connections on 172.17.0.4:9092.
2017-09-21 07:16:59,387 - INFO [main:Logging$class@70] - [Socket Server on Broker 1], Started 1 acceptor threads
2017-09-21 07:16:59,394 - INFO [ExpirationReaper-1-Produce:Logging$class@70] - [ExpirationReaper-1-Produce]: Starting
2017-09-21 07:16:59,395 - INFO [ExpirationReaper-1-Fetch:Logging$class@70] - [ExpirationReaper-1-Fetch]: Starting
2017-09-21 07:16:59,395 - INFO [ExpirationReaper-1-DeleteRecords:Logging$class@70] - [ExpirationReaper-1-DeleteRecords]: Starting
2017-09-21 07:16:59,435 - INFO [ExpirationReaper-1-topic:Logging$class@70] - [ExpirationReaper-1-topic]: Starting
2017-09-21 07:16:59,441 - INFO [ExpirationReaper-1-Heartbeat:Logging$class@70] - [ExpirationReaper-1-Heartbeat]: Starting
2017-09-21 07:16:59,442 - INFO [controller-event-thread:Logging$class@70] - Creating /controller (is it secure? false)
2017-09-21 07:16:59,447 - INFO [ExpirationReaper-1-Rebalance:Logging$class@70] - [ExpirationReaper-1-Rebalance]: Starting
2017-09-21 07:16:59,456 - INFO [controller-event-thread:Logging$class@70] - Result of znode creation is: OK
2017-09-21 07:16:59,458 - INFO [main:Logging$class@70] - [GroupCoordinator 1]: Starting up.
2017-09-21 07:16:59,459 - INFO [main:Logging$class@70] - [GroupCoordinator 1]: Startup complete.
2017-09-21 07:16:59,460 - INFO [group-metadata-manager-0:Logging$class@70] - [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 1 milliseconds.
2017-09-21 07:16:59,487 - INFO [main:Logging$class@70] - [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1
2017-09-21 07:16:59,530 - INFO [main:Logging$class@70] - [Transaction Coordinator 1]: Starting up.
2017-09-21 07:16:59,532 - INFO [TxnMarkerSenderThread-1:Logging$class@70] - [Transaction Marker Channel Manager 1]: Starting
2017-09-21 07:16:59,532 - INFO [main:Logging$class@70] - [Transaction Coordinator 1]: Startup complete.
2017-09-21 07:16:59,551 - INFO [main:Logging$class@70] - Will not load MX4J, mx4j-tools.jar is not in the classpath
2017-09-21 07:16:59,590 - INFO [main:Logging$class@70] - Creating /brokers/ids/1 (is it secure? false)
2017-09-21 07:16:59,604 - INFO [main:Logging$class@70] - Result of znode creation is: OK
2017-09-21 07:16:59,605 - INFO [main:Logging$class@70] - Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(172.17.0.4,9092,ListenerName(PLAINTEXT),PLAINTEXT)
2017-09-21 07:16:59,606 - WARN [main:Logging$class@85] - No meta.properties file under dir /kafka/data/1/meta.properties
2017-09-21 07:16:59,648 - INFO [main:AppInfoParser$AppInfo@83] - Kafka version : 0.11.0.0
2017-09-21 07:16:59,648 - INFO [main:AppInfoParser$AppInfo@84] - Kafka commitId : cb8625948210849f
2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started
----
The last line shown above reports that the Kafka broker has successfully started and is ready for client connections. The terminal will continue to show additional output as Kafka generates it.
[TIP]
====
Debezium {debezium-version} requires Kafka Connect {debezium-kafka-version}, and in this tutorial we also use version {debezium-kafka-version} of the Kafka broker. Check the http://kafka.apache.org/documentation.html[Kafka documentation] about compatibility between different versions of Kafka Connect and the Kafka broker.
====
[[start-mysql]]
=== Start a MySQL database
At this point, we've started ZooKeeper and Kafka, but we don't yet have a database server from which Debezium can capture changes. Now, let's start a MySQL server with an example database.
Open a new terminal, and use it to start a new container that runs a MySQL database server preconfigured with an `inventory` database:
[source,bash,subs="attributes"]
----
$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:{debezium-docker-label}
----
This runs a new container using version {debezium-docker-label} of the `debezium/example-mysql` image, which is https://github.com/debezium/docker-images/blob/master/examples/mysql/0.1/Dockerfile[based on] the https://hub.docker.com/r/_/mysql/[mysql:5.7] image, defines and populates a sample "inventory" database, and creates a `debezium` user with password `dbz` that has the minimum privileges required by Debezium's MySQL connector. The command assigns the name `mysql` to the container so that it can be easily referenced later. The `-it` flag makes the container interactive, meaning it attaches the terminal's standard input and output to the container so that you can see what is going on in the container. The `--rm` flag instructs Docker to remove the container when it is stopped. The command maps port 3306 (the default MySQL port) in the container to the same port on the Docker host so that software outside of the container can connect to the database server. And finally, it also uses the `-e` option three times to set the `MYSQL_ROOT_PASSWORD`, `MYSQL_USER`, and `MYSQL_PASSWORD` environment variables to specific values.
You should see in your terminal something like the following:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21T07:18:50.824629Z 0 [Note] mysqld: ready for connections.
Version: '5.7.19-log' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
----
Notice that the MySQL server starts and stops a few times as the configuration is modified. The last line listed above reports that the MySQL server is running and ready for use.
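As an optional sanity check (a sketch using `docker exec` and the MySQL client that ships inside the container), you can verify that the preconfigured `debezium` user can log in:
[source,bash,indent=0]
----
# runs the mysql client inside the running "mysql" container as the debezium user
$ docker exec -it mysql mysql -u debezium -pdbz -e 'SHOW GRANTS FOR CURRENT_USER;'
----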
[[start-mysql-command-line]]
=== Start a MySQL command line client
Open a new terminal, and use it to start a new container for the MySQL command line client and connect it to the MySQL server running in the `mysql` container:
[source,bash,indent=0]
----
$ docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
----
Here we start the container using the https://hub.docker.com/r/_/mysql/[mysql:5.7] image, name the container `mysqlterm` and link it to the `mysql` container where the database server is running. The `--rm` option tells Docker to remove the container when it stops, and the rest of the command defines the shell command that the container should run. This shell command runs the MySQL command line client and specifies the correct options so that it can connect properly.
The container should output lines similar to the following:
[source,bash,indent=0]
----
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.17-log MySQL Community Server (GPL)
Copyright (c) 2000, 2016, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
----
Unlike the other containers, this container runs a process that produces a prompt. We'll use the prompt to interact with the database. First, switch to the "inventory" database:
[source,sql,indent=0]
----
mysql> use inventory;
----
and then list the tables in the database:
[source,sql,indent=0]
----
mysql> show tables;
----
which should then display:
[source,sql,indent=0]
----
+---------------------+
| Tables_in_inventory |
+---------------------+
| customers |
| orders |
| products |
| products_on_hand |
+---------------------+
4 rows in set (0.00 sec)
----
Use the MySQL command line client to explore the database and view its pre-loaded data. For example:
[source,sql,indent=0]
----
mysql> SELECT * FROM customers;
----
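At the time of writing, the `debezium/example-mysql` image ships four sample customers, so you should see output along these lines:
[source,sql,indent=0]
----
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
----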
[[start-kafka-connect]]
=== Start Kafka Connect
Open a new terminal, and use it to start the Kafka Connect service in a new container by running:
[source,bash,subs="attributes"]
----
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:{debezium-docker-label}
----
This runs a new Docker container named `connect` using version {debezium-docker-label} of the `debezium/connect` image. The `-it` flag makes the container interactive, meaning it attaches the terminal's standard input and output to the container so that you can see what is going on in the container. The `--rm` flag instructs Docker to remove the container when it is stopped. The command maps port 8083 in the container to the same port on the Docker host so that software outside of the container can use Kafka Connect's REST API to set up and manage new connector instances. The command uses the `--link zookeeper:zookeeper`, `--link kafka:kafka`, and `--link mysql:mysql` arguments to tell the container that it can find ZooKeeper running in the container named `zookeeper`, the Kafka broker running in the container named `kafka`, and the MySQL server running in the container named `mysql`, all running on the same Docker host. And finally, it also uses the `-e` option four times to set the `GROUP_ID`, `CONFIG_STORAGE_TOPIC`, `OFFSET_STORAGE_TOPIC`, and `STATUS_STORAGE_TOPIC` environment variables, which are all required by this Debezium image (though you can use different values as desired).
You should see in your terminal the typical output of Kafka Connect, ending with:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:21:14,912 INFO || Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:21:14,912 INFO || Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:21:14,929 INFO || Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group 1. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:14,931 INFO || Finished reading KafkaBasedLog for topic my_connect_configs [org.apache.kafka.connect.util.KafkaBasedLog]
2017-09-21 07:21:14,932 INFO || Started KafkaBasedLog for topic my_connect_configs [org.apache.kafka.connect.util.KafkaBasedLog]
2017-09-21 07:21:14,932 INFO || Started KafkaConfigBackingStore [org.apache.kafka.connect.storage.KafkaConfigBackingStore]
2017-09-21 07:21:14,932 INFO || Herder started [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:21:14,938 INFO || Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group 1. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:14,940 INFO || (Re-)joining group 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:15,022 INFO || Successfully joined group 1 with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:21:15,022 INFO || Joined group and got assignment: Assignment{error=0, leader='connect-1-4d60cb71-cb93-4388-8908-6f0d299a9d94', leaderUrl='http://172.17.0.7:9092/', offset=-1, connectorIds=[], taskIds=[]} [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:21:15,023 INFO || Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:21:15,023 INFO || Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
----
The last few lines shown above report that the service has started and is ready for connections. The terminal will continue to show additional output as the Kafka Connect service generates it.
[[kafka-connect-api]]
==== Using the Kafka Connect REST API
The Kafka Connect service exposes a RESTful API to manage the set of connectors, so let's use that API with the `curl` command line tool. Because we mapped port 8083 in the `connect` container (where the Kafka Connect service is running) to port 8083 on the Docker host, we can communicate with the service by sending requests to port 8083 on the Docker host, which then forwards them to the Kafka Connect service. We are using `localhost` in our examples, but users of non-native Docker platforms (like Docker Toolbox users on Windows and OS X) should replace `localhost` with the IP address of their Docker host.
Open a new terminal, and use it to check the status of the Kafka Connect service:
[source,bash,indent=0]
----
$ curl -H "Accept:application/json" localhost:8083/
----
The Kafka Connect service should return a JSON response message similar to the following:
[source,json,indent=0,subs="attributes"]
----
{"version":"{debezium-kafka-version}","commit":"cb8625948210849f"}
----
This shows that we're running Kafka Connect version {debezium-kafka-version}. Next, check the list of connectors (again replacing `localhost` with your Docker host's IP address if you're on a non-native Docker platform):
[source,bash,indent=0]
----
$ curl -H "Accept:application/json" localhost:8083/connectors/
----
which should return the following:
[source,json,indent=0]
----
[]
----
This confirms that the Kafka Connect service is running, that we can talk with it, and that it currently has no connectors. Let's remedy that by starting a connector that will capture changes from our MySQL database.
[[monitor-mysql]]
=== Monitor the MySQL database
At this point we are running the Debezium services, a MySQL database server with a sample `inventory` database, and the MySQL command line client that is connected to our database. The next step is to register a connector that will begin monitoring the MySQL database server's binlog and generate change events for each row that has been (or will be) changed. Since this is a new connector, when it starts, it will begin reading from the beginning of the MySQL binlog, which records all of the transactions, including individual row changes and changes to the schemas.
[NOTE]
====
Normally we'd likely want to use the Kafka tools to manually create the necessary topics, including specifying the number of replicas. However, for this tutorial, Kafka is configured to automatically create the topics with just 1 replica.
====
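For reference, here is one way to create a topic by hand with the standard Kafka tooling inside the running `kafka` container (a sketch only: the topic name is made up, the flags are standard `kafka-topics.sh` options, and Debezium's image keeps the Kafka installation in the container's working directory):
[source,bash,indent=0]
----
$ docker exec -it kafka bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic my.manual.topic --partitions 1 --replication-factor 1
----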
Using the same terminal, we'll use `curl` to submit to our Kafka Connect service a JSON request message with information about the connector we want to start. Since this command runs on the Docker host rather than inside a container, Docker Toolbox users on Windows and OS X should again replace `localhost` with the IP address of their Docker host:
[source,bash,indent=0]
----
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
----
[NOTE]
====
Windows users may need to escape the double-quotes, like so:
[source,bash,indent=0]
----
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"database.server.name\": \"dbserver1\", \"database.whitelist\": \"inventory\", \"database.history.kafka.bootstrap.servers\": \"kafka:9092\", \"database.history.kafka.topic\": \"dbhistory.inventory\" } }'
----
to avoid this error:
[source,json,indent=0,subs="attributes"]
----
{"error_code":500,"message":"Unexpected character ('n' (code 110)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 4]"}
----
====
This command uses the Kafka Connect service's RESTful API to submit a `POST` request against the `/connectors` resource with a JSON document that describes our new connector. Here's the same JSON message in a more readable format:
[source,json,indent=0]
----
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
----
The JSON message specifies the connector name as `inventory-connector`, and provides the detailed xref:connectors/mysql.adoc#connector-properties[configuration properties for our MySQL connector]:
* Exactly one task should operate at any one time. Since the MySQL connector reads the MySQL server's binlog, using a single connector task is the only way to ensure the proper order and that all events are handled properly.
* The database host is specified as `mysql`, which is the name of our Docker container running the MySQL server. Recall that Docker manipulates the network stack within our containers so that each linked container can be resolved via `/etc/hosts`, using the container name as the hostname. If MySQL were running on a normal network, we'd simply specify the IP address or resolvable hostname for this value.
* The MySQL server's port is specified.
* The MySQL database we're running has a `debezium` user set up expressly for our purposes, so we specify that username and password here.
* A unique server ID and name are given. The server name is the logical identifier for the MySQL server or cluster of servers, and will be used as the prefix for all Kafka topics.
* We only want to detect changes in the `inventory` database, so we use a whitelist.
* The connector should store the history of the database schemas in Kafka using the named broker (the same broker to which we're sending events) and topic name. Upon restart, the connector will recover the schemas of the database(s) that existed at the point in time in the binlog when the connector should begin reading.
This command should produce a response similar to the following (perhaps a bit more compact):
[source,http,indent=0]
----
HTTP/1.1 201 Created
Date: Tue, 07 Feb 2017 20:49:34 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 471
Server: Jetty(9.2.15.v20160210)
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory",
    "name": "inventory-connector"
  },
  "tasks": []
}
----
This response describes the connector resource `/connectors/inventory-connector` that the service just created and includes the connector's configuration and information about the tasks. Since the connector was just created, the service hasn't yet finished starting tasks.
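You can also poll the connector's state via the standard Kafka Connect `/status` endpoint to watch its task come up:
[source,bash,indent=0]
----
$ curl -H "Accept:application/json" localhost:8083/connectors/inventory-connector/status
----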
We can even use the RESTful API to verify that our connector is included in the list of connectors:
[source,bash,indent=0]
----
$ curl -H "Accept:application/json" localhost:8083/connectors/
----
which should return the following:
[source,json,indent=0]
----
["inventory-connector"]
----
Recall that the Kafka Connect service uses connectors to start one or more tasks that do the work, and that it will automatically distribute the running tasks across the cluster of Kafka Connect services. Should any of the services stop or crash, those tasks will be redistributed to running services. We can see the tasks when we get the state of the connector:
[source,bash,indent=0]
----
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
----
which returns:
[source,http,indent=0]
----
HTTP/1.1 200 OK
Date: Mon, 27 Mar 2017 17:09:28 GMT
Content-Type: application/json
Content-Length: 515
Server: Jetty(9.2.15.v20160210)
{
  "name": "inventory-connector",
  "config": {
    "name": "inventory-connector",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  },
  "tasks": [
    {
      "connector": "inventory-connector",
      "task": 0
    }
  ]
}
----
Here, we can see that the connector is running a single task (task 0) to do its work. The MySQL connector only supports a single task, since MySQL records all of its activities in one sequential binlog, so a single reader is all the connector needs to get a consistent and totally ordered view of all of those events.
If we look at the output of our `connect` container, we see that the connector has generated a lot of output. The first few lines related to our connector are output by Kafka Connect, and start with:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:23:59,051 INFO || Connector inventory-connector config updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO || Rebalance started [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO || Finished stopping tasks in preparation for rebalance [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,550 INFO || (Re-)joining group 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:23:59,556 INFO || Successfully joined group 1 with generation 2 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:23:59,556 INFO || Joined group and got assignment: Assignment{error=0, leader='connect-1-4d60cb71-cb93-4388-8908-6f0d299a9d94', leaderUrl='http://172.17.0.7:9092/', offset=1, connectorIds=[inventory-connector], taskIds=[]} [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,557 INFO || Starting connectors and tasks using config offset 1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2017-09-21 07:23:59,557 INFO || Starting connector inventory-connector [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
...
----
followed by a lot of output from Kafka Connect about starting this connector and the various producer and consumer configurations. Eventually, we see output like the following _from our MySQL connector_:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:24:01,151 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:24:01,151 INFO MySQL|dbserver1|task Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:24:01,584 INFO MySQL|dbserver1|task Found no existing offset, so preparing to perform a snapshot [io.debezium.connector.mysql.MySqlConnectorTask]
2017-09-21 07:24:01,614 INFO || Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2017-09-21 07:24:01,615 INFO MySQL|dbserver1|snapshot Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull with user 'debezium' [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,617 INFO MySQL|dbserver1|snapshot Snapshot is using user 'debezium' with these MySQL grants: [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,618 INFO MySQL|dbserver1|snapshot GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%' [io.debezium.connector.mysql.SnapshotReader]
...
----
First, Debezium log output makes use of _mapped diagnostic contexts_, or MDC, which allow the log messages to include thread-specific information like the connector type (e.g., `MySQL` in the above log messages after "INFO" or "WARN" fields), the logical name of the connector (e.g., `dbserver1` above), and the connector's activity (e.g., `task`, `snapshot` and `binlog`). Hopefully these will make it easier to understand what is going on in the multi-threaded Kafka Connect service.
The first few lines involve the `task` activity of the connector, and basically report some bookkeeping information, such as the fact that the connector was started with no prior offset. The next three lines involve the `snapshot` activity of the connector, specifically that a snapshot is being started using the `debezium` MySQL user along with the MySQL grants associated with that user.
[TIP]
====
If the connector is not able to connect or does not see any tables or the binlog, check these grants to ensure that all of those listed above are included.
====
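For example, from the MySQL command line client we started earlier (which is connected as `root`), you can list those grants directly:
[source,sql,indent=0]
----
mysql> SHOW GRANTS FOR 'debezium'@'%';
----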
The next messages output by the connector are the following:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:24:01,618 INFO MySQL|dbserver1|snapshot MySQL server variables related to change data capture: [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_cache_size = 32768 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_checksum = CRC32 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_direct_non_transactional_updates = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,625 INFO MySQL|dbserver1|snapshot binlog_error_action = ABORT_SERVER [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_format = ROW [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_group_commit_sync_delay = 0 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_group_commit_sync_no_delay_count = 0 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_gtid_simple_recovery = ON [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_max_flush_queue_time = 0 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_order_commits = ON [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_row_image = FULL [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_rows_query_log_events = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot binlog_stmt_cache_size = 32768 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_client = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_connection = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_database = latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_filesystem = binary [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_results = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_server = latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_set_system = utf8 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot character_sets_dir = /usr/share/mysql/charsets/ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_connection = utf8_general_ci [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_database = latin1_swedish_ci [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot collation_server = latin1_swedish_ci [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot enforce_gtid_consistency = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,626 INFO MySQL|dbserver1|snapshot gtid_executed_compression_period = 1000 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_mode = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_next = AUTOMATIC [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_owned = [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot gtid_purged = [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_api_enable_binlog = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_locks_unsafe_for_binlog = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot innodb_version = 5.7.19 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot log_statements_unsafe_for_binlog = ON [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_cache_size = 18446744073709547520 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_size = 1073741824 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot max_binlog_stmt_cache_size = 18446744073709547520 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot protocol_version = 10 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot session_track_gtids = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot slave_type_conversions = [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot sync_binlog = 1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot system_time_zone = UTC [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot time_zone = SYSTEM [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tls_version = TLSv1,TLSv1.1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tx_isolation = REPEATABLE-READ [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot tx_read_only = OFF [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version = 5.7.19-log [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version_comment = MySQL Community Server (GPL) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,627 INFO MySQL|dbserver1|snapshot version_compile_machine = x86_64 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,628 INFO MySQL|dbserver1|snapshot version_compile_os = Linux [io.debezium.connector.mysql.SnapshotReader]
...
----
This reports the relevant MySQL server settings found by our MySQL connector. One of the most important is `binlog_format`, which is set to `ROW`.
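You can confirm the `binlog_format` setting yourself from the MySQL command line client:
[source,sql,indent=0]
----
mysql> SHOW GLOBAL VARIABLES LIKE 'binlog_format';
----
These server-settings lines are followed by the output of the steps that make up the snapshot operation: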
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:24:01,628 INFO MySQL|dbserver1|snapshot Step 0: disabling autocommit and enabling repeatable read transactions [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,631 INFO MySQL|dbserver1|snapshot Step 1: start transaction with consistent snapshot [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,634 INFO MySQL|dbserver1|snapshot Step 2: flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,636 INFO MySQL|dbserver1|snapshot Step 3: read binlog position of MySQL master [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot using binlog 'mysql-bin.000003' at position '154' and gtid '' [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot Step 4: read list of available databases [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,638 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,639 INFO MySQL|dbserver1|snapshot Step 5: read list of available tables in each database [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.customers' [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.orders' [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.products' [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,641 INFO MySQL|dbserver1|snapshot including 'inventory.products_on_hand' [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,642 INFO MySQL|dbserver1|snapshot 'mysql.columns_priv' is filtered out, discarding [io.debezium.connector.mysql.SnapshotReader]
...
2017-09-21 07:24:01,670 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,670 INFO MySQL|dbserver1|snapshot Step 6: generating DROP and CREATE statements to reflect current database schemas: [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,679 INFO MySQL|dbserver1|snapshot SET character_set_server=latin1, collation_server=latin1_swedish_ci; [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,724 WARN MySQL|dbserver1|task Error while fetching metadata with correlation id 1 : {dbhistory.inventory=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:01,853 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`products_on_hand` [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,861 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`customers` [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,864 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`orders` [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,866 INFO MySQL|dbserver1|snapshot DROP TABLE IF EXISTS `inventory`.`products` [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,881 INFO MySQL|dbserver1|snapshot DROP DATABASE IF EXISTS `inventory` [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,889 INFO MySQL|dbserver1|snapshot CREATE DATABASE `inventory` [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,893 INFO MySQL|dbserver1|snapshot USE `inventory` [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,914 INFO MySQL|dbserver1|snapshot CREATE TABLE `customers` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`first_name` varchar(255) NOT NULL,
`last_name` varchar(255) NOT NULL,
`email` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `email` (`email`)
) ENGINE=InnoDB AUTO_INCREMENT=1005 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,932 INFO MySQL|dbserver1|snapshot CREATE TABLE `orders` (
`order_number` int(11) NOT NULL AUTO_INCREMENT,
`order_date` date NOT NULL,
`purchaser` int(11) NOT NULL,
`quantity` int(11) NOT NULL,
`product_id` int(11) NOT NULL,
PRIMARY KEY (`order_number`),
KEY `order_customer` (`purchaser`),
KEY `ordered_product` (`product_id`),
CONSTRAINT `orders_ibfk_1` FOREIGN KEY (`purchaser`) REFERENCES `customers` (`id`),
CONSTRAINT `orders_ibfk_2` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10005 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,937 INFO MySQL|dbserver1|snapshot CREATE TABLE `products` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
`description` varchar(512) DEFAULT NULL,
`weight` float DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=110 DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,941 INFO MySQL|dbserver1|snapshot CREATE TABLE `products_on_hand` (
`product_id` int(11) NOT NULL,
`quantity` int(11) NOT NULL,
PRIMARY KEY (`product_id`),
CONSTRAINT `products_on_hand_ibfk_1` FOREIGN KEY (`product_id`) REFERENCES `products` (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,947 INFO MySQL|dbserver1|snapshot Step 7: releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,949 INFO MySQL|dbserver1|snapshot Step 7: blocked writes to MySQL for a total of 00:00:00.312 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,950 INFO MySQL|dbserver1|snapshot Step 8: scanning contents of 4 tables while still in transaction [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,953 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.customers' (1 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,958 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 4 rows from table 'inventory.customers' after 00:00:00.005 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:01,959 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.orders' (2 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,014 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 4 rows from table 'inventory.orders' after 00:00:00.055 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,016 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.products' (3 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,017 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 9 rows from table 'inventory.products' after 00:00:00.001 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,018 INFO MySQL|dbserver1|snapshot Step 8: - scanning table 'inventory.products_on_hand' (4 of 4 tables) [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,019 INFO MySQL|dbserver1|snapshot Step 8: - Completed scanning a total of 9 rows from table 'inventory.products_on_hand' after 00:00:00.001 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,020 INFO MySQL|dbserver1|snapshot Step 8: scanned 26 rows in 4 tables in 00:00:00.069 [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,020 INFO MySQL|dbserver1|snapshot Step 9: committing transaction [io.debezium.connector.mysql.SnapshotReader]
2017-09-21 07:24:02,021 INFO MySQL|dbserver1|snapshot Completed snapshot in 00:00:00.405 [io.debezium.connector.mysql.SnapshotReader]
...
----
Each of these steps reports what the connector is doing to perform the consistent snapshot. For example, Step 6 involves reverse engineering the DDL create statements for the tables that are being captured; Step 7 releases the global read lock just 0.3 seconds after acquiring it, and Step 8 reads all of the rows in each of the tables and reports the time taken and number of rows found. Note that in our example database, the MySQL connector completed its consistent snapshot in about 0.4 seconds.
[NOTE]
====
This process will take longer with your databases, but the connector outputs enough log messages so that you can track what it is working on, even when the tables have very large numbers of rows. And although an exclusive write lock is used at the beginning of the snapshot process, this should be short even for large databases; this lock is released before any data is copied. See the xref:connectors/mysql.adoc[MySQL connector documentation] for more details.
====
The next five lines from Kafka Connect look ominous, but they simply tell us that _new_ Kafka topics were created and that Kafka had to assign a new leader for each:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:24:02,632 WARN || Error while fetching metadata with correlation id 1 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:02,775 WARN || Error while fetching metadata with correlation id 5 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:02,910 WARN || Error while fetching metadata with correlation id 9 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:03,045 WARN || Error while fetching metadata with correlation id 13 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2017-09-21 07:24:03,179 WARN || Error while fetching metadata with correlation id 17 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
...
----
Finally, we see a line reporting that the connector has transitioned from its snapshot mode into continuously reading the MySQL server's binlog:
[listing,indent=0,options="nowrap"]
----
...
Sep 21, 2017 7:24:03 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:7)
2017-09-21 07:24:03,373 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader]
2017-09-21 07:25:01,096 INFO || Finished WorkerSourceTask{id=inventory-connector-0} commitOffsets successfully in 18 ms [org.apache.kafka.connect.runtime.WorkerSourceTask]
...
----
[[viewing-the-change-events]]
=== Viewing the change events
We saw in the connector's output that events were written to five topics:
* `dbserver1`
* `dbserver1.inventory.products`
* `dbserver1.inventory.products_on_hand`
* `dbserver1.inventory.customers`
* `dbserver1.inventory.orders`
As described in the xref:connectors/mysql.adoc#topic-names[MySQL connector documentation], all of the topic names start with `dbserver1`, which is the logical name we gave our connector. The first is our xref:connectors/mysql.adoc#schema-change-topic[schema change topic] to which all of the DDL statements are written. The remaining four topics are used to capture the change events for each of our four tables, and their topic names include the database name (e.g., `inventory`) and the table name.
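You can verify this by asking Kafka to list its topics (a sketch, again assuming Debezium's image layout with the Kafka installation in the container's working directory):
[source,bash,indent=0]
----
$ docker exec -it kafka bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
----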
Let's look at all of the data change events in the `dbserver1.inventory.customers` topic. We'll use the `debezium/kafka` Docker image to start a new container that runs one of Kafka's utilities to watch the topic from the beginning of the topic:
[source,bash,subs="attributes"]
----
$ docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:{debezium-docker-label} watch-topic -a -k dbserver1.inventory.customers
----
Again, we use the `--rm` flag since we want the container to be removed when it stops, and we use the `-a` flag on `watch-topic` to signal that we want to see _all_ events since the beginning of the topic. (If we were to remove the `-a` flag, we'd see only the events that are recorded in the topic _after_ we start watching.) The `-k` flag specifies that the output should include the event's key, which in our case contains the row's primary key. Here's the output:
[source,bash,indent=0,subs="attributes"]
----
Using ZOOKEEPER_CONNECT=172.17.0.3:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.8:9092
Contents of topic dbserver1.inventory.customers:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"{debezium-version}","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"{debezium-version}","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"{debezium-version}","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"{debezium-version}","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490634537160}}
----
[NOTE]
====
This utility keeps watching, so any new events would automatically appear as long as the utility keeps running. This `watch-topic` utility is very simple and limited in functionality and usefulness; we use it here simply to get an understanding of the kind of events that our connector generates. Applications that want to consume events would instead use Kafka consumers, and those consumer libraries offer far more flexibility and power. In fact, properly configured clients enable our applications to never miss any events, even when those applications crash or shut down gracefully.
====
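As the note above suggests, a real application would consume these events with a Kafka client library rather than `watch-topic`. Here is a minimal sketch using the third-party `kafka-python` package (an assumed dependency, not part of this tutorial's setup) that reads the topic from its beginning, much like `watch-topic -a -k` does; it assumes a broker reachable at `localhost:9092`, which would require exposing the Kafka container's port:

[source,python,indent=0]
----
# A sketch, not part of the tutorial's services: read the customers topic
# from the beginning and print each event's key and value, much like
# `watch-topic -a -k`. Assumes `pip install kafka-python` and a broker
# reachable at localhost:9092.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "dbserver1.inventory.customers",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",  # start from the beginning of the topic
    enable_auto_commit=False,
)

for message in consumer:
    key = json.loads(message.key) if message.key else None
    value = json.loads(message.value) if message.value else None  # None marks a tombstone
    print(key, value)
----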
These events happen to be encoded in JSON, since that's how we configured our Kafka Connect service. Each event includes one JSON document for the key and one for the value. Let's look at the last event in more detail by first reformatting the event's _key_ to be easier to read:
[source,json,indent=0]
----
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
----
The event's key has two parts: a `schema` and `payload`. The `schema` contains a Kafka Connect schema describing what is in the payload, and in our case that means that the `payload` is a struct named `dbserver1.inventory.customers.Key` that is not optional and has one required field named `id` of type `int32`.
If we look at the value of the key's `payload` field, we'll see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1004`.
Therefore, we interpret this event as applying to the row in the `inventory.customers` table (output from the connector named `dbserver1`) whose `id` primary key column had a value of `1004`.
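As a quick illustration (our own snippet, not part of the tutorial's services), extracting that primary key from the key document requires only Python's standard library; the JSON string below is abbreviated to the relevant parts:

[source,python,indent=0]
----
import json

# The event's key, abbreviated to the parts used here.
key_doc = json.loads('{"schema": {"name": "dbserver1.inventory.customers.Key"}, "payload": {"id": 1004}}')

# The payload carries the row's primary key column(s).
print(key_doc["payload"]["id"])  # prints: 1004
----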
Now let's look at the same event's _value_, which again we reformat to be easier to read:
[source,json,indent=0,subs="attributes"]
----
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "{debezium-version}",
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"snapshot": true,
"thread": null,
"db": "inventory",
"table": "customers"
},
"op": "c",
"ts_ms": 1486500577691
}
}
----
This portion of the event is much larger, but like the event's _key_, this too has a `schema` and a `payload`. The `schema` contains a Kafka Connect schema named `dbserver1.inventory.customers.Envelope` (version 1) that can contain five fields:
* `op` is a mandatory field that contains a string value describing the type of operation. Values for the MySQL connector are `c` for create (or insert), `u` for update, `d` for delete, and `r` for read (in the case of a non-initial snapshot).
* `before` is an optional field that if present contains the state of the row _before_ the event occurred. The structure will be described by the `dbserver1.inventory.customers.Value` Kafka Connect schema, which the `dbserver1` connector uses for all rows in the `inventory.customers` table.
* `after` is an optional field that if present contains the state of the row _after_ the event occurred. The structure is described by the same `dbserver1.inventory.customers.Value` Kafka Connect schema used in `before`.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of MySQL contains several fields: the connector name, the name of the binlog file where the event was recorded, the position in that binlog file where the event appeared, the row within the event (if there is more than one), the names of the affected database and table, the MySQL thread ID that made the change, whether this event was part of a snapshot, and if available the MySQL server ID, and the timestamp in seconds.
* `ts_ms` is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
If we look at the `payload` of the event's _value_, we can see the information in the event: it describes the creation of a row and contains the `id`, `first_name`, `last_name`, and `email` of the inserted row.
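To make this envelope concrete, here is a minimal sketch (an illustration of ours, not Debezium code) showing how a consumer might interpret a decoded event value using the fields described above:

[source,python,indent=0]
----
# A sketch: summarize a decoded Debezium event value. `event` is assumed to
# be the event's value, already parsed from JSON into a Python dict.
def summarize(event: dict) -> str:
    payload = event["payload"]
    op = payload["op"]          # 'c', 'u', 'd', or 'r'
    source = payload["source"]  # binlog coordinates and other metadata
    position = f"{source['file']}:{source['pos']}"
    if op == "c":
        return f"INSERT at {position}: {payload['after']}"
    if op == "u":
        return f"UPDATE at {position}: {payload['before']} -> {payload['after']}"
    if op == "d":
        return f"DELETE at {position}: {payload['before']}"
    return f"READ (snapshot) at {position}: {payload['after']}"
----

For the event above, `summarize` would report an `INSERT` at `mysql-bin.000003:154` with Anne Kretchmar's row as the `after` state.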
[TIP]
====
You may have noticed that the JSON representations of the events are much larger than the rows they describe. This is because Kafka Connect ships the _schema_ that describes the _payload_ with every event key and value. Over time, this structure may change, and having the schemas for the key and value in the event itself makes it much easier for consuming applications to understand the messages, especially as they evolve over time.
The Debezium MySQL connector constructs these schemas based upon the structure of the database tables. If you use DDL statements to alter the table definitions in the MySQL databases, the connector reads these DDL statements and updates its Kafka Connect schemas. This ensures that each event is structured exactly like the table from which it originated at the time the event occurred. Note, however, that the Kafka topic containing all of the events for a single table might have events that correspond to each state of the table definition.
The JSON converter produces very verbose events, since it includes the key and value schemas in every message. The link:http://docs.confluent.io/3.1.2/schema-registry/docs/index.html[Avro converter], on the other hand, results in far smaller event messages. It transforms each Kafka Connect schema into an Avro schema and stores the Avro schemas in a separate Schema Registry service. When the Avro converter serializes an event message, it places only a unique identifier for the schema along with an Avro-encoded binary representation of the value, so the serialized messages transferred over the wire and stored in Kafka are far smaller than they appear above. In fact, the Avro converter is able to use Avro schema evolution techniques to maintain the history of each schema in the Schema Registry.
====
We can compare these to the state of the database. Go back to the terminal that is running the MySQL command line client, and run the following statement:
[source,sql,indent=0]
----
mysql> SELECT * FROM customers;
----
which produces the following output:
[source,sql,indent=0]
----
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
----
As we can see, all of our event records match the database.
Now that we're monitoring changes, what happens when we *change* one of the records in the database? Run the following statement in the MySQL command line client:
[source,sql,indent=0]
----
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
----
which produces the following output:
[source,indent=0]
----
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0
----
Rerun the `select ...` statement to see the updated table:
[source,sql,indent=0]
----
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne Marie | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
----
Now go back to the terminal running `watch-topic`, where we should see a _new_, fifth event:
[source,json,indent=0,subs="attributes"]
----
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"{debezium-version}","name":"dbserver1","server_id":223344,"ts_sec":1490635059,"gtid":null,"file":"mysql-bin.000003","pos":364,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"u","ts_ms":1490635059389}}
----
Let's reformat the new event's _key_ to be easier to read:
[source,json,indent=0]
----
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
----
This is exactly the same key we saw in the fourth record. Here's that new event's _value_ formatted to be easier to read:
[source,json,indent=0,subs="attributes"]
----
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
        {
          "type": "string",
          "optional": true,
          "field": "version"
        },
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
    },
    "source": {
      "version": "{debezium-version}",
      "name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501486,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "u",
"ts_ms": 1486501486308
}
}
----
When we compare this to the value in the fourth event, we see no changes in the `schema` section and a couple of changes in the `payload` section:
* The `op` field value is now `u`, signifying that this row changed because of an update.
* The `before` field now has the state of the row with the values before the database commit.
* The `after` field now has the updated state of the row, and here we can see that the `first_name` value is now `Anne Marie`.
* The `source` field structure has many of the same values as before, except that the `ts_sec` and `pos` fields have changed (and the `file` might have changed in other circumstances).
* The `ts_ms` field shows the timestamp at which Debezium processed this event.
There are several things we can learn by just looking at this `payload` section. We can compare the `before` and `after` structures to determine what actually changed in this row because of the commit. The `source` structure tells us information about MySQL's record of this change (providing traceability), but more importantly this has information we can compare to other events in this and other topics to know whether this event occurred before, after, or as part of the same MySQL commit as other events.
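For example, a consumer could compute exactly which columns changed by comparing the two structures; here is a minimal sketch:

[source,python,indent=0]
----
# A sketch: given the `before` and `after` structures of an update event,
# return each changed column with its old and new values.
def changed_fields(before: dict, after: dict) -> dict:
    return {
        column: (before.get(column), after[column])
        for column in after
        if before.get(column) != after[column]
    }

# For the update event above, this yields:
# {'first_name': ('Anne', 'Anne Marie')}
----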
So far we've seen samples of _create_ and _update_ events. Now, let's look at _delete_ events. Since Anne Marie has not placed any orders, we can remove her record from our database using the MySQL command line client:
[source,sql,indent=0]
----
mysql> DELETE FROM customers WHERE id=1004;
----
In our terminal running `watch-topic`, we see _two_ new events:
[source,json,indent=0,subs="attributes"]
----
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":null,"source":{"version":"{debezium-version}","name":"dbserver1","server_id":223344,"ts_sec":1490635100,"gtid":null,"file":"mysql-bin.000003","pos":725,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"d","ts_ms":1490635100301}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":null,"payload":null}
----
What happened? We only deleted one row, but we now have two events. To understand what the MySQL connector does, let's look at the first of our two new messages. Here's the _key_ reformatted to be easier to read:
[source,json,indent=0]
----
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
----
Once again, this key is exactly the same key as in the previous two events we looked at. Here's the _value_ of the first new event, formatted to be easier to read:
[source,json,indent=0,subs="attributes"]
----
{
"schema": {...},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
    },
    "after": null,
    "source": {
      "version": "{debezium-version}",
      "name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501558,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 725,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "d",
"ts_ms": 1486501558315
  }
}
----
Again, the `schema` is identical to the previous messages, but the `payload` fragment has a few things of note:
* The `op` field value is now `d`, signifying that this row was deleted.
* The `before` field now has the state of the row that was deleted by the database commit.
* The `after` field is null, signifying that the row no longer exists.
* The `source` field structure has many of the same values as before, except that the `ts_sec` and `pos` fields have changed (and the `file` might have changed in other circumstances).
* The `ts_ms` field shows the timestamp at which Debezium processed this event.
This event gives a consumer all kinds of information that it can use to process the removal of this row. We include the old values because some consumers might require them in order to properly handle the removal, and without them they might have to resort to far more complex behavior.
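For instance, a consumer replicating this table into another system might translate the event into a downstream statement using those old values; a rough sketch (illustrative only, with the table name hard-coded and no SQL escaping):

[source,python,indent=0]
----
# A sketch: turn a Debezium delete event's payload into a statement for a
# downstream copy of the table. A real consumer would use parameterized
# queries; this is illustrative only.
def to_delete_sql(payload: dict) -> str:
    if payload["op"] != "d":
        raise ValueError("not a delete event")
    row = payload["before"]  # the old values identify the row to remove
    return f"DELETE FROM customers WHERE id = {row['id']};"
----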
Remember that we saw two events when we deleted the row? Let's look at that second event. Here's the _key_ for the event:
[source,json,indent=0]
----
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}
----
Once again, this key is exactly the same key as in the previous three events we looked at. Here's the _value_ of that same event:
[source,json,indent=0]
----
{
"schema": null,
"payload": null
}
----
What gives? Well, all of the Kafka topics that the MySQL connector writes to can be set up to be _log compacted_, which means that Kafka can remove older messages from the topic as long as there is at least one later message in the topic with the exact same key. This is Kafka's way of collecting garbage. This last event is what Debezium calls a _tombstone_ event; because it has a key and a _null_ value, Kafka understands that it can remove all prior messages with the same key.
Kafka log compaction is great, because it still allows consumers to read the topic from the very beginning and not miss any events.
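Putting the delete and tombstone events together, here is a sketch (our own illustration) of how a consumer might maintain a local copy of the table while reading a possibly compacted topic; the key and value documents are assumed to be already decoded from JSON, with a `None` value representing a tombstone:

[source,python,indent=0]
----
# A sketch: maintain a local copy of the customers table from change events.
customers = {}

def apply_event(key_doc: dict, value_doc) -> None:
    row_id = key_doc["payload"]["id"]
    if value_doc is None:
        return  # tombstone: only meaningful for Kafka's log compaction
    payload = value_doc["payload"]
    if payload["op"] == "d":
        customers.pop(row_id, None)  # the row was deleted; drop it
    else:
        customers[row_id] = payload["after"]  # create/read/update: keep the latest state
----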
[[restart-kafka-connect]]
=== Restart the Kafka Connect service
One feature of the Kafka Connect service is that it automatically manages tasks for the registered connectors. And, because it stores its data in Kafka, if a running service stops or goes away completely, upon restart (perhaps on another host) the service will start any non-running tasks. To demonstrate this, let's stop our Kafka Connect service, change some data in the database, and restart our service.
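Before stopping the service, you can verify that the connector and its task are running by querying Kafka Connect's REST API on port 8083, which we exposed when starting the container. Here is a quick sketch using Python's standard library (plain `curl` against the same URL works just as well):

[source,python,indent=0]
----
import json
import urllib.request

# Ask Kafka Connect's REST API for the status of our registered connector.
url = "http://localhost:8083/connectors/inventory-connector/status"
with urllib.request.urlopen(url) as response:
    status = json.load(response)

print(json.dumps(status, indent=2))  # shows the connector state and its task states
----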
In a new terminal, use the following Docker commands to stop the `connect` container that is running our Kafka Connect service:
[source,bash,indent=0]
----
$ docker stop connect
----
Stopping the container like this stops the process running inside of it, but the Kafka Connect service handles this by shutting down gracefully. And because we ran the container with the `--rm` flag, Docker removed the container once it stopped.
While the service is down, let's go back to the MySQL command line client and add a few records:
[source,sql,indent=0]
----
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");
----
Notice that in the terminal where we're running `watch-topic`, there's been no update. Also, we're still able to watch the topic because Kafka is still running.
[TIP]
====
In a production system, you would have enough brokers to handle the producers and consumers, and to maintain a minimum number of in-sync replicas (ISRs) for each topic. If enough brokers fail that the minimum number of ISRs cannot be maintained, Kafka becomes unavailable. Producers, like the Debezium connectors, and consumers will simply wait patiently for the Kafka cluster or network to recover. Yes, that means that your consumers might temporarily see no change events as data is changed in the databases, but that's because none are being produced. As soon as the Kafka cluster is restarted or the network recovers, Debezium will continue producing change events and your consumers will continue consuming events where they left off.
====
Now, in a new terminal, start a new container using the _same_ command we used before:
[source,bash,subs="attributes"]
----
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:{debezium-docker-label}
----
This creates a whole new container that runs the Kafka Connect distributed service, and since we've initialized it with the same topic information, the new service connects to Kafka, reads the previous service's configuration, and starts the registered connectors, which will continue exactly where they last left off.
Here's the last few lines from this restarted service:
[listing,indent=0,options="nowrap"]
----
...
2017-09-21 07:38:48,385 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,386 INFO MySQL|dbserver1|task Kafka commitId : cb8625948210849f [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,390 INFO MySQL|dbserver1|task Discovered coordinator 172.17.0.4:9092 (id: 2147483646 rack: null) for group inventory-connector-dbhistory. [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,390 INFO MySQL|dbserver1|task Revoking previously assigned partitions [] for group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2017-09-21 07:38:48,391 INFO MySQL|dbserver1|task (Re-)joining group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,402 INFO MySQL|dbserver1|task Successfully joined group inventory-connector-dbhistory with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,403 INFO MySQL|dbserver1|task Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connector-dbhistory [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2017-09-21 07:38:48,888 INFO MySQL|dbserver1|task Step 0: Get all known binlogs from MySQL [io.debezium.connector.mysql.MySqlConnectorTask]
2017-09-21 07:38:48,903 INFO MySQL|dbserver1|task MySQL has the binlog file 'mysql-bin.000003' required by the connector [io.debezium.connector.mysql.MySqlConnectorTask]
Sep 21, 2017 7:38:49 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:10)
2017-09-21 07:38:49,045 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows [io.debezium.connector.mysql.BinlogReader]
2017-09-21 07:38:49,046 INFO || Source task WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
----
As you can see, these lines show that the service finds the offsets previously recorded by the last task before it was shut down, and that it then connects to the MySQL database, starts reading the binlog from that position, and generates events from any changes in the MySQL database since that point in time.
Jump back to the terminal running `watch-topic`, and you should now see events for our two new records:
[source,json,indent=0,subs="attributes"]
----
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"{debezium-version}","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"{debezium-version}","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456}}
----
These events are _create_ events that are similar to what we saw before. The important point to understand, though, is that Debezium will still report all of the changes made in a database while it was not running, as long as it is restarted before MySQL purges the missed commits from its binlog.
[[exploration]]
=== Exploration
Go ahead and use the MySQL command line client to add, modify, and remove rows in the database tables, and see the effect on the topics. You may need to run a separate `watch-topic` command for each topic. And remember that you can't remove a row that is referenced by a foreign key. Have fun!
[[cleanup]]
=== Clean up
You can use Docker to stop all of the running containers:
[source,bash,indent=0]
----
$ docker stop mysqlterm watcher connect mysql kafka zookeeper
----
Again, since we used the `--rm` flag when starting the containers, Docker should remove them right after it stops them. We can verify that all of the other processes are stopped and removed:
[source,bash,indent=0]
----
$ docker ps -a
----
Of course, if any are still running, simply stop them using `docker stop <name>` or `docker stop <containerId>`.
[[docker-compose]]
== Docker Compose setup
If you have already completed the tutorial and would like to run through the setup again quickly,
then you can use a https://docs.docker.com/compose/[Docker Compose] version of this tutorial located in our https://github.com/debezium/debezium-examples/tree/master/tutorial[examples repository].
We provide Docker Compose files for running the tutorial with MySQL, Postgres, MongoDB, SQL Server and Oracle.
Please follow the steps described in the https://github.com/debezium/debezium-examples/blob/master/tutorial/README.md[readme file].