DBZ-4028 Add test cases for Connect REST extension

This commit is contained in:
Anisha Mohanty 2021-12-08 16:02:57 +05:30 committed by GitHub
parent 8f007e08a2
commit 1ccc674131
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 251 additions and 1 deletions

View File

@ -37,4 +37,4 @@ jobs:
restore-keys: |
${{ runner.os }}-maven-
- name: Build Connect Rest Extension
run: mvn clean install -B -pl debezium-connect-rest-extension -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
run: mvn clean install -B -pl debezium-connect-rest-extension -am -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120

View File

@ -15,6 +15,7 @@
<!-- check new release version at https://github.com/confluentinc/schema-registry/releases -->
<version.confluent.platform>6.0.2</version.confluent.platform>
<version.apicurio>2.0.2.Final</version.apicurio>
<rest-assured.version>4.3.3</rest-assured.version>
<!-- Connectors -->
<version.com.google.protobuf>3.8.0</version.com.google.protobuf>
@ -460,6 +461,11 @@
<artifactId>junit</artifactId>
<version>${version.junit}</version>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<version>${rest-assured.version}</version>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
@ -589,6 +595,11 @@
<artifactId>debezium-connector-cassandra</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Debezium test artifacts -->
<dependency>

View File

@ -24,6 +24,26 @@
<artifactId>connect-runtime</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,93 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kcrestextension;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests topic creation (which is enabled in Kafka version greater than 2.6.0) and transforms endpoints.
* Debezium Container with 1.7 image is used for the same.
*/
public class DebeziumResourceIT {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumResourceIT.class);
private static final String DEBEZIUM_VERSION = "1.7";
@BeforeEach
public void start() {
TestHelper.stopContainers();
TestHelper.setupDebeziumContainer(DEBEZIUM_VERSION);
}
@AfterEach
public void stop() {
TestHelper.stopContainers();
}
@Test
public void testTopicCreationEndpoint() {
TestHelper.startContainers();
given()
.port(TestHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(TestHelper.API_PREFIX + TestHelper.TOPIC_CREATION_ENDPOINT)
.then().log().all()
.statusCode(200)
.body(is("true"));
}
@Test
public void testTopicCreationEndpointWhenExplicitlyDisabled() {
TestHelper.withEnv("CONNECT_TOPIC_CREATION_ENABLE", "false");
TestHelper.startContainers();
given()
.port(TestHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(TestHelper.API_PREFIX + TestHelper.TOPIC_CREATION_ENDPOINT)
.then().log().all()
.statusCode(200)
.body(is("false"));
}
@Test
public void testTransformsEndpoint() {
TestHelper.startContainers();
given()
.port(TestHelper.getDebeziumContainer().getFirstMappedPort())
.when().get(TestHelper.API_PREFIX + TestHelper.TRANSFORMS_ENDPOINT)
.then().log().all()
.statusCode(200)
.body("transform.size()", is(33))
.body("transform",
containsInAnyOrder(List.of("io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"io.debezium.connector.mysql.transforms.ReadToInsertEvent", "io.debezium.transforms.ByLogicalTableRouter",
"io.debezium.transforms.ContentBasedRouter", "io.debezium.transforms.ExtractNewRecordState", "io.debezium.transforms.Filter",
"io.debezium.transforms.outbox.EventRouter", "io.debezium.transforms.tracing.ActivateTracingSpan",
"org.apache.kafka.connect.transforms.predicates.HasHeaderKey", "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "org.apache.kafka.connect.transforms.Cast$Key",
"org.apache.kafka.connect.transforms.Cast$Value", "org.apache.kafka.connect.transforms.ExtractField$Key",
"org.apache.kafka.connect.transforms.ExtractField$Value", "org.apache.kafka.connect.transforms.Filter",
"org.apache.kafka.connect.transforms.Flatten$Key", "org.apache.kafka.connect.transforms.Flatten$Value",
"org.apache.kafka.connect.transforms.HoistField$Key", "org.apache.kafka.connect.transforms.HoistField$Value",
"org.apache.kafka.connect.transforms.InsertField$Key",
"org.apache.kafka.connect.transforms.InsertField$Value", "org.apache.kafka.connect.transforms.MaskField$Key",
"org.apache.kafka.connect.transforms.MaskField$Value", "org.apache.kafka.connect.transforms.RegexRouter",
"org.apache.kafka.connect.transforms.ReplaceField$Key", "org.apache.kafka.connect.transforms.ReplaceField$Value",
"org.apache.kafka.connect.transforms.SetSchemaMetadata$Key", "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"org.apache.kafka.connect.transforms.TimestampConverter$Key", "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"org.apache.kafka.connect.transforms.TimestampRouter", "org.apache.kafka.connect.transforms.ValueToKey").toArray()));
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kcrestextension;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.is;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests topic creation endpoint which is disabled in Kafka version less than 2.6.0.
* Debezium Container with 1.2 image is used for the same.
*/
public class DebeziumResourceNoTopicCreationIT {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumResourceNoTopicCreationIT.class);
private static final String DEBEZIUM_VERSION = "1.2";
@BeforeEach
public void start() {
TestHelper.stopContainers();
TestHelper.setupDebeziumContainer(DEBEZIUM_VERSION);
TestHelper.startContainers();
}
@AfterEach
public void stop() {
TestHelper.stopContainers();
}
@Test
public void testTopicCreationEndpoint() {
given()
.port(TestHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(TestHelper.API_PREFIX + TestHelper.TOPIC_CREATION_ENDPOINT)
.then().log().all()
.statusCode(200)
.body(is("false"));
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kcrestextension;
import java.time.Duration;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
import io.debezium.testing.testcontainers.DebeziumContainer;
public class TestHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(TestHelper.class);
private static final String KAFKA_HOSTNAME = "kafka-dbz-ui";
public static final String API_PREFIX = "/debezium";
public static final String TRANSFORMS_ENDPOINT = "/transforms";
public static final String TOPIC_CREATION_ENDPOINT = "/topic-creation";
private static final Network NETWORK = Network.newNetwork();
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.5"))
.withNetworkAliases(KAFKA_HOSTNAME)
.withNetwork(NETWORK);
private static DebeziumContainer DEBEZIUM_CONTAINER;
public static DebeziumContainer getDebeziumContainer() {
return DEBEZIUM_CONTAINER;
}
public static void setupDebeziumContainer(String debeziumVersion) {
DEBEZIUM_CONTAINER = new DebeziumContainer(DockerImageName.parse("debezium/connect:" + debeziumVersion))
.withEnv("ENABLE_DEBEZIUM_SCRIPTING", "true")
.withEnv("CONNECT_REST_EXTENSION_CLASSES", "io.debezium.kcrestextension.DebeziumConnectRestExtension")
.withNetwork(NETWORK)
.withCopyFileToContainer(
MountableFile.forHostPath(
"target/debezium-connect-rest-extension-1.8.0-SNAPSHOT.jar"),
"/kafka/libs/debezium-kcd-rest-extension-1.8.0.jar")
.withKafka(KAFKA_CONTAINER.getNetwork(), KAFKA_HOSTNAME + ":9092")
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.withStartupTimeout(Duration.ofSeconds(90))
.dependsOn(KAFKA_CONTAINER);
}
public static void withEnv(String key, String value) {
DEBEZIUM_CONTAINER = DEBEZIUM_CONTAINER.withEnv(key, value);
}
public static void startContainers() {
Startables.deepStart(Stream.of(KAFKA_CONTAINER, DEBEZIUM_CONTAINER)).join();
}
public static void stopContainers() {
try {
if (DEBEZIUM_CONTAINER != null) {
DEBEZIUM_CONTAINER.stop();
}
if (KAFKA_CONTAINER != null) {
KAFKA_CONTAINER.stop();
}
}
catch (Exception ignored) {
}
}
}