diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStoreConfig.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStoreConfig.java index 6185b95d3..e00544a2a 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStoreConfig.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/offset/JdbcOffsetBackingStoreConfig.java @@ -21,17 +21,16 @@ public class JdbcOffsetBackingStoreConfig extends JdbcCommonConfig { public static final String OFFSET_STORAGE_PREFIX = "offset.storage."; - public static final String PROP_PREFIX = OFFSET_STORAGE_PREFIX + CONFIGURATION_FIELD_PREFIX_STRING; public static final String DEFAULT_TABLE_NAME = "debezium_offset_storage"; - public static final Field PROP_TABLE_NAME = Field.create(PROP_PREFIX + "offset.table.name") + public static final Field PROP_TABLE_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.name") .withDescription("Name of the table to store offsets") .withDefault(DEFAULT_TABLE_NAME); /** * JDBC Offset storage CREATE TABLE syntax. */ - public static final String DEFAULT_TABLE_DDL = "CREATE TABLE %s(id VARCHAR(36) NOT NULL, " + + public static final String DEFAULT_TABLE_DDL = "CREATE TABLE %s (id VARCHAR(36) NOT NULL, " + "offset_key VARCHAR(1255), offset_val VARCHAR(1255)," + "record_insert_ts TIMESTAMP NOT NULL," + "record_insert_seq INTEGER NOT NULL" + @@ -45,7 +44,7 @@ public class JdbcOffsetBackingStoreConfig extends JdbcCommonConfig { * record_insert_ts - Timestamp when the record was inserted * record_insert_seq - Sequence number of record */ - public static final Field PROP_TABLE_DDL = Field.create(PROP_PREFIX + "offset.table.ddl") + public static final Field PROP_TABLE_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.ddl") .withDescription("Create table syntax for offset jdbc table") .withDefault(DEFAULT_TABLE_DDL); @@ -56,15 +55,15 @@ public class JdbcOffsetBackingStoreConfig extends JdbcCommonConfig { public static final String DEFAULT_TABLE_INSERT = "INSERT INTO %s(id, offset_key, offset_val, record_insert_ts, record_insert_seq) " + "VALUES ( ?, ?, ?, ?, ? )"; - public static final Field PROP_TABLE_SELECT = Field.create(PROP_PREFIX + "offset.table.select") + public static final Field PROP_TABLE_SELECT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.select") .withDescription("Select syntax to get offset data from jdbc table") .withDefault(DEFAULT_TABLE_SELECT); - public static final Field PROP_TABLE_DELETE = Field.create(PROP_PREFIX + "offset.table.delete") + public static final Field PROP_TABLE_DELETE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.delete") .withDescription("Delete syntax to delete offset data from jdbc table") .withDefault(DEFAULT_TABLE_DELETE); - public static final Field PROP_TABLE_INSERT = Field.create(PROP_PREFIX + "offset.table.insert") + public static final Field PROP_TABLE_INSERT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.insert") .withDescription("Insert syntax to add offset data to the jdbc table") .withDefault(DEFAULT_TABLE_INSERT); diff --git a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java index 52f8aae54..717727efe 100644 --- a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java +++ b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java @@ -31,7 +31,6 @@ import io.debezium.connector.mysql.MySqlConnector; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.connector.mysql.MySqlTestConnection; -import io.debezium.connector.mysql.UniqueDatabase; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.junit.SkipWhenDatabaseVersion; @@ -46,8 +45,6 @@ */ @SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6") public class JdbcOffsetBackingStoreIT extends AbstractConnectorTest { - private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test") - .withDbHistoryPath(SCHEMA_HISTORY_PATH); private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("schema-history.db").toAbsolutePath(); private static final String USER = "debezium"; @@ -161,7 +158,7 @@ private MySqlTestConnection testConnection() { } @Test - public void shouldStartCorrectlyWithJDBCOffsetStorage() throws InterruptedException, IOException { + public void shouldStartCorrectlyWithJdbcOffsetStorage() throws InterruptedException, IOException { String masterPort = System.getProperty("database.port", "3306"); String replicaPort = System.getProperty("database.replica.port", "3306"); boolean replicaIsMaster = masterPort.equals(replicaPort); diff --git a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreTest.java b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreTest.java index a023c38b6..52fa2fbf1 100644 --- a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreTest.java +++ b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreTest.java @@ -24,7 +24,6 @@ import org.junit.Test; import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore; -import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig; /** * @author Ismail simsek @@ -45,16 +44,16 @@ public void setup() throws IOException { store = new JdbcOffsetBackingStore(); props = new HashMap<>(); props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "dummy"); - props.put(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_JDBC_URL.name(), "jdbc:sqlite:" + dbFile.getAbsolutePath()); - props.put(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_USER.name(), "user"); - props.put(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_PASSWORD.name(), "pass"); - props.put(JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name(), "offsets_jdbc"); - props.put(JdbcOffsetBackingStoreConfig.PROP_TABLE_DDL.name(), "CREATE TABLE %s(id VARCHAR(36) NOT NULL, " + + props.put("offset.storage.jdbc.url", "jdbc:sqlite:" + dbFile.getAbsolutePath()); + props.put("offset.storage.jdbc.user", "user"); + props.put("offset.storage.jdbc.password", "pass"); + props.put("offset.storage.jdbc.offset.table.name", "offsets_jdbc"); + props.put("offset.storage.jdbc.offset.table.ddl", "CREATE TABLE %s (id VARCHAR(36) NOT NULL, " + "offset_key VARCHAR(1255), offset_val VARCHAR(1255)," + "record_insert_ts TIMESTAMP NOT NULL," + "record_insert_seq INTEGER NOT NULL" + ")"); - props.put(JdbcOffsetBackingStoreConfig.PROP_TABLE_SELECT.name(), "SELECT id, offset_key, offset_val FROM %s " + + props.put("offset.storage.jdbc.offset.table.select", "SELECT id, offset_key, offset_val FROM offsets_jdbc " + "ORDER BY record_insert_ts, record_insert_seq"); props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter"); props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");