diff --git a/debezium-server/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java b/debezium-server/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java index 64e8cedcc..0c4113493 100644 --- a/debezium-server/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java +++ b/debezium-server/debezium-server-pubsub/src/main/java/io/debezium/server/pubsub/PubSubChangeConsumer.java @@ -10,6 +10,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import javax.annotation.PostConstruct; @@ -31,9 +32,15 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; @@ -44,6 +51,8 @@ import io.debezium.engine.DebeziumEngine.RecordCommitter; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.CustomConsumerBuilder; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; /** * Implementation of the consumer that delivers the messages into Google Pub/Sub destination. @@ -113,10 +122,17 @@ public static interface PublisherBuilder { @ConfigProperty(name = PROP_PREFIX + "retry.rpc.timeout.multiplier", defaultValue = "2.0") Double rpcTimeoutMultiplier; + + @ConfigProperty(name = PROP_PREFIX + "address", defaultValue = "") + Optional address; @Inject @CustomConsumerBuilder Instance customPublisherBuilder; + + private ManagedChannel channel; + private TransportChannelProvider channelProvider; + private CredentialsProvider credentialsProvider; @PostConstruct void connect() { @@ -141,10 +157,20 @@ void connect() { .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) .build()); } + + if (address.isPresent()) { + String hostport = address.get(); + channel = ManagedChannelBuilder + .forTarget(hostport) + .usePlaintext() + .build(); + channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + credentialsProvider = NoCredentialsProvider.create(); + } publisherBuilder = (t) -> { try { - return Publisher.newBuilder(t) + Builder builder = Publisher.newBuilder(t) .setEnableMessageOrdering(orderingEnabled) .setBatchingSettings(batchingSettings.build()) .setRetrySettings( @@ -156,8 +182,13 @@ void connect() { .setMaxRetryDelay(Duration.ofMillis(maxRetryDelay)) .setInitialRpcTimeout(Duration.ofMillis(initialRpcTimeout)) .setRpcTimeoutMultiplier(rpcTimeoutMultiplier) - .build()) - .build(); + .build()); + + if (address.isPresent()) { + builder.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider); + } + + return builder.build(); } catch (IOException e) { throw new DebeziumException(e); @@ -177,6 +208,10 @@ void close() { LOGGER.warn("Exception while closing publisher: {}", e); } }); + + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); + } } @Override diff --git a/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubIT.java b/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubIT.java index 087ca700b..0ab73ca0b 100644 --- a/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubIT.java +++ b/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubIT.java @@ -19,14 +19,24 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; import io.debezium.server.DebeziumServer; @@ -34,12 +44,15 @@ import io.debezium.server.events.ConnectorCompletedEvent; import io.debezium.server.events.ConnectorStartedEvent; import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; +import io.debezium.util.Strings; import io.debezium.util.Testing; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; /** - * Integration test that verifies basic reading from PostgreSQL database and writing to a Google Cloud PubSub stream. + * Integration test that verifies basic reading from PostgreSQL database and writing to a Google Cloud PubSub stream running on a Google PubSub Emulator * * @author Jiri Pechanec */ @@ -51,14 +64,17 @@ public class PubSubIT { // The topic of this name must exist and be empty private static final String STREAM_NAME = "testc.inventory.customers"; private static final String SUBSCRIPTION_NAME = "testsubs"; - protected static Subscriber subscriber; private static ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(ServiceOptions.getDefaultProjectId(), SUBSCRIPTION_NAME); - + private static TopicName topicName = TopicName.of(ServiceOptions.getDefaultProjectId(), STREAM_NAME); { Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH); Testing.Files.createTestingFile(PubSubTestConfigSource.OFFSET_STORE_PATH); } + + private static ManagedChannel channel; + private static TransportChannelProvider channelProvider; + private static CredentialsProvider credentialsProvider; @AfterAll static void stop() throws IOException { @@ -66,9 +82,16 @@ static void stop() throws IOException { subscriber.stopAsync(); subscriber.awaitTerminated(); - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + try (SubscriptionAdminClient subscriptionAdminClient = createSubscriptionAdminClient()) { subscriptionAdminClient.deleteSubscription(subscriptionName); } + + try (TopicAdminClient topicAdminClient = createTopicAdminClient()){ + topicAdminClient.deleteTopic(topicName); + } + } + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); } } @@ -89,18 +112,76 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException { Testing.Print.enable(); - - try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { - final TopicName topic = TopicName.ofProjectTopicName(ServiceOptions.getDefaultProjectId(), STREAM_NAME); + + trySetupEmulatorChannel(); + + try (TopicAdminClient topicAdminClient = createTopicAdminClient()) { + Topic topic = topicAdminClient.createTopic(topicName); + Testing.print("Created topic: " + topic.getName()); + } + catch (AlreadyExistsException e) { + Testing.print("Topic already exists"); + } + + try (SubscriptionAdminClient subscriptionAdminClient = createSubscriptionAdminClient()) { int ackDeadlineSeconds = 0; - subscriptionAdminClient.createSubscription(subscriptionName, topic, + subscriptionAdminClient.createSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), ackDeadlineSeconds); } + catch (AlreadyExistsException e) { + Testing.print("Subscription already exists"); + } - subscriber = Subscriber.newBuilder(subscriptionName, new TestMessageReceiver()).build(); + subscriber = createSubscriber(); subscriber.startAsync().awaitRunning(); } + + + void trySetupEmulatorChannel() { + if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) { + channel = ManagedChannelBuilder.forTarget(PubSubTestConfigSource.PUB_SUB_ADDRESS).usePlaintext().build(); + channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + credentialsProvider = NoCredentialsProvider.create(); + Testing.print("Executing test towards pubsub emulator running at: " + PubSubTestConfigSource.PUB_SUB_ADDRESS); + } + } + + Subscriber createSubscriber() { + if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) { + return Subscriber.newBuilder(subscriptionName, new TestMessageReceiver()) + .setChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build(); + } + return Subscriber.newBuilder(subscriptionName, new TestMessageReceiver()) + .build(); + } + + + static SubscriptionAdminClient createSubscriptionAdminClient() throws IOException { + if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) { + return SubscriptionAdminClient.create( + SubscriptionAdminSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build()); + } + return SubscriptionAdminClient.create(); + } + + static TopicAdminClient createTopicAdminClient() throws IOException { + if (!Strings.isNullOrEmpty(PubSubTestConfigSource.PUB_SUB_ADDRESS)) { + return TopicAdminClient.create( + TopicAdminSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .build()); + } + return TopicAdminClient.create(); + } + + void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception { if (!event.isSuccess()) { diff --git a/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubTestConfigSource.java b/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubTestConfigSource.java index 87a620d40..57c7dc3ac 100644 --- a/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubTestConfigSource.java +++ b/debezium-server/debezium-server-pubsub/src/test/java/io/debezium/server/pubsub/PubSubTestConfigSource.java @@ -14,10 +14,13 @@ public class PubSubTestConfigSource extends TestConfigSource { + public static final String PUB_SUB_ADDRESS = System.getenv("PUBSUB_EMULATOR_HOST"); + public PubSubTestConfigSource() { Map pubsubTest = new HashMap<>(); pubsubTest.put("debezium.sink.type", "pubsub"); + pubsubTest.put("debezium.sink.pubsub.address", PUB_SUB_ADDRESS); pubsubTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); pubsubTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());