DBZ-2250 Allow using CloudEventsConverter with Avro via Apicurio

Authored by Gunnar Morling on 2020-06-22 09:56:12 +02:00, committed by Jiri Pechanec
parent b42ef797ab
commit b80f064131
3 changed files with 74 additions and 9 deletions
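
For reference, a minimal sketch of what this commit enables, mirroring the integration test added in the third file: a connector is registered with CloudEventsConverter as value converter, Avro as the data serializer, and an Apicurio registry URL under the avro. prefix. The class name, connector name, registry URL, server name and slot name are placeholders; ConnectorConfiguration and DebeziumContainer are assumed to be the types from the debezium-testing-testcontainers module used in the test below.

import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.DebeziumContainer;
import org.testcontainers.containers.PostgreSQLContainer;

public class CloudEventsAvroApicurioExample {

    // Registers a Postgres connector that emits CloudEvents whose "data" attribute is
    // Avro-encoded, with schemas stored in an Apicurio registry.
    public static void register(PostgreSQLContainer<?> postgres, DebeziumContainer debezium) {
        ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgres)
                .with("database.server.name", "dbserver1")
                .with("slot.name", "debezium_1")
                .with("key.converter", "org.apache.kafka.connect.json.JsonConverter")
                .with("value.converter", "io.debezium.converters.CloudEventsConverter")
                // serialize the CloudEvents "data" attribute using Avro
                .with("value.converter.data.serializer.type", "avro")
                // an Apicurio registry URL under the "avro." prefix selects the Apicurio Avro converter
                .with("value.converter.avro.apicurio.registry.url", "http://apicurio:8080/api")
                .with("value.converter.avro.apicurio.registry.global-id",
                        "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy");

        debezium.registerConnector("example-connector", config);
    }
}

The global-id strategy shown here is the one used by the test and lets schemas be registered in Apicurio automatically on first use.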


@@ -230,6 +230,9 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
}
}
private static final String CONFLUENT_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
private static final String APICURIO_AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter";
public static final int DEFAULT_MAX_QUEUE_SIZE = 8192;
public static final int DEFAULT_MAX_BATCH_SIZE = 2048;
public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500;
@@ -548,10 +551,11 @@ private static int validateSkippedOperation(Configuration config, Field field, V
}
private static boolean isUsingAvroConverter(Configuration config) {
final String avroConverter = "io.confluent.connect.avro.AvroConverter";
final String keyConverter = config.getString("key.converter");
final String valueConverter = config.getString("value.converter");
return avroConverter.equals(keyConverter) || avroConverter.equals(valueConverter);
return CONFLUENT_AVRO_CONVERTER.equals(keyConverter) || CONFLUENT_AVRO_CONVERTER.equals(valueConverter)
|| APICURIO_AVRO_CONVERTER.equals(keyConverter) || APICURIO_AVRO_CONVERTER.equals(valueConverter);
}
protected static int validateServerNameIsDifferentFromHistoryTopicName(Configuration config, Field field, ValidationOutput problems) {
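
The practical effect of the widened isUsingAvroConverter() check above is that either Avro converter class, Confluent or Apicurio, configured as key or value converter now counts as "using Avro" wherever this helper is consulted during configuration validation. A self-contained sketch of the check on plain strings (class name and main method are illustrative only):

public class AvroConverterCheckExample {

    private static final String CONFLUENT_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
    private static final String APICURIO_AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter";

    // Mirrors the updated check above, taking the converter class names directly
    static boolean isUsingAvroConverter(String keyConverter, String valueConverter) {
        return CONFLUENT_AVRO_CONVERTER.equals(keyConverter) || CONFLUENT_AVRO_CONVERTER.equals(valueConverter)
                || APICURIO_AVRO_CONVERTER.equals(keyConverter) || APICURIO_AVRO_CONVERTER.equals(valueConverter);
    }

    public static void main(String[] args) {
        // The Apicurio Avro converter is now recognized alongside the Confluent one
        System.out.println(isUsingAvroConverter(null, APICURIO_AVRO_CONVERTER));                       // true
        System.out.println(isUsingAvroConverter(null, "org.apache.kafka.connect.json.JsonConverter")); // false
    }
}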


@@ -75,8 +75,11 @@ public class CloudEventsConverter implements Converter {
/**
* Instantiated reflectively to avoid a hard dependency on the Avro converter.
*/
private static final String AVRO_CONVERTER_CLASS = "io.confluent.connect.avro.AvroConverter";
private static final String SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
private static final String CONFLUENT_AVRO_CONVERTER_CLASS = "io.confluent.connect.avro.AvroConverter";
private static final String CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
private static final String APICURIO_AVRO_CONVERTER_CLASS = "io.apicurio.registry.utils.converter.AvroConverter";
private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";
/**
* Suffix appended to schema names of data schema in case of Avro/Avro, to keep
@@ -153,13 +156,22 @@ public void configure(Map<String, ?> configs, boolean isKey) {
if (usingAvro) {
Configuration avroConfig = Configuration.from(configs).subset("avro", true);
schemaRegistryUrls = avroConfig.getStrings(SCHEMA_REGISTRY_URL_CONFIG, ",");
boolean useApicurio = true;
if (avroConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG)) {
schemaRegistryUrls = avroConfig.getStrings(APICURIO_SCHEMA_REGISTRY_URL_CONFIG, ",");
}
else if (avroConfig.hasKey(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG)) {
schemaRegistryUrls = avroConfig.getStrings(CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG, ",");
useApicurio = false;
}
if (schemaRegistryUrls == null || schemaRegistryUrls.isEmpty()) {
throw new DataException("Need URL(s) for schema registry instances for CloudEvents");
throw new DataException("Need URL(s) for schema registry instances for CloudEvents when using Apache Avro");
}
if (avroConverter == null) {
avroConverter = Instantiator.getInstance(AVRO_CONVERTER_CLASS, null, null);
avroConverter = Instantiator.getInstance(useApicurio ? APICURIO_AVRO_CONVERTER_CLASS : CONFLUENT_AVRO_CONVERTER_CLASS, null, null);
LOGGER.info("Using Avro converter {}", avroConverter.getClass().getName());
avroConverter.configure(avroConfig.asMap(), false);
}
}
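
The selection logic above operates on the converter's own configuration: Kafka Connect hands configure() the properties with the value.converter. prefix already stripped, and the converter then takes the avro. subset, so whichever of apicurio.registry.url or schema.registry.url is present there decides which Avro converter class is instantiated reflectively. A minimal sketch of the two variants, assuming standard Connect prefix handling and placeholder URLs:

import java.util.HashMap;
import java.util.Map;

public class CloudEventsConverterConfigExample {

    public static void main(String[] args) {
        // Keys as seen by CloudEventsConverter.configure(), i.e. after Kafka Connect has
        // stripped the "value.converter." prefix from the connector configuration.
        Map<String, String> configs = new HashMap<>();
        configs.put("data.serializer.type", "avro");

        // Apicurio variant: the "avro." subset contains "apicurio.registry.url",
        // so io.apicurio.registry.utils.converter.AvroConverter is instantiated.
        configs.put("avro.apicurio.registry.url", "http://apicurio:8080/api");

        // Confluent variant instead: replace the line above with "avro.schema.registry.url",
        // which selects io.confluent.connect.avro.AvroConverter.
        // configs.put("avro.schema.registry.url", "http://schema-registry:8081");

        System.out.println(configs);
    }
}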


@@ -14,6 +14,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -48,7 +49,8 @@
*/
public class ApicurioRegistryTest {
private static final String DEBEZIUM_VERSION = "1.2.0.CR1";
// TODO: revert to upstream version
private static final String DEBEZIUM_VERSION = "1.2";
private static final String APICURIO_VERSION = "1.2.2.Final";
private static final Logger LOGGER = LoggerFactory.getLogger(ApicurioRegistryTest.class);
@@ -69,7 +71,7 @@ public class ApicurioRegistryTest {
public static ImageFromDockerfile apicurioDebeziumImage = new ImageFromDockerfile()
.withDockerfileFromBuilder(builder -> builder
.from("debezium/connect:" + DEBEZIUM_VERSION)
.from("gunnarmorling/debezium-connect:" + DEBEZIUM_VERSION)
.env("KAFKA_CONNECT_DEBEZIUM_DIR", "$KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres")
.env("APICURIO_VERSION", APICURIO_VERSION)
.run("cd $KAFKA_CONNECT_DEBEZIUM_DIR && curl https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/$APICURIO_VERSION/apicurio-registry-distro-connect-converter-$APICURIO_VERSION-converter.tar.gz | tar xzv")
@@ -155,6 +157,53 @@ public void shouldConvertToAvro() throws Exception {
}
}
@Test
public void shouldConvertToCloudEventWithDataAsAvro() throws Exception {
try (Connection connection = getConnection(postgresContainer);
Statement statement = connection.createStatement();
KafkaConsumer<String, String> consumer = getConsumerString(kafkaContainer)) {
statement.execute("drop schema if exists todo cascade");
statement.execute("create schema todo");
statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))");
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (3, 'Be Awesome')");
final String host = apicurioContainer.getContainerInfo().getConfig().getHostName();
final int port = apicurioContainer.getExposedPorts().get(0);
final String apicurioUrl = "http://" + host + ":" + port + "/api";
String id = "3";
// host, database, user etc. are obtained from the container
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver" + id)
.with("slot.name", "debezium_" + id)
.with("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.with("value.converter", "io.debezium.converters.CloudEventsConverter")
.with("value.converter.data.serializer.type", "avro")
.with("value.converter.avro.apicurio.registry.url", apicurioUrl)
.with("value.converter.avro.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy");
debeziumContainer.registerConnector("my-connector-cloudevents-avro", config);
consumer.subscribe(Arrays.asList("dbserver3.todo.todo"));
List<ConsumerRecord<String, String>> changeEvents = drain(consumer, 1);
assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(), "$.payload.id")).isEqualTo(3);
assertThat(JsonPath.<String> read(changeEvents.get(0).value(), "$.iodebeziumop")).isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvents.get(0).value(), "$.iodebeziumname")).isEqualTo("dbserver3");
assertThat(JsonPath.<String> read(changeEvents.get(0).value(), "$.datacontenttype")).isEqualTo("application/avro");
assertThat(JsonPath.<String> read(changeEvents.get(0).value(), "$.iodebeziumtable")).isEqualTo("todo");
// Verify magic byte of Avro messages
byte[] decodedBytes = Base64.getDecoder().decode(JsonPath.<String> read(changeEvents.get(0).value(), "$.data"));
assertThat(decodedBytes[0]).isZero();
consumer.unsubscribe();
}
}
private Connection getConnection(PostgreSQLContainer<?> postgresContainer) throws SQLException {
return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(),
postgresContainer.getPassword());