DBZ-4491 add support for Google PubSub emulator

This commit is contained in:
Calin Laurentiu Ilie 2022-02-17 20:31:27 +01:00 committed by Jiri Pechanec
parent 8665866b59
commit c416c1aeed
3 changed files with 131 additions and 12 deletions

View File

@ -10,6 +10,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -31,9 +32,15 @@
import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Publisher.Builder;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PubsubMessage;
@ -44,6 +51,8 @@
import io.debezium.engine.DebeziumEngine.RecordCommitter; import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer; import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder; import io.debezium.server.CustomConsumerBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
/** /**
* Implementation of the consumer that delivers the messages into Google Pub/Sub destination. * Implementation of the consumer that delivers the messages into Google Pub/Sub destination.
@ -113,10 +122,17 @@ public static interface PublisherBuilder {
@ConfigProperty(name = PROP_PREFIX + "retry.rpc.timeout.multiplier", defaultValue = "2.0") @ConfigProperty(name = PROP_PREFIX + "retry.rpc.timeout.multiplier", defaultValue = "2.0")
Double rpcTimeoutMultiplier; Double rpcTimeoutMultiplier;
@ConfigProperty(name = PROP_PREFIX + "address", defaultValue = "")
Optional<String> address;
@Inject @Inject
@CustomConsumerBuilder @CustomConsumerBuilder
Instance<PublisherBuilder> customPublisherBuilder; Instance<PublisherBuilder> customPublisherBuilder;
private ManagedChannel channel;
private TransportChannelProvider channelProvider;
private CredentialsProvider credentialsProvider;
@PostConstruct @PostConstruct
void connect() { void connect() {
@ -141,10 +157,20 @@ void connect() {
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build()); .build());
} }
if (address.isPresent()) {
String hostport = address.get();
channel = ManagedChannelBuilder
.forTarget(hostport)
.usePlaintext()
.build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
credentialsProvider = NoCredentialsProvider.create();
}
publisherBuilder = (t) -> { publisherBuilder = (t) -> {
try { try {
return Publisher.newBuilder(t) Builder builder = Publisher.newBuilder(t)
.setEnableMessageOrdering(orderingEnabled) .setEnableMessageOrdering(orderingEnabled)
.setBatchingSettings(batchingSettings.build()) .setBatchingSettings(batchingSettings.build())
.setRetrySettings( .setRetrySettings(
@ -156,8 +182,13 @@ void connect() {
.setMaxRetryDelay(Duration.ofMillis(maxRetryDelay)) .setMaxRetryDelay(Duration.ofMillis(maxRetryDelay))
.setInitialRpcTimeout(Duration.ofMillis(initialRpcTimeout)) .setInitialRpcTimeout(Duration.ofMillis(initialRpcTimeout))
.setRpcTimeoutMultiplier(rpcTimeoutMultiplier) .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
.build()) .build());
.build();
if (address.isPresent()) {
builder.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider);
}
return builder.build();
} }
catch (IOException e) { catch (IOException e) {
throw new DebeziumException(e); throw new DebeziumException(e);
@ -177,6 +208,10 @@ void close() {
LOGGER.warn("Exception while closing publisher: {}", e); LOGGER.warn("Exception while closing publisher: {}", e);
} }
}); });
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
} }
@Override @Override

View File

@ -19,14 +19,24 @@
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicName;
import io.debezium.server.DebeziumServer; import io.debezium.server.DebeziumServer;
@ -34,12 +44,15 @@
import io.debezium.server.events.ConnectorCompletedEvent; import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent; import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Strings;
import io.debezium.util.Testing; import io.debezium.util.Testing;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest; import io.quarkus.test.junit.QuarkusTest;
/** /**
* Integration test that verifies basic reading from PostgreSQL database and writing to a Google Cloud PubSub stream. * Integration test that verifies basic reading from PostgreSQL database and writing to a Google Cloud PubSub stream running on a Google PubSub Emulator
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */
@ -51,14 +64,17 @@ public class PubSubIT {
// The topic of this name must exist and be empty // The topic of this name must exist and be empty
private static final String STREAM_NAME = "testc.inventory.customers"; private static final String STREAM_NAME = "testc.inventory.customers";
private static final String SUBSCRIPTION_NAME = "testsubs"; private static final String SUBSCRIPTION_NAME = "testsubs";
protected static Subscriber subscriber; protected static Subscriber subscriber;
private static ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(ServiceOptions.getDefaultProjectId(), SUBSCRIPTION_NAME); private static ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(ServiceOptions.getDefaultProjectId(), SUBSCRIPTION_NAME);
private static TopicName topicName = TopicName.of(ServiceOptions.getDefaultProjectId(), STREAM_NAME);
{ {
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(PubSubTestConfigSource.OFFSET_STORE_PATH); Testing.Files.createTestingFile(PubSubTestConfigSource.OFFSET_STORE_PATH);
} }
private static ManagedChannel channel;
private static TransportChannelProvider channelProvider;
private static CredentialsProvider credentialsProvider;
@AfterAll @AfterAll
static void stop() throws IOException { static void stop() throws IOException {
@ -66,9 +82,16 @@ static void stop() throws IOException {
subscriber.stopAsync(); subscriber.stopAsync();
subscriber.awaitTerminated(); subscriber.awaitTerminated();
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { try (SubscriptionAdminClient subscriptionAdminClient = createSubscriptionAdminClient()) {
subscriptionAdminClient.deleteSubscription(subscriptionName); subscriptionAdminClient.deleteSubscription(subscriptionName);
} }
try (TopicAdminClient topicAdminClient = createTopicAdminClient()){
topicAdminClient.deleteTopic(topicName);
}
}
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
} }
} }
@ -89,18 +112,76 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException { void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException {
Testing.Print.enable(); Testing.Print.enable();
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { trySetupEmulatorChannel();
final TopicName topic = TopicName.ofProjectTopicName(ServiceOptions.getDefaultProjectId(), STREAM_NAME);
try (TopicAdminClient topicAdminClient = createTopicAdminClient()) {
Topic topic = topicAdminClient.createTopic(topicName);
Testing.print("Created topic: " + topic.getName());
}
catch (AlreadyExistsException e) {
Testing.print("Topic already exists");
}
try (SubscriptionAdminClient subscriptionAdminClient = createSubscriptionAdminClient()) {
int ackDeadlineSeconds = 0; int ackDeadlineSeconds = 0;
subscriptionAdminClient.createSubscription(subscriptionName, topic, subscriptionAdminClient.createSubscription(subscriptionName, topicName,
PushConfig.newBuilder().build(), ackDeadlineSeconds); PushConfig.newBuilder().build(), ackDeadlineSeconds);
} }
catch (AlreadyExistsException e) {
Testing.print("Subscription already exists");
}
subscriber = Subscriber.newBuilder(subscriptionName, new TestMessageReceiver()).build(); subscriber = createSubscriber();
subscriber.startAsync().awaitRunning(); subscriber.startAsync().awaitRunning();
} }
void trySetupEmulatorChannel() {
if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) {
channel = ManagedChannelBuilder.forTarget(PubSubTestConfigSource.PUB_SUB_ADDRESS).usePlaintext().build();
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
credentialsProvider = NoCredentialsProvider.create();
Testing.print("Executing test towards pubsub emulator running at: " + PubSubTestConfigSource.PUB_SUB_ADDRESS);
}
}
Subscriber createSubscriber() {
if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) {
return Subscriber.newBuilder(subscriptionName, new TestMessageReceiver())
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
}
return Subscriber.newBuilder(subscriptionName, new TestMessageReceiver())
.build();
}
static SubscriptionAdminClient createSubscriptionAdminClient() throws IOException {
if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) {
return SubscriptionAdminClient.create(
SubscriptionAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
}
return SubscriptionAdminClient.create();
}
static TopicAdminClient createTopicAdminClient() throws IOException {
if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) {
return TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
}
return TopicAdminClient.create();
}
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception { void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) { if (!event.isSuccess()) {

View File

@ -14,10 +14,13 @@
public class PubSubTestConfigSource extends TestConfigSource { public class PubSubTestConfigSource extends TestConfigSource {
public static final String PUB_SUB_ADDRESS = System.getenv("PUBSUB_EMULATOR_HOST");
public PubSubTestConfigSource() { public PubSubTestConfigSource() {
Map<String, String> pubsubTest = new HashMap<>(); Map<String, String> pubsubTest = new HashMap<>();
pubsubTest.put("debezium.sink.type", "pubsub"); pubsubTest.put("debezium.sink.type", "pubsub");
pubsubTest.put("debezium.sink.pubsub.address", PUB_SUB_ADDRESS);
pubsubTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); pubsubTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
pubsubTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, pubsubTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString()); OFFSET_STORE_PATH.toAbsolutePath().toString());