DBZ-4395 Add connector specific Debezium Connect REST Extension/s and move logic from UI backend to the Debezium Connect REST Extension (part 2 of the re-architecturing/re-factoring)

DBZ-6761 Add "validate connection" endpoints in connector specific Connect REST extensions

closes to https://issues.redhat.com/browse/DBZ-4395
closes to https://issues.redhat.com/browse/DBZ-6761
This commit is contained in:
rkerner 2023-08-29 19:53:31 +02:00 committed by Jiri Pechanec
parent 5292096903
commit 6d3528d4fb
40 changed files with 1611 additions and 43 deletions

View File

@ -134,26 +134,6 @@ jobs:
files: |
debezium-testing/**
- name: Get modified files (Debezium UI)
id: changed-files-debezium-ui
uses: tj-actions/changed-files@v39.1.0
with:
files: |
debezium-core/**
debezium-schema-generator/**
debezium-connect-rest-extension/**
debezium-connector-mongodb/**
debezium-connector-mysql/**
debezium-connector-postgres/**
debezium-connector-sqlserver/**
debezium-connector-oracle/**
debezium-storage/**
support/checkstyle/**
debezium-parent/pom.xml
debezium-bom/pom.xml
pom.xml
.github/workflows/debezium-workflow.yml
- name: Get modified files (MySQL DDL parser)
id: changed-files-mysql-ddl-parser
uses: tj-actions/changed-files@v39.1.0
@ -1042,10 +1022,3 @@ jobs:
restore-keys: |
maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
- name: Build Debezium UI
working-directory: debezium-ui
run: >
./mvnw clean install -am -pl backend -B -U
-Ddebezium.test.records.waittime=20
-Dformat.formatter.goal=validate
-Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn

View File

@ -32,6 +32,7 @@
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import io.debezium.DebeziumException;
import io.debezium.kcrestextension.entities.PredicateDefinition;
import io.debezium.kcrestextension.entities.TransformDefinition;
import io.debezium.metadata.ConnectorDescriptor;
@ -151,14 +152,14 @@ private synchronized Herder getHerder() {
herderField = this.connectClusterState.getClass().getDeclaredField("herder");
}
catch (NoSuchFieldException e) {
throw new RuntimeException(e);
throw new DebeziumException(e);
}
herderField.setAccessible(true);
try {
this.herder = (Herder) herderField.get(this.connectClusterState);
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
throw new DebeziumException(e);
}
}
return this.herder;

View File

@ -39,6 +39,7 @@ public class DebeziumResourceIT {
"io.debezium.transforms.Filter",
"io.debezium.transforms.HeaderToValue",
"io.debezium.transforms.SchemaChangeEventFilter",
"io.debezium.transforms.TimezoneConverter",
"io.debezium.transforms.outbox.EventRouter",
"io.debezium.transforms.partitions.ComputePartition",
"io.debezium.transforms.partitions.PartitionRouting",

View File

@ -103,6 +103,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<!--
@ -227,6 +232,9 @@
<goal>generate-api-spec</goal>
</goals>
<phase>prepare-package</phase>
<configuration>
<outputDirectory>${project.build.outputDirectory}/META-INF/resources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
@ -302,6 +310,16 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.failsafe.plugin}</version>
<configuration>
<systemPropertyVariables>
<isAssemblyProfileActive>true</isAssemblyProfileActive>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>

View File

@ -216,17 +216,15 @@ public Config validate(Map<String, String> connectorConfigs) {
if (userValue.errorMessages().isEmpty()
&& passwordValue.errorMessages().isEmpty()
&& connectionStringValue.errorMessages().isEmpty()) {
// Try to connect to the database ...
ConnectionContext connContext = new ConnectionContext(config);
try (MongoClient client = connContext.connect()) {
client.listDatabaseNames();
client.listDatabaseNames().first(); // only when we try to fetch results a connection gets established
}
catch (MongoException e) {
connectionStringValue.addErrorMessage("Unable to connect: " + e.getMessage());
}
}
return new Config(new ArrayList<>(results.values()));
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.rest;
import java.util.Map;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:
*
* `rest.extension.classes=io.debezium.connector.mongodb.rest.DebeziumMongoDbConnectRestExtension`
*
*/
public class DebeziumMongoDbConnectRestExtension implements ConnectRestExtension {
private Map<String, ?> config;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
restPluginContext.configurable().register(new DebeziumMongoDbConnectorResource());
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
this.config = configs;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.rest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.SchemaResource;
/**
* A JAX-RS Resource class defining endpoints of the Debezium MySQL Connect REST Extension
*
*/
@Path(DebeziumMongoDbConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumMongoDbConnectorResource implements SchemaResource, ConnectionValidationResource {
public static final String BASE_PATH = "/debezium/mongodb";
public static final String VERSION_ENDPOINT = "/version";
@GET
@Path(VERSION_ENDPOINT)
public String getConnectorVersion() {
return Module.version();
}
@Override
public Connector getConnector() {
return new MongoDbConnector();
}
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/mongodb.json";
}
}

View File

@ -162,6 +162,14 @@ public static ArrayEncoding parse(String value, String defaultValue) {
private SmtManager<R> smtManager;
private final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER,
ExtractNewRecordStateConfigDefinition.HANDLE_DELETES,
ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES,
ExtractNewRecordStateConfigDefinition.ADD_HEADERS,
ExtractNewRecordStateConfigDefinition.ADD_HEADERS_PREFIX,
ExtractNewRecordStateConfigDefinition.ADD_FIELDS,
ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX);
@Override
public R apply(R record) {
if (!smtManager.isValidKey(record)) {
@ -367,10 +375,7 @@ private Headers makeHeaders(List<FieldReference> additionalHeaders, Struct origi
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null,
ARRAY_ENCODING,
FLATTEN_STRUCT,
DELIMITER);
Field.group(config, null, configFields.asArray());
return config;
}
@ -383,12 +388,6 @@ public void configure(final Map<String, ?> map) {
final Configuration config = Configuration.from(map);
smtManager = new SmtManager<>(config);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER,
ExtractNewRecordStateConfigDefinition.HANDLE_DELETES,
ExtractNewRecordStateConfigDefinition.DROP_TOMBSTONES,
ExtractNewRecordStateConfigDefinition.ADD_HEADERS,
ExtractNewRecordStateConfigDefinition.ADD_FIELDS);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new DebeziumException("Unable to validate config.");
}

View File

@ -0,0 +1 @@
io.debezium.connector.mongodb.rest.DebeziumMongoDbConnectRestExtension

View File

@ -17,7 +17,7 @@ public final class MongoDbDatabaseProvider {
public static final String MONGO_SHARD_REPLICA_SIZE = "mongodb.shard.replica.size";
public static final String MONGO_DOCKER_DESKTOP_PORT_PROPERTY = "mongodb.docker.desktop.ports";
// Should be aligned with definition in pom.xm
// Should be aligned with definition in pom.xml
public static final String MONGO_DOCKER_DESKTOP_PORT_DEFAULT = "27017:27117";
private static MongoDbReplicaSet.Builder dockerReplicaSetBuilder() {

View File

@ -0,0 +1,124 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import java.util.Locale;
import java.util.Map;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.utility.MountableFile;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.util.DockerUtils;
import io.restassured.http.ContentType;
public class DebeziumMongoDbConnectorResourceIT {
public static final MountableFile INIT_SCRIPT_RESOURCE = MountableFile.forClasspathResource("/initialize-mongo-single.js");
public static final String INIT_SCRIPT_PATH = "/docker-entrypoint-initdb.d/initialize-mongo-single.js";
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumMongoDbConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMongoDbConnectRestExtension.class.getName());
DockerUtils.enableFakeDnsIfRequired();
RestExtensionTestInfrastructure.startContainers(DATABASE.MONGODB);
RestExtensionTestInfrastructure.getMongoDbContainer().execMongoScript(INIT_SCRIPT_RESOURCE, INIT_SCRIPT_PATH);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
DockerUtils.disableFakeDns();
}
@Test
public void testValidConnection() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0));
}
@Test
public void testInvalidIpConnection() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1)
.with(MongoDbConnectorConfig.CONNECTION_STRING.name(), "mongodb://192.168.222.222:27017/?replicaSet=zz666");
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo(MongoDbConnectorConfig.CONNECTION_STRING.name()))
.body("message", startsWith("Unable to connect: "));
}
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + MongoDbConnector.class.getName() + "\"}")
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(2))
.body("validationResults", hasItems(
Map.of("property", MongoDbConnectorConfig.CONNECTION_STRING.name(), "message",
"The 'mongodb.connection.string' value is invalid: Missing connection string"),
Map.of("property", MongoDbConnectorConfig.TOPIC_PREFIX.name(), "message", "The 'topic.prefix' value is invalid: A value is required")));
}
public static ConnectorConfiguration getMongoDbConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forMongoDbReplicaSet(RestExtensionTestInfrastructure.getMongoDbContainer())
.with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS.name(), 10000)
.with(MongoDbConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)
.with(MongoDbConnectorConfig.USER.name(), "debezium")
.with(MongoDbConnectorConfig.PASSWORD.name(), "dbz")
.with(MongoDbConnectorConfig.TOPIC_PREFIX.name(), "mongo" + id);
if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
config.with(options[i], options[i + 1]);
}
}
return config;
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
public class DebeziumMongoDbConnectorResourceNoDatabaseIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumMongoDbConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMongoDbConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VERSION_ENDPOINT)
.then().log().all()
.statusCode(200)
.body(is(Module.version()));
}
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()
.statusCode(200)
.body("components.schemas.size()", is(1))
.rootPath("components.schemas.values()[0]")
.body("title", is("Debezium MongoDB Connector"))
.body("properties.size()", is(47))
.body("x-connector-id", is("mongodb"))
.body("x-version", is(Module.version()))
.body("x-className", is(MongoDbConnector.class.getName()))
.body("properties", hasKey("topic.prefix"))
.body("properties", hasKey("mongodb.connection.string"))
.body("properties", hasKey("snapshot.mode"));
}
}

View File

@ -0,0 +1,29 @@
ARG BASE_IMAGE
ARG DEBEZIUM_VERSION
FROM ${BASE_IMAGE}
ARG DEBEZIUM_VERSION
ENV CONNECTOR="mongodb"
RUN echo "Installing Debezium connectors version: ${DEBEZIUM_VERSION}" ; \
MAVEN_REPOSITORY="https://repo1.maven.org/maven2/io/debezium" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
MAVEN_REPOSITORY="https://oss.sonatype.org/content/repositories/snapshots/io/debezium" ; \
fi ; \
CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
for PACKAGE in {scripting,}; do \
local CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
CONNECTOR_VERSION=$(curl --silent -fSL "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/maven-metadata.xml" | awk -F'<[^>]+>' '/<extension>tar.gz<\/extension>/ {getline; print $2; exit}'); \
fi ; \
echo "Downloading and installing debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." ; \
curl --silent -fSL -o /tmp/package.tar.gz "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz" && \
echo "Extracting debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." && \
tar -xzf /tmp/package.tar.gz -C $EXTERNAL_LIBS_DIR && \
echo "Successfully installed debezium-${PACKAGE}-${CONNECTOR_VERSION}!" ; \
rm -f /tmp/package.tar.gz ; \
done
COPY --chown=kafka:kafka debezium-connector-${CONNECTOR}-${DEBEZIUM_VERSION}-plugin.tar.gz /tmp/plugin.tar.gz
RUN tar -xvzf /tmp/plugin.tar.gz -C ${KAFKA_CONNECT_PLUGINS_DIR}/ ; rm -f /tmp/plugin.tar.gz

View File

@ -0,0 +1,68 @@
let error = true
let res = [
db = db.getSiblingDB('inventory'),
db.customers.drop(),
db.orders.drop(),
db.products.drop(),
db = db.getSiblingDB('admin'),
db.runCommand({
createRole: "listDatabases",
privileges: [
{ resource: { cluster : true }, actions: ["listDatabases"]}
],
roles: []
}),
db.createUser({
user: 'debezium',
pwd: 'dbz',
roles: [
{ role: "readWrite", db: "inventory" },
{ role: "read", db: "local" },
{ role: "listDatabases", db: "admin" },
{ role: "read", db: "config" },
{ role: "read", db: "admin" }
]
}),
db = db.getSiblingDB('inventory'),
db.products.insert([
{ _id : NumberLong("101"), name : 'scooter', description: 'Small 2-wheel scooter', weight : 3.14, quantity : NumberInt("3") },
{ _id : NumberLong("102"), name : 'car battery', description: '12V car battery', weight : 8.1, quantity : NumberInt("8") },
{ _id : NumberLong("103"), name : '12-pack drill bits', description: '12-pack of drill bits with sizes ranging from #40 to #3', weight : 0.8, quantity : NumberInt("18") },
{ _id : NumberLong("104"), name : 'hammer', description: "12oz carpenter's hammer", weight : 0.75, quantity : NumberInt("4") },
{ _id : NumberLong("105"), name : 'hammer', description: "14oz carpenter's hammer", weight : 0.875, quantity : NumberInt("5") },
{ _id : NumberLong("106"), name : 'hammer', description: "16oz carpenter's hammer", weight : 1.0, quantity : NumberInt("0") },
{ _id : NumberLong("107"), name : 'rocks', description: 'box of assorted rocks', weight : 5.3, quantity : NumberInt("44") },
{ _id : NumberLong("108"), name : 'jacket', description: 'water resistent black wind breaker', weight : 0.1, quantity : NumberInt("2") },
{ _id : NumberLong("109"), name : 'spare tire', description: '24 inch spare tire', weight : 22.2, quantity : NumberInt("5") }
]),
db.customers.insert([
{ _id : NumberLong("1001"), first_name : 'Sally', last_name : 'Thomas', email : 'sally.thomas@acme.com' },
{ _id : NumberLong("1002"), first_name : 'George', last_name : 'Bailey', email : 'gbailey@foobar.com' },
{ _id : NumberLong("1003"), first_name : 'Edward', last_name : 'Walker', email : 'ed@walker.com' },
{ _id : NumberLong("1004"), first_name : 'Anne', last_name : 'Kretchmar', email : 'annek@noanswer.org' }
]),
db.orders.insert([
{ _id : NumberLong("10001"), order_date : new ISODate("2016-01-16T00:00:00Z"), purchaser_id : NumberLong("1001"), quantity : NumberInt("1"), product_id : NumberLong("102") },
{ _id : NumberLong("10002"), order_date : new ISODate("2016-01-17T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("105") },
{ _id : NumberLong("10003"), order_date : new ISODate("2016-02-19T00:00:00Z"), purchaser_id : NumberLong("1002"), quantity : NumberInt("2"), product_id : NumberLong("106") },
{ _id : NumberLong("10004"), order_date : new ISODate("2016-02-21T00:00:00Z"), purchaser_id : NumberLong("1003"), quantity : NumberInt("1"), product_id : NumberLong("107") }
]),
error = false
]
printjson(res)
if (error) {
print('Error, exiting!')
quit(1)
}

View File

@ -142,6 +142,11 @@
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<!-- Used for unit testing with Kafka -->
<dependency>
@ -566,6 +571,9 @@
<goal>generate-api-spec</goal>
</goals>
<phase>prepare-package</phase>
<configuration>
<outputDirectory>${project.build.outputDirectory}/META-INF/resources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
@ -674,6 +682,7 @@
<database.replica.port>${mysql.port}</database.replica.port>
<database.ssl.mode>disabled</database.ssl.mode>
<skipLongRunningTests>false</skipLongRunningTests>
<isAssemblyProfileActive>true</isAssemblyProfileActive>
</systemPropertyVariables>
<runOrder>${runOrder}</runOrder>
</configuration>

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.rest;
import java.util.Map;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:
*
* `rest.extension.classes=io.debezium.connector.mysql.rest.DebeziumMySqlConnectRestExtension`
*
*/
public class DebeziumMySqlConnectRestExtension implements ConnectRestExtension {
private Map<String, ?> config;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
restPluginContext.configurable().register(new DebeziumMySqlConnectorResource());
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
this.config = configs;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.rest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.SchemaResource;
/**
* A JAX-RS Resource class defining endpoints of the Debezium MySQL Connect REST Extension
*
*/
@Path(DebeziumMySqlConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumMySqlConnectorResource implements SchemaResource, ConnectionValidationResource {
public static final String BASE_PATH = "/debezium/mysql";
public static final String VERSION_ENDPOINT = "/version";
public static final String VALIDATE_PROPERTIES_ENDPOINT = "/validate/properties";
public static final String VALIDATE_CONNECTOR_ENDPOINT = "/validate/connector";
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/mysql.json";
}
@Override
public Connector getConnector() {
return new MySqlConnector();
}
@GET
@Path(VERSION_ENDPOINT)
public String getConnectorVersion() {
return Module.version();
}
}

View File

@ -0,0 +1 @@
io.debezium.connector.mysql.rest.DebeziumMySqlConnectRestExtension

View File

@ -0,0 +1,117 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import java.util.Locale;
import java.util.Map;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.restassured.http.ContentType;
public class DebeziumMySqlConnectorResourceIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumMySqlConnectorResourceIT tests when assembly profile is not active!", System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMySqlConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.MYSQL);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
}
@Test
public void testValidConnection() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0));
}
@Test
public void testInvalidHostnameConnection() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1).with(MySqlConnectorConfig.HOSTNAME.name(), "zzzzzzzzzz");
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo(MySqlConnectorConfig.HOSTNAME.name()))
.body("message", startsWith("Unable to connect: Communications link failure"));
}
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + MySqlConnector.class.getName() + "\"}")
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(4))
.body("validationResults",
hasItems(
Map.of("property", MySqlConnectorConfig.USER.name(), "message", "The 'database.user' value is invalid: A value is required"),
Map.of("property", MySqlConnectorConfig.TOPIC_PREFIX.name(), "message", "The 'topic.prefix' value is invalid: A value is required"),
Map.of("property", MySqlConnectorConfig.SERVER_ID.name(), "message", "The 'database.server.id' value is invalid: A value is required"),
Map.of("property", MySqlConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required")));
}
public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getMySqlContainer())
.with(MySqlConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)
.with(MySqlConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.inventory")
.with(MySqlConnectorConfig.SERVER_ID.name(), Long.valueOf(5555 + id - 1));
if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
config.with(options[i], options[i + 1]);
}
}
return config;
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
public class DebeziumMySqlConnectorResourceNoDatabaseIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumMySqlConnectorResourceIT tests when assembly profile is not active!", System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMySqlConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VERSION_ENDPOINT)
.then().log().all()
.statusCode(200)
.body(is(Module.version()));
}
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()
.statusCode(200)
.body("components.schemas.size()", is(1))
.rootPath("components.schemas.values()[0]")
.body("title", is("Debezium MySQL Connector"))
.body("properties.size()", is(82))
.body("x-connector-id", is("mysql"))
.body("x-version", is(Module.version()))
.body("x-className", is(MySqlConnector.class.getName()))
.body("properties", hasKey("topic.prefix"))
.body("properties", hasKey("database.server.id"))
.body("properties", hasKey("snapshot.mode"));
}
}

View File

@ -0,0 +1,29 @@
ARG BASE_IMAGE
ARG DEBEZIUM_VERSION
FROM ${BASE_IMAGE}
ARG DEBEZIUM_VERSION
ENV CONNECTOR="mysql"
RUN echo "Installing Debezium connectors version: ${DEBEZIUM_VERSION}" ; \
MAVEN_REPOSITORY="https://repo1.maven.org/maven2/io/debezium" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
MAVEN_REPOSITORY="https://oss.sonatype.org/content/repositories/snapshots/io/debezium" ; \
fi ; \
CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
for PACKAGE in {scripting,}; do \
local CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
CONNECTOR_VERSION=$(curl --silent -fSL "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/maven-metadata.xml" | awk -F'<[^>]+>' '/<extension>tar.gz<\/extension>/ {getline; print $2; exit}'); \
fi ; \
echo "Downloading and installing debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." ; \
curl --silent -fSL -o /tmp/package.tar.gz "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz" && \
echo "Extracting debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." && \
tar -xzf /tmp/package.tar.gz -C $EXTERNAL_LIBS_DIR && \
echo "Successfully installed debezium-${PACKAGE}-${CONNECTOR_VERSION}!" ; \
rm -f /tmp/package.tar.gz ; \
done
COPY --chown=kafka:kafka debezium-connector-${CONNECTOR}-${DEBEZIUM_VERSION}-plugin.tar.gz /tmp/plugin.tar.gz
RUN tar -xvzf /tmp/plugin.tar.gz -C ${KAFKA_CONNECT_PLUGINS_DIR}/ ; rm -f /tmp/plugin.tar.gz

View File

@ -287,6 +287,9 @@
<goal>generate-api-spec</goal>
</goals>
<phase>prepare-package</phase>
<configuration>
<outputDirectory>${project.build.outputDirectory}/META-INF/resources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.rest;
import java.util.Map;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:
*
* `rest.extension.classes=io.debezium.connector.oracle.rest.DebeziumOracleConnectRestExtension`
*
*/
public class DebeziumOracleConnectRestExtension implements ConnectRestExtension {
private Map<String, ?> config;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
restPluginContext.configurable().register(new DebeziumOracleConnectorResource());
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
this.config = configs;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.rest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.connector.oracle.Module;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.SchemaResource;
/**
* A JAX-RS Resource class defining endpoints of the Debezium Oracle Connect REST Extension
*
*/
@Path(DebeziumOracleConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumOracleConnectorResource implements SchemaResource, ConnectionValidationResource {
public static final String BASE_PATH = "/debezium/oracle";
public static final String VERSION_ENDPOINT = "/version";
public static final String VALIDATE_PROPERTIES_ENDPOINT = "/validate/properties";
public static final String VALIDATE_CONNECTOR_ENDPOINT = "/validate/connector";
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/oracle.json";
}
@Override
public Connector getConnector() {
return new OracleConnector();
}
@GET
@Path(VERSION_ENDPOINT)
public String getConnectorVersion() {
return Module.version();
}
}

View File

@ -0,0 +1 @@
io.debezium.connector.oracle.rest.DebeziumOracleConnectRestExtension

View File

@ -145,6 +145,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<!-- Used for unit testing with Kafka -->
<dependency>
@ -310,6 +315,9 @@
<goal>generate-api-spec</goal>
</goals>
<phase>prepare-package</phase>
<configuration>
<outputDirectory>${project.build.outputDirectory}/META-INF/resources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
@ -376,6 +384,16 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.failsafe.plugin}</version>
<configuration>
<systemPropertyVariables>
<isAssemblyProfileActive>true</isAssemblyProfileActive>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.rest;
import java.util.Map;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:
*
* `rest.extension.classes=io.debezium.connector.postgresql.rest.DebeziumPostgresConnectRestExtension`
*
*/
public class DebeziumPostgresConnectRestExtension implements ConnectRestExtension {
private Map<String, ?> config;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
restPluginContext.configurable().register(new DebeziumPostgresConnectorResource());
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
this.config = configs;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.rest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.connector.postgresql.Module;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.SchemaResource;
/**
* A JAX-RS Resource class defining endpoints of the Debezium Postgres Connect REST Extension
*
*/
@Path(DebeziumPostgresConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumPostgresConnectorResource implements SchemaResource, ConnectionValidationResource {
public static final String BASE_PATH = "/debezium/postgres";
public static final String VERSION_ENDPOINT = "/version";
@GET
@Path(VERSION_ENDPOINT)
public String getConnectorVersion() {
return Module.version();
}
@Override
public Connector getConnector() {
return new PostgresConnector();
}
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/postgres.json";
}
}

View File

@ -0,0 +1 @@
io.debezium.connector.postgresql.rest.DebeziumPostgresConnectRestExtension

View File

@ -0,0 +1,115 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import java.util.Locale;
import java.util.Map;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.postgresql.Module;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.restassured.http.ContentType;
public class DebeziumPostgresConnectorResourceIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumPostgresConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumPostgresConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.POSTGRES);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
}
@Test
public void testValidConnection() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0));
}
@Test
public void testInvalidHostnameConnection() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1).with(PostgresConnectorConfig.HOSTNAME.name(), "zzzzzzzzzz");
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", is(PostgresConnectorConfig.HOSTNAME.name()))
.body("message", is("Error while validating connector config: The connection attempt failed."));
}
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + PostgresConnector.class.getName() + "\"}")
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(4))
.body("validationResults",
hasItems(
Map.of("property", PostgresConnectorConfig.USER.name(), "message", "The 'database.user' value is invalid: A value is required"),
Map.of("property", PostgresConnectorConfig.DATABASE_NAME.name(), "message",
"The 'database.dbname' value is invalid: A value is required"),
Map.of("property", PostgresConnectorConfig.TOPIC_PREFIX.name(), "message", "The 'topic.prefix' value is invalid: A value is required"),
Map.of("property", PostgresConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required")));
}
private static ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getPostgresContainer())
.with(PostgresConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)
.with(PostgresConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(PostgresConnectorConfig.SLOT_NAME.name(), "debezium_" + id);
if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
config.with(options[i], options[i + 1]);
}
}
return config;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.postgresql.Module;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
public class DebeziumPostgresConnectorResourceNoDatabaseIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumPostgresConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumPostgresConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VERSION_ENDPOINT)
.then().log().all()
.statusCode(200)
.body(is(Module.version()));
}
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()
.statusCode(200)
.body("components.schemas.size()", is(1))
.rootPath("components.schemas.values()[0]")
.body("title", is("Debezium PostgreSQL Connector"))
.body("properties.size()", is(81))
.body("x-connector-id", is("postgres"))
.body("x-version", is(Module.version()))
.body("x-className", is(PostgresConnector.class.getName()))
.body("properties", hasKey("topic.prefix"))
.body("properties", hasKey("plugin.name"))
.body("properties", hasKey("slot.name"))
.body("properties", hasKey("snapshot.mode"));
}
}

View File

@ -0,0 +1,29 @@
ARG BASE_IMAGE
ARG DEBEZIUM_VERSION
FROM ${BASE_IMAGE}
ARG DEBEZIUM_VERSION
ENV CONNECTOR="postgres"
RUN echo "Installing Debezium connectors version: ${DEBEZIUM_VERSION}" ; \
MAVEN_REPOSITORY="https://repo1.maven.org/maven2/io/debezium" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
MAVEN_REPOSITORY="https://oss.sonatype.org/content/repositories/snapshots/io/debezium" ; \
fi ; \
CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
for PACKAGE in {scripting,}; do \
local CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
CONNECTOR_VERSION=$(curl --silent -fSL "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/maven-metadata.xml" | awk -F'<[^>]+>' '/<extension>tar.gz<\/extension>/ {getline; print $2; exit}'); \
fi ; \
echo "Downloading and installing debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." ; \
curl --silent -fSL -o /tmp/package.tar.gz "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz" && \
echo "Extracting debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." && \
tar -xzf /tmp/package.tar.gz -C $EXTERNAL_LIBS_DIR && \
echo "Successfully installed debezium-${PACKAGE}-${CONNECTOR_VERSION}!" ; \
rm -f /tmp/package.tar.gz ; \
done
COPY --chown=kafka:kafka debezium-connector-${CONNECTOR}-${DEBEZIUM_VERSION}-plugin.tar.gz /tmp/plugin.tar.gz
RUN tar -xvzf /tmp/plugin.tar.gz -C ${KAFKA_CONNECT_PLUGINS_DIR}/ ; rm -f /tmp/plugin.tar.gz

View File

@ -111,6 +111,26 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-launcher</artifactId>
</exclusion>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -251,6 +271,9 @@
<goal>generate-api-spec</goal>
</goals>
<phase>prepare-package</phase>
<configuration>
<outputDirectory>${project.build.outputDirectory}/META-INF/resources/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
@ -317,6 +340,16 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${version.failsafe.plugin}</version>
<configuration>
<systemPropertyVariables>
<isAssemblyProfileActive>true</isAssemblyProfileActive>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver.rest;
import java.util.Map;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:
*
* `rest.extension.classes=io.debezium.connector.sqlserver.rest.DebeziumSqlServerConnectRestExtension`
*
*/
public class DebeziumSqlServerConnectRestExtension implements ConnectRestExtension {
private Map<String, ?> config;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
restPluginContext.configurable().register(new DebeziumSqlServerConnectorResource());
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
this.config = configs;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver.rest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.connector.sqlserver.Module;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.SchemaResource;
/**
* A JAX-RS Resource class defining endpoints of the Debezium SQL Server Connect REST Extension
*
*/
@Path(DebeziumSqlServerConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumSqlServerConnectorResource implements SchemaResource, ConnectionValidationResource {
public static final String BASE_PATH = "/debezium/sqlserver";
public static final String VERSION_ENDPOINT = "/version";
@GET
@Path(VERSION_ENDPOINT)
public String getConnectorVersion() {
return Module.version();
}
@Override
public Connector getConnector() {
return new SqlServerConnector();
}
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/sqlserver.json";
}
}

View File

@ -0,0 +1 @@
io.debezium.connector.sqlserver.rest.DebeziumSqlServerConnectRestExtension

View File

@ -0,0 +1,124 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Map;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.sqlserver.Module;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.restassured.http.ContentType;
public class DebeziumSqlServerConnectorResourceIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumSqlServerConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() throws URISyntaxException {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumSqlServerConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.SQLSERVER);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
}
@Test
public void testValidConnection() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0));
}
@Test
public void testInvalidHostnameConnection() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1).with(SqlServerConnectorConfig.HOSTNAME.name(), "zzzzzzzzzz");
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", is(SqlServerConnectorConfig.HOSTNAME.name()))
.body("message", startsWith(
"Unable to connect. Check this and other connection properties. Error: The TCP/IP connection to the host zzzzzzzzzz, port 1433 has failed."));
}
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + SqlServerConnector.class.getName() + "\"}")
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("validationResults.size()", is(3))
.body("validationResults",
hasItems(
Map.of("property", SqlServerConnectorConfig.DATABASE_NAMES.name(), "message",
"The 'database.names' value is invalid: Cannot be empty"),
Map.of("property", SqlServerConnectorConfig.TOPIC_PREFIX.name(), "message", "The 'topic.prefix' value is invalid: A value is required"),
Map.of("property", SqlServerConnectorConfig.HOSTNAME.name(), "message",
"The 'database.hostname' value is invalid: A value is required")));
}
public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getSqlServerContainer())
.with(ConnectorConfiguration.USER, "sa")
.with(ConnectorConfiguration.PASSWORD, "Password!")
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.inventory")
.with(SqlServerConnectorConfig.DATABASE_NAMES.name(), "testDB,testDB2")
.with(SqlServerConnectorConfig.SNAPSHOT_MODE.name(), "initial")
.with(SqlServerConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with("database.encrypt", false);
if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
config.with(options[i], options[i + 1]);
}
}
return config;
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.sqlserver.Module;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
public class DebeziumSqlServerConnectorResourceNoDatabaseIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumSqlServerConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumSqlServerConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VERSION_ENDPOINT)
.then().log().all()
.statusCode(200)
.body(is(Module.version()));
}
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()
.statusCode(200)
.body("components.schemas.size()", is(1))
.rootPath("components.schemas.values()[0]")
.body("title", is("Debezium SQLServer Connector"))
.body("properties.size()", is(59))
.body("x-connector-id", is("sqlserver"))
.body("x-version", is(Module.version()))
.body("x-className", is(SqlServerConnector.class.getName()))
.body("properties", hasKey("topic.prefix"))
.body("properties", hasKey("database.instance"))
.body("properties", hasKey("snapshot.mode"));
}
}

View File

@ -0,0 +1,29 @@
ARG BASE_IMAGE
ARG DEBEZIUM_VERSION
FROM ${BASE_IMAGE}
ARG DEBEZIUM_VERSION
ENV CONNECTOR="sqlserver"
RUN echo "Installing Debezium connectors version: ${DEBEZIUM_VERSION}" ; \
MAVEN_REPOSITORY="https://repo1.maven.org/maven2/io/debezium" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
MAVEN_REPOSITORY="https://oss.sonatype.org/content/repositories/snapshots/io/debezium" ; \
fi ; \
CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
for PACKAGE in {scripting,}; do \
local CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \
if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \
CONNECTOR_VERSION=$(curl --silent -fSL "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/maven-metadata.xml" | awk -F'<[^>]+>' '/<extension>tar.gz<\/extension>/ {getline; print $2; exit}'); \
fi ; \
echo "Downloading and installing debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." ; \
curl --silent -fSL -o /tmp/package.tar.gz "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz" && \
echo "Extracting debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." && \
tar -xzf /tmp/package.tar.gz -C $EXTERNAL_LIBS_DIR && \
echo "Successfully installed debezium-${PACKAGE}-${CONNECTOR_VERSION}!" ; \
rm -f /tmp/package.tar.gz ; \
done
COPY --chown=kafka:kafka debezium-connector-${CONNECTOR}-${DEBEZIUM_VERSION}-plugin.tar.gz /tmp/plugin.tar.gz
RUN tar -xvzf /tmp/plugin.tar.gz -C ${KAFKA_CONNECT_PLUGINS_DIR}/ ; rm -f /tmp/plugin.tar.gz

View File

@ -0,0 +1,67 @@
CREATE DATABASE testDB
CREATE DATABASE testDB2
USE testDB2
-- Gives the SQL Server Agent time to start before applying CDC operations
-- If the Agent isn't running, a CDC operation will fail and the container won't start
WAITFOR DELAY '00:00:30'
EXEC sys.sp_cdc_enable_db
CREATE SCHEMA inventory
CREATE TABLE inventory.products (id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT)
INSERT INTO inventory.products(name,description,weight) VALUES ('scooter','Small 2-wheel scooter',3.14)
INSERT INTO inventory.products(name,description,weight) VALUES ('car battery','12V car battery',8.1)
INSERT INTO inventory.products(name,description,weight) VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8)
INSERT INTO inventory.products(name,description,weight) VALUES ('hammer','12oz carpenter''s hammer',0.75)
INSERT INTO inventory.products(name,description,weight) VALUES ('hammer','14oz carpenter''s hammer',0.875)
INSERT INTO inventory.products(name,description,weight) VALUES ('hammer','16oz carpenter''s hammer',1.0)
INSERT INTO inventory.products(name,description,weight) VALUES ('rocks','box of assorted rocks',5.3)
INSERT INTO inventory.products(name,description,weight) VALUES ('jacket','water resistent black wind breaker',0.1)
INSERT INTO inventory.products(name,description,weight) VALUES ('spare tire','24 inch spare tire',22.2)
EXEC sys.sp_cdc_enable_table @source_schema = 'inventory', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0
USE testDB
EXEC sys.sp_cdc_enable_db
CREATE SCHEMA inventory
CREATE TABLE inventory.products (id INTEGER IDENTITY(101,1) NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT)
INSERT INTO inventory.products(name,description,weight) VALUES ('scooter','Small 2-wheel scooter',3.14)
INSERT INTO inventory.products(name,description,weight) VALUES ('car battery','12V car battery',8.1)
INSERT INTO inventory.products(name,description,weight) VALUES ('12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8)
INSERT INTO inventory.products(name,description,weight) VALUES ('hammer','12oz carpenter''s hammer',0.75)
INSERT INTO inventory.products(name,description,weight) VALUES ('hammer','14oz carpenter''s hammer',0.875)
INSERT INTO inventory.products(name,description,weight) VALUES ('hammer','16oz carpenter''s hammer',1.0)
INSERT INTO inventory.products(name,description,weight) VALUES ('rocks','box of assorted rocks',5.3)
INSERT INTO inventory.products(name,description,weight) VALUES ('jacket','water resistent black wind breaker',0.1)
INSERT INTO inventory.products(name,description,weight) VALUES ('spare tire','24 inch spare tire',22.2)
EXEC sys.sp_cdc_enable_table @source_schema = 'inventory', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0
CREATE TABLE inventory.products_on_hand (product_id INTEGER NOT NULL PRIMARY KEY, quantity INTEGER NOT NULL, FOREIGN KEY (product_id) REFERENCES inventory.products(id))
INSERT INTO inventory.products_on_hand VALUES (101,3)
INSERT INTO inventory.products_on_hand VALUES (102,8)
INSERT INTO inventory.products_on_hand VALUES (103,18)
INSERT INTO inventory.products_on_hand VALUES (104,4)
INSERT INTO inventory.products_on_hand VALUES (105,5)
INSERT INTO inventory.products_on_hand VALUES (106,0)
INSERT INTO inventory.products_on_hand VALUES (107,44)
INSERT INTO inventory.products_on_hand VALUES (108,2)
INSERT INTO inventory.products_on_hand VALUES (109,5)
EXEC sys.sp_cdc_enable_table @source_schema = 'inventory', @source_name = 'products_on_hand', @role_name = NULL, @supports_net_changes = 0
CREATE TABLE inventory.customers (id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL UNIQUE)
INSERT INTO inventory.customers(first_name,last_name,email) VALUES ('Sally','Thomas','sally.thomas@acme.com')
INSERT INTO inventory.customers(first_name,last_name,email) VALUES ('George','Bailey','gbailey@foobar.com')
INSERT INTO inventory.customers(first_name,last_name,email) VALUES ('Edward','Walker','ed@walker.com')
INSERT INTO inventory.customers(first_name,last_name,email) VALUES ('Anne','Kretchmar','annek@noanswer.org')
EXEC sys.sp_cdc_enable_table @source_schema = 'inventory', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0
CREATE TABLE inventory.orders (id INTEGER IDENTITY(10001,1) NOT NULL PRIMARY KEY, order_date DATE NOT NULL, purchaser INTEGER NOT NULL, quantity INTEGER NOT NULL, product_id INTEGER NOT NULL, FOREIGN KEY (purchaser) REFERENCES inventory.customers(id), FOREIGN KEY (product_id) REFERENCES inventory.products(id))
INSERT INTO inventory.orders(order_date,purchaser,quantity,product_id) VALUES ('16-JAN-2016', 1001, 1, 102)
INSERT INTO inventory.orders(order_date,purchaser,quantity,product_id) VALUES ('17-JAN-2016', 1002, 2, 105)
INSERT INTO inventory.orders(order_date,purchaser,quantity,product_id) VALUES ('19-FEB-2016', 1002, 2, 106)
INSERT INTO inventory.orders(order_date,purchaser,quantity,product_id) VALUES ('21-FEB-2016', 1003, 1, 107)
EXEC sys.sp_cdc_enable_table @source_schema = 'inventory', @source_name = 'orders', @role_name = NULL, @supports_net_changes = 0