diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/Module.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/Module.java index 7f959ae01..ba3738ecf 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/Module.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/Module.java @@ -21,4 +21,8 @@ public final class Module { public static String version() { return INFO.getProperty("version"); } + + public static String name() { + return "oracle"; + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java index 7f0e05e75..2658c6834 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/SourceInfo.java @@ -7,11 +7,13 @@ import java.time.Instant; +import io.debezium.annotation.NotThreadSafe; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import io.debezium.connector.AbstractSourceInfo; +@NotThreadSafe public class SourceInfo extends AbstractSourceInfo { public static final String SERVER_NAME_KEY = "name"; @@ -45,6 +47,11 @@ protected Schema schema() { return SCHEMA; } + @Override + protected String connector() { + return Module.name(); + } + @Override public Struct struct() { return super.struct() diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java new file mode 100644 index 000000000..761b27b83 --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/SourceInfoTest.java @@ -0,0 +1,34 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Instant; + +import static org.fest.assertions.Assertions.assertThat; + +public class SourceInfoTest { + + private SourceInfo source; + + @Before + public void beforeEach() { + source = new SourceInfo("serverX"); + source.setSourceTime(Instant.now()); + } + + @Test + public void versionIsPresent() { + assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version()); + } + + @Test + public void connectorIsPresent() { + assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name()); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java index 7ca01d218..cd358044b 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/Module.java @@ -21,4 +21,8 @@ public final class Module { public static String version() { return INFO.getProperty("version"); } + + public static String name() { + return "sqlserver"; + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java index 4ba224ce6..052fea957 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SourceInfo.java @@ -7,6 +7,7 @@ import java.time.Instant; +import io.debezium.annotation.NotThreadSafe; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -19,6 +20,7 @@ * @author Jiri Pechanec * */ +@NotThreadSafe public class SourceInfo extends AbstractSourceInfo { public static final String SERVER_NAME_KEY = "name"; @@ -93,6 +95,11 @@ protected Schema schema() { return SCHEMA; } + @Override + protected String connector() { + return Module.name(); + } + /** * @return the coordinates encoded as a {@code Struct} */ diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SourceInfoTest.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SourceInfoTest.java new file mode 100644 index 000000000..59bef62c7 --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SourceInfoTest.java @@ -0,0 +1,32 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import org.junit.Before; +import org.junit.Test; + +import static org.fest.assertions.Assertions.assertThat; + +public class SourceInfoTest { + + private SourceInfo source; + + @Before + public void beforeEach() { + source = new SourceInfo("serverX"); + source.setChangeLsn(Lsn.NULL); + } + + @Test + public void versionIsPresent() { + assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version()); + } + + @Test + public void connectorIsPresent() { + assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name()); + } +}