DBZ-918 Adding Debezium connector field to source info

This will allow consumers to recognize the Debezium connector used for creating a given message, helping them to adjust their behavior for a variety of connectors.
This commit is contained in:
jchipmunk 2018-10-04 10:41:23 +03:00 committed by Gunnar Morling
parent 60d5fc8c09
commit ea230e8e8a
6 changed files with 88 additions and 0 deletions

View File

@ -21,4 +21,8 @@ public final class Module {
public static String version() {
return INFO.getProperty("version");
}
public static String name() {
return "oracle";
}
}

View File

@ -7,11 +7,13 @@
import java.time.Instant;
import io.debezium.annotation.NotThreadSafe;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.AbstractSourceInfo;
@NotThreadSafe
public class SourceInfo extends AbstractSourceInfo {
public static final String SERVER_NAME_KEY = "name";
@ -45,6 +47,11 @@ protected Schema schema() {
return SCHEMA;
}
@Override
protected String connector() {
return Module.name();
}
@Override
public Struct struct() {
return super.struct()

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import org.junit.Before;
import org.junit.Test;
import java.time.Instant;
import static org.fest.assertions.Assertions.assertThat;
public class SourceInfoTest {
private SourceInfo source;
@Before
public void beforeEach() {
source = new SourceInfo("serverX");
source.setSourceTime(Instant.now());
}
@Test
public void versionIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
}

View File

@ -21,4 +21,8 @@ public final class Module {
public static String version() {
return INFO.getProperty("version");
}
public static String name() {
return "sqlserver";
}
}

View File

@ -7,6 +7,7 @@
import java.time.Instant;
import io.debezium.annotation.NotThreadSafe;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@ -19,6 +20,7 @@
* @author Jiri Pechanec
*
*/
@NotThreadSafe
public class SourceInfo extends AbstractSourceInfo {
public static final String SERVER_NAME_KEY = "name";
@ -93,6 +95,11 @@ protected Schema schema() {
return SCHEMA;
}
@Override
protected String connector() {
return Module.name();
}
/**
* @return the coordinates encoded as a {@code Struct}
*/

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
public class SourceInfoTest {
private SourceInfo source;
@Before
public void beforeEach() {
source = new SourceInfo("serverX");
source.setChangeLsn(Lsn.NULL);
}
@Test
public void versionIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
}
@Test
public void connectorIsPresent() {
assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
}
}