From b48c14a66bc5e54753bb1435afcaf4a5161b3b92 Mon Sep 17 00:00:00 2001 From: PlugaruT Date: Wed, 15 Dec 2021 11:30:23 +0200 Subject: [PATCH] DBZ-4450 Basic implementation of change consumer --- .../debezium-server-pubsub-lite/pom.xml | 1 - .../pubsublite/PubSubLiteChangeConsumer.java | 160 +++++++++++++++++- 2 files changed, 159 insertions(+), 2 deletions(-) diff --git a/debezium-server/debezium-server-pubsub-lite/pom.xml b/debezium-server/debezium-server-pubsub-lite/pom.xml index c8bd089eb..82b678ff4 100644 --- a/debezium-server/debezium-server-pubsub-lite/pom.xml +++ b/debezium-server/debezium-server-pubsub-lite/pom.xml @@ -27,7 +27,6 @@ com.google.cloud google-cloud-pubsublite - 1.4.5 diff --git a/debezium-server/debezium-server-pubsub-lite/src/main/java/io/debezium/server/pubsublite/PubSubLiteChangeConsumer.java b/debezium-server/debezium-server-pubsub-lite/src/main/java/io/debezium/server/pubsublite/PubSubLiteChangeConsumer.java index c3733f010..be24d9c9e 100644 --- a/debezium-server/debezium-server-pubsub-lite/src/main/java/io/debezium/server/pubsublite/PubSubLiteChangeConsumer.java +++ b/debezium-server/debezium-server-pubsub-lite/src/main/java/io/debezium/server/pubsublite/PubSubLiteChangeConsumer.java @@ -1,4 +1,162 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ package io.debezium.server.pubsublite; -public class PubSubLiteChangeConsumer { +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.enterprise.context.Dependent; +import javax.enterprise.inject.Instance; +import javax.inject.Inject; +import javax.inject.Named; + +import com.google.cloud.pubsublite.CloudRegionOrZone; +import com.google.cloud.pubsublite.ProjectNumber; +import com.google.cloud.pubsublite.TopicName; +import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.protobuf.ByteString; +import com.google.cloud.pubsublite.cloudpubsub.Publisher; +import com.google.pubsub.v1.PubsubMessage; + +import io.debezium.DebeziumException; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.server.BaseChangeConsumer; +import io.debezium.server.CustomConsumerBuilder; + +/** + * Implementation of the consumer that delivers the messages into Google Pub/Sub Lite destination. + */ +@Named("pubsublite") +@Dependent +public class PubSubLiteChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { + + private static final Logger LOGGER = LoggerFactory.getLogger(PubSubLiteChangeConsumer.class); + + private static final String PROP_PREFIX = "debezium.sink.pubsublite."; + private static final String PROP_PROJECT_ID = PROP_PREFIX + "default.project.id"; + private static final String PROP_REGION = PROP_PREFIX + "default.region"; + + public static interface PublisherBuilder { + Publisher get(String topicName); + } + + private PublisherBuilder publisherBuilder; + private final Map publishers = new HashMap<>(); + + @ConfigProperty(name = PROP_PREFIX + "ordering.enabled", defaultValue = "true") + boolean orderingEnabled; + + @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") + String nullKey; + + @Inject + @CustomConsumerBuilder + Instance customPublisherBuilder; + + @PostConstruct + void connect() { + final Config config = ConfigProvider.getConfig(); + long projectId = config.getValue(PROP_PROJECT_ID, long.class); + String region = config.getValue(PROP_REGION, String.class); + + + if (customPublisherBuilder.isResolvable()) { + publisherBuilder = customPublisherBuilder.get(); + LOGGER.info("Obtained custom configured PublisherBuilder '{}'", customPublisherBuilder); + return; + } + publisherBuilder = (t) -> { + TopicPath topicPath = TopicPath + .newBuilder() + .setName(TopicName.of(t)) + .setProject(ProjectNumber.of(projectId)) + .setLocation(CloudRegionOrZone.parse(region)) + .build(); + + PublisherSettings publisherSettings = PublisherSettings + .newBuilder() + .setTopicPath(topicPath) + .build(); + Publisher publisher = Publisher.create(publisherSettings); + publisher.startAsync().awaitRunning(); + return publisher; + }; + LOGGER.info("Using default PublisherBuilder '{}'", publisherBuilder); + } + + @PreDestroy + void close() { + publishers.values().forEach(publisher -> { + try { + publisher.stopAsync().awaitTerminated(); + } catch (Exception e) { + LOGGER.warn("Exception while closing publisher: {}", e); + } + }); + } + + + @Override + public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { + final List> deliveries = new ArrayList<>(); + for (ChangeEvent record : records) { + LOGGER.trace("Received event '{}'", record); + final String topicName = streamNameMapper.map(record.destination()); + + Publisher publisher = publishers.computeIfAbsent(topicName, (topic) -> publisherBuilder.get(topic)); + + final PubsubMessage.Builder pubsubMessage = PubsubMessage.newBuilder(); + + if (orderingEnabled) { + if (record.key() == null) { + pubsubMessage.setOrderingKey(nullKey); + } else if (record.key() instanceof String) { + pubsubMessage.setOrderingKey((String) record.key()); + } else if (record.key() instanceof byte[]) { + pubsubMessage.setOrderingKeyBytes(ByteString.copyFrom((byte[]) record.key())); + } + } + + if (record.value() instanceof String) { + pubsubMessage.setData(ByteString.copyFromUtf8((String) record.value())); + } else if (record.value() instanceof byte[]) { + pubsubMessage.setData(ByteString.copyFrom((byte[]) record.value())); + } + + deliveries.add(publisher.publish(pubsubMessage.build())); + committer.markProcessed(record); + }; + List messageIds; + try { + messageIds = ApiFutures.allAsList(deliveries).get(); + } catch (ExecutionException e) { + throw new DebeziumException(e); + } + LOGGER.trace("Sent messages with ids: {}", messageIds); + committer.markBatchFinished(); + } + + @Override + public boolean supportsTombstoneEvents() { + return false; + } }