From b80f06413114175f904682ca8ba58cffdba3645e Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 22 Jun 2020 09:56:12 +0200 Subject: [PATCH] DBZ-2250 Allowing to use CloudEventsConverter with Avro via Apicurio --- .../config/CommonConnectorConfig.java | 8 ++- .../converters/CloudEventsConverter.java | 22 ++++++-- .../testcontainers/ApicurioRegistryTest.java | 53 ++++++++++++++++++- 3 files changed, 74 insertions(+), 9 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index cf4ee0840..8ce66afa4 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -230,6 +230,9 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { } } + private static String CONFLUENT_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter"; + private static String APICURIO_AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter"; + public static final int DEFAULT_MAX_QUEUE_SIZE = 8192; public static final int DEFAULT_MAX_BATCH_SIZE = 2048; public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500; @@ -548,10 +551,11 @@ private static int validateSkippedOperation(Configuration config, Field field, V } private static boolean isUsingAvroConverter(Configuration config) { - final String avroConverter = "io.confluent.connect.avro.AvroConverter"; final String keyConverter = config.getString("key.converter"); final String valueConverter = config.getString("value.converter"); - return avroConverter.equals(keyConverter) || avroConverter.equals(valueConverter); + + return CONFLUENT_AVRO_CONVERTER.equals(keyConverter) || CONFLUENT_AVRO_CONVERTER.equals(valueConverter) + || APICURIO_AVRO_CONVERTER.equals(keyConverter) || APICURIO_AVRO_CONVERTER.equals(valueConverter); } protected static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, ValidationOutput problems) { diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java index 6265b16b0..16e8ed3d5 100644 --- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java +++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java @@ -75,8 +75,11 @@ public class CloudEventsConverter implements Converter { /** * Instantiated reflectively to avoid hard dependency to Avro converter. */ - private static final String AVRO_CONVERTER_CLASS = "io.confluent.connect.avro.AvroConverter"; - private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; + private static final String CONFLUENT_AVRO_CONVERTER_CLASS = "io.confluent.connect.avro.AvroConverter"; + private static final String CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; + + private static String APICURIO_AVRO_CONVERTER_CLASS = "io.apicurio.registry.utils.converter.AvroConverter"; + private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url"; /** * Suffix appended to schema names of data schema in case of Avro/Avro, to keep @@ -153,13 +156,22 @@ public void configure(Map configs, boolean isKey) { if (usingAvro) { Configuration avroConfig = Configuration.from(configs).subset("avro", true); - schemaRegistryUrls = avroConfig.getStrings(SCHEMA_REGISTRY_URL_CONFIG, ","); + boolean useApicurio = true; + if (avroConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG)) { + schemaRegistryUrls = avroConfig.getStrings(APICURIO_SCHEMA_REGISTRY_URL_CONFIG, ","); + } + else if (avroConfig.hasKey(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG)) { + schemaRegistryUrls = avroConfig.getStrings(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG, ","); + useApicurio = false; + } + if (schemaRegistryUrls == null || schemaRegistryUrls.isEmpty()) { - throw new DataException("Need URL(s) for schema registry instances for CloudEvents"); + throw new DataException("Need URL(s) for schema registry instances for CloudEvents when using Apache Avro"); } if (avroConverter == null) { - avroConverter = Instantiator.getInstance(AVRO_CONVERTER_CLASS, null, null); + avroConverter = Instantiator.getInstance(useApicurio ? APICURIO_AVRO_CONVERTER_CLASS : CONFLUENT_AVRO_CONVERTER_CLASS, null, null); + LOGGER.info("Using Avro converter {}", avroConverter.getClass().getName()); avroConverter.configure(avroConfig.asMap(), false); } } diff --git a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java index a465ef34e..4977c056b 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java +++ b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java @@ -14,6 +14,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -48,7 +49,8 @@ */ public class ApicurioRegistryTest { - private static final String DEBEZIUM_VERSION = "1.2.0.CR1"; + // TODO: revert to upstream version + private static final String DEBEZIUM_VERSION = "1.2"; private static final String APICURIO_VERSION = "1.2.2.Final"; private static final Logger LOGGER = LoggerFactory.getLogger(ApicurioRegistryTest.class); @@ -69,7 +71,7 @@ public class ApicurioRegistryTest { public static ImageFromDockerfile apicurioDebeziumImage = new ImageFromDockerfile() .withDockerfileFromBuilder(builder -> builder - .from("debezium/connect:" + DEBEZIUM_VERSION) + .from("gunnarmorling/debezium-connect:" + DEBEZIUM_VERSION) .env("KAFKA_CONNECT_DEBEZIUM_DIR", "$KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres") .env("APICURIO_VERSION", APICURIO_VERSION) .run("cd $KAFKA_CONNECT_DEBEZIUM_DIR && curl https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/$APICURIO_VERSION/apicurio-registry-distro-connect-converter-$APICURIO_VERSION-converter.tar.gz | tar xzv") @@ -155,6 +157,53 @@ public void shouldConvertToAvro() throws Exception { } } + @Test + public void shouldConvertToCloudEventWithDataAsAvro() throws Exception { + try (Connection connection = getConnection(postgresContainer); + Statement statement = connection.createStatement(); + KafkaConsumer consumer = getConsumerString(kafkaContainer)) { + + statement.execute("drop schema if exists todo cascade"); + statement.execute("create schema todo"); + statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))"); + statement.execute("alter table todo.Todo replica identity full"); + statement.execute("insert into todo.Todo values (3, 'Be Awesome')"); + + final String host = apicurioContainer.getContainerInfo().getConfig().getHostName(); + final int port = apicurioContainer.getExposedPorts().get(0); + final String apicurioUrl = "http://" + host + ":" + port + "/api"; + String id = "3"; + + // host, database, user etc. are obtained from the container + final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer) + .with("database.server.name", "dbserver" + id) + .with("slot.name", "debezium_" + id) + .with("key.converter", "org.apache.kafka.connect.json.JsonConverter") + .with("value.converter", "io.debezium.converters.CloudEventsConverter") + .with("value.converter.data.serializer.type", "avro") + .with("value.converter.avro.apicurio.registry.url", apicurioUrl) + .with("value.converter.avro.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy"); + + debeziumContainer.registerConnector("my-connector-cloudevents-avro", config); + + consumer.subscribe(Arrays.asList("dbserver3.todo.todo")); + + List> changeEvents = drain(consumer, 1); + + assertThat(JsonPath. read(changeEvents.get(0).key(), "$.payload.id")).isEqualTo(3); + assertThat(JsonPath. read(changeEvents.get(0).value(), "$.iodebeziumop")).isEqualTo("r"); + assertThat(JsonPath. read(changeEvents.get(0).value(), "$.iodebeziumname")).isEqualTo("dbserver3"); + assertThat(JsonPath. read(changeEvents.get(0).value(), "$.datacontenttype")).isEqualTo("application/avro"); + assertThat(JsonPath. read(changeEvents.get(0).value(), "$.iodebeziumtable")).isEqualTo("todo"); + + // Verify magic byte of Avro messages + byte[] decodedBytes = Base64.getDecoder().decode(JsonPath. read(changeEvents.get(0).value(), "$.data")); + assertThat(decodedBytes[0]).isZero(); + + consumer.unsubscribe(); + } + } + private Connection getConnection(PostgreSQLContainer postgresContainer) throws SQLException { return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword());