diff --git a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java index 2622c3d81..d7614ed21 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java +++ b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java @@ -42,18 +42,20 @@ import com.jayway.jsonpath.JsonPath; /** - * An integration test veryfing the Apicurio registry is interoperable with Debezium + * An integration test verifying the Apicurio registry is interoperable with Debezium * * @author Jiri Pechanec - * */ public class ApicurioRegistryTest { + private static final String DEBEZIUM_VERSION = "1.1.1.Final"; + private static final String APICURIO_VERSION = "1.1.2.Final"; + private static final Logger LOGGER = LoggerFactory.getLogger(ApicurioRegistryTest.class); private static Network network = Network.newNetwork(); - private static GenericContainer apicurioContainer = new GenericContainer("apicurio/apicurio-registry-mem:1.1.2.Final") + private static GenericContainer apicurioContainer = new GenericContainer<>("apicurio/apicurio-registry-mem:" + APICURIO_VERSION) .withNetwork(network) .withExposedPorts(8080) .waitingFor(new LogMessageWaitStrategy().withRegEx(".*apicurio-registry-app.*started in.*")); @@ -67,9 +69,9 @@ public class ApicurioRegistryTest { public static ImageFromDockerfile apicurioDebeziumImage = new ImageFromDockerfile() .withDockerfileFromBuilder(builder -> builder - .from("debezium/connect:1.1.1.Final") + .from("debezium/connect:" + DEBEZIUM_VERSION) .env("KAFKA_CONNECT_DEBEZIUM_DIR", "$KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres") - .env("APICURIO_VERSION", "1.1.2.Final") + .env("APICURIO_VERSION", APICURIO_VERSION) .run("cd 
$KAFKA_CONNECT_DEBEZIUM_DIR && curl https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/$APICURIO_VERSION/apicurio-registry-distro-connect-converter-$APICURIO_VERSION-converter.tar.gz | tar xzv") .build()); @@ -150,8 +152,8 @@ public void shouldConvertToAvro() throws Exception { List> changeEvents = drain(consumer, 1); // Verify magic byte of Avro messages - assertThat(((byte[]) changeEvents.get(0).key())[0]).isZero(); - assertThat(((byte[]) changeEvents.get(0).value())[0]).isZero(); + assertThat(changeEvents.get(0).key()[0]).isZero(); + assertThat(changeEvents.get(0).value()[0]).isZero(); consumer.unsubscribe(); } @@ -198,7 +200,7 @@ private List> drain(KafkaConsumer consumer, int e private ConnectorConfiguration getConfiguration(int id, String converter, String... options) { final String host = apicurioContainer.getContainerInfo().getConfig().getHostName(); - final int port = (Integer) apicurioContainer.getExposedPorts().get(0); + final int port = apicurioContainer.getExposedPorts().get(0); final String apicurioUrl = "http://" + host + ":" + port; // host, database, user etc. are obtained from the container diff --git a/documentation/modules/ROOT/pages/configuration/avro.adoc b/documentation/modules/ROOT/pages/configuration/avro.adoc index d24abe19b..39e96d3b6 100644 --- a/documentation/modules/ROOT/pages/configuration/avro.adoc +++ b/documentation/modules/ROOT/pages/configuration/avro.adoc @@ -10,40 +10,51 @@ toc::[] Debezium connectors are used with the Kafka Connect framework to capture changes in databases and generate change events. 
The Kafka Connect workers then apply to each of the messages generated by the connector the transformations configured for the connector, -serialize each message key and value into a binary form using the worker's http://docs.confluent.io/current/connect/concepts.html#connect-converters[_converters_], +serialize each message key and value into a binary form using the configured https://kafka.apache.org/documentation/#connect_running[_converters_], and finally write each messages into the correct Kafka topic. -The converters are specified in the Kafka Connect worker configuration, and the same converters are used for all connectors deployed to that worker's cluster. +The converters can either be specified in the Kafka Connect worker configuration, +in which case the same converters are used for all connectors deployed to that worker's cluster. +Alternatively, they can be specified for an individual connector. Kafka Connect comes with a _JSON converter_ that serializes the message keys and values into JSON documents. The JSON converter can be configured to include or exclude the message schema using the (`key.converter.schemas.enable` and `value.converter.schemas.enable`) properties. Our xref:tutorial.adoc[tutorial] shows what the messages look like when both payload and schemas are included, but the schemas make the messages very verbose. If you want your messages serialized with JSON, consider setting these properties to `false` to exclude the verbose schema information. -Another option is to serialize the message keys and values using https://avro.apache.org/[Apache Avro]. +Alternatively, you can serialize the message keys and values using https://avro.apache.org/[Apache Avro]. The Avro binary format is extremely compact and efficient, and Avro schemas make it possible to ensure that the messages have the correct structure. 
Avro's schema evolution mechanism makes it possible to evolve the schemas over time, which is essential for Debezium connectors that dynamically generate the message schemas to match the structure of the database tables. Over time, the change events captured by Debezium connectors and written by Kafka Connect into a topic may have different versions of the same schema, and Avro serialization makes it far easier for consumers to adapt to the changing schema. -Open source project https://github.com/Apicurio/apicurio-registry[Apicurio Registry] provides several components that work with Avro: +The open-source project https://github.com/Apicurio/apicurio-registry[Apicurio Registry] provides several components that work with Avro: -* An Avro Converter that can be used in Kafka Connect workers to map the Kafka Connect schemas into Avro schemas and to then use those Avro schemas to serialize the message keys and values into the very compact Avro binary form. +* An Avro converter that can be used in Kafka Connect workers to map the Kafka Connect schemas into Avro schemas and to then use those Avro schemas to serialize the message keys and values into the very compact Avro binary form. * An API/Schema registry that tracks all of the Avro schemas used in Kafka topics, and where the Avro Converter sends the generated Avro schemas. Since the Avro schemas are stored in this registry, each message need only include a tiny _schema identifier_. This makes each message even smaller, and for an I/O bound system like Kafka this means more total throughput of the producers and consumers. * Avro _Serdes_ (serializers and deserializers) for Kafka producers and consumers. -Any Kafka consumer applications you write to consume change events can use the Avro serdes to deserialize the changes events. +Any Kafka consumer applications you write to consume change events can use the Avro Serdes to deserialize the change events. 
You can install them into any Kafka distribution and use them with Kafka Connect. +[NOTE] +==== +The Apicurio project also provides a JSON converter that can be used with the Apicurio registry. +This combines the advantage of less verbose messages (as messages don't contain the schema information themselves, but only a schema id) +with human-readable JSON. +==== + +Another option is using the Confluent schema registry, which is described further below. + == Technical Information A system that wants to use Avro serialization needs to complete two steps: * Deploy a https://github.com/Apicurio/apicurio-registry[Apicurio API/Schema Registry] instance -* Install Avro converter from https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/{apicurio-version}/apicurio-registry-distro-connect-converter-{apicurio-version}-converter.tar.gz[the installation package] into Kafka `libs` directory or directly into a plug-in directory -* Use these properties to configure Apache Connect instance +* Install the Avro converter from https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/{apicurio-version}/apicurio-registry-distro-connect-converter-{apicurio-version}-converter.tar.gz[the installation package] into Kafka's _libs_ directory or directly into a plug-in directory +* Use the following properties to configure the Kafka Connect instance [source] ---- @@ -59,20 +70,13 @@ value.converter.apicurio.registry.converter.deserializer=io.apicurio.registry.ut value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy ---- -Note: In addition to setting key/value converters, it is *strongly recommended* to set internal key/value converters to use JSON converters for easier analysis of stored configuration and offsets. -If you would still prefer to use Avro converter it is not possible now due to a https://issues.apache.org/jira/browse/KAFKA-3988[known issue]. 
+Note that Kafka Connect internally always uses the JSON key/value converters for storing configuration and offsets. -[source] ----- -internal.key.converter=org.apache.kafka.connect.json.JsonConverter -internal.value.converter=org.apache.kafka.connect.json.JsonConverter ----- - -== Debezium Docker Images +== Debezium Container Images See the https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql-and-apicurio-registry[MySQL and the Avro message format] tutorial example for a quickstart with MySQL. -Deploy an Apicurio Registry instance (this example uses non-production memory only instance): +Deploy an Apicurio Registry instance (this example uses a non-production in-memory instance): [source] ---- @@ -103,8 +107,6 @@ docker run -it --rm --name connect \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \ -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \ - -e INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ - -e INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio:8080 \ -e CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_CONVERTER.SERIALIZER=io.apicurio.registry.utils.serde.AvroKafkaSerializer \ @@ -130,17 +132,12 @@ Debezium uses the column's name as the basis for the Avro field. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules above. Debezium provides a configuration option, `sanitize.field.names` that can be set to `true` if you have columns that do not adhere to the rule-set above, allowing those fields to be serialized without having to actually modify your schema. -== Getting More Information - -link:/blog/2016/09/19/Serializing-Debezium-events-with-Avro/[This post] from the Debezium blog -describes the concepts of serializers, converters etc. 
and discusses the advantages of using Avro. - == Confluent Schema Registry -There is an alternative https://github.com/confluentinc/schema-registry[Schema Registry] implementation developed by Confluent. +There is an alternative https://github.com/confluentinc/schema-registry[schema registry] implementation developed by Confluent. The configuration is slightly different. -Properties that should be used: +Here are the properties that should be used: [source] ---- @@ -150,7 +147,7 @@ value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=http://localhost:8081 ---- -Deployment of Schema Registry instance: +An instance of the Confluent Schema Registry can be deployed like so: [source] ---- @@ -177,8 +174,6 @@ docker run -it --rm --name connect \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \ -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \ - -e INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ - -e INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \ -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -p 8083:8083 debezium/connect:{debezium-docker-label} @@ -203,4 +198,11 @@ docker run -it --rm --name avro-consumer \ --topic db.myschema.mytable ---- -For more details the https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql-and-the-avro-message-format[MySQL and the Avro message format] tutorial example for a quickstart with MySQL. +== Getting More Information + +link:/blog/2016/09/19/Serializing-Debezium-events-with-Avro/[This post] from the Debezium blog +describes the concepts of serializers, converters etc. and discusses the advantages of using Avro. +Note that some details around Kafka Connect converters have slightly changed since the time of writing this post. 
+ +For a complete example of using Avro as the message format for Debezium data change events, +please see the https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql-and-the-avro-message-format[MySQL and the Avro message format] tutorial example.