DBZ-6855 Use correct naming prefix
This commit is contained in:
parent
454a9ac914
commit
52f45ad9f7
@ -21,17 +21,16 @@
|
|||||||
public class JdbcOffsetBackingStoreConfig extends JdbcCommonConfig {
|
public class JdbcOffsetBackingStoreConfig extends JdbcCommonConfig {
|
||||||
|
|
||||||
public static final String OFFSET_STORAGE_PREFIX = "offset.storage.";
|
public static final String OFFSET_STORAGE_PREFIX = "offset.storage.";
|
||||||
public static final String PROP_PREFIX = OFFSET_STORAGE_PREFIX + CONFIGURATION_FIELD_PREFIX_STRING;
|
|
||||||
|
|
||||||
public static final String DEFAULT_TABLE_NAME = "debezium_offset_storage";
|
public static final String DEFAULT_TABLE_NAME = "debezium_offset_storage";
|
||||||
public static final Field PROP_TABLE_NAME = Field.create(PROP_PREFIX + "offset.table.name")
|
public static final Field PROP_TABLE_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.name")
|
||||||
.withDescription("Name of the table to store offsets")
|
.withDescription("Name of the table to store offsets")
|
||||||
.withDefault(DEFAULT_TABLE_NAME);
|
.withDefault(DEFAULT_TABLE_NAME);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JDBC Offset storage CREATE TABLE syntax.
|
* JDBC Offset storage CREATE TABLE syntax.
|
||||||
*/
|
*/
|
||||||
public static final String DEFAULT_TABLE_DDL = "CREATE TABLE %s(id VARCHAR(36) NOT NULL, " +
|
public static final String DEFAULT_TABLE_DDL = "CREATE TABLE %s (id VARCHAR(36) NOT NULL, " +
|
||||||
"offset_key VARCHAR(1255), offset_val VARCHAR(1255)," +
|
"offset_key VARCHAR(1255), offset_val VARCHAR(1255)," +
|
||||||
"record_insert_ts TIMESTAMP NOT NULL," +
|
"record_insert_ts TIMESTAMP NOT NULL," +
|
||||||
"record_insert_seq INTEGER NOT NULL" +
|
"record_insert_seq INTEGER NOT NULL" +
|
||||||
@ -45,7 +44,7 @@ public class JdbcOffsetBackingStoreConfig extends JdbcCommonConfig {
|
|||||||
* record_insert_ts - Timestamp when the record was inserted
|
* record_insert_ts - Timestamp when the record was inserted
|
||||||
* record_insert_seq - Sequence number of record
|
* record_insert_seq - Sequence number of record
|
||||||
*/
|
*/
|
||||||
public static final Field PROP_TABLE_DDL = Field.create(PROP_PREFIX + "offset.table.ddl")
|
public static final Field PROP_TABLE_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.ddl")
|
||||||
.withDescription("Create table syntax for offset jdbc table")
|
.withDescription("Create table syntax for offset jdbc table")
|
||||||
.withDefault(DEFAULT_TABLE_DDL);
|
.withDefault(DEFAULT_TABLE_DDL);
|
||||||
|
|
||||||
@ -56,15 +55,15 @@ public class JdbcOffsetBackingStoreConfig extends JdbcCommonConfig {
|
|||||||
|
|
||||||
public static final String DEFAULT_TABLE_INSERT = "INSERT INTO %s(id, offset_key, offset_val, record_insert_ts, record_insert_seq) " +
|
public static final String DEFAULT_TABLE_INSERT = "INSERT INTO %s(id, offset_key, offset_val, record_insert_ts, record_insert_seq) " +
|
||||||
"VALUES ( ?, ?, ?, ?, ? )";
|
"VALUES ( ?, ?, ?, ?, ? )";
|
||||||
public static final Field PROP_TABLE_SELECT = Field.create(PROP_PREFIX + "offset.table.select")
|
public static final Field PROP_TABLE_SELECT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.select")
|
||||||
.withDescription("Select syntax to get offset data from jdbc table")
|
.withDescription("Select syntax to get offset data from jdbc table")
|
||||||
.withDefault(DEFAULT_TABLE_SELECT);
|
.withDefault(DEFAULT_TABLE_SELECT);
|
||||||
|
|
||||||
public static final Field PROP_TABLE_DELETE = Field.create(PROP_PREFIX + "offset.table.delete")
|
public static final Field PROP_TABLE_DELETE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.delete")
|
||||||
.withDescription("Delete syntax to delete offset data from jdbc table")
|
.withDescription("Delete syntax to delete offset data from jdbc table")
|
||||||
.withDefault(DEFAULT_TABLE_DELETE);
|
.withDefault(DEFAULT_TABLE_DELETE);
|
||||||
|
|
||||||
public static final Field PROP_TABLE_INSERT = Field.create(PROP_PREFIX + "offset.table.insert")
|
public static final Field PROP_TABLE_INSERT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "offset.table.insert")
|
||||||
.withDescription("Insert syntax to add offset data to the jdbc table")
|
.withDescription("Insert syntax to add offset data to the jdbc table")
|
||||||
.withDefault(DEFAULT_TABLE_INSERT);
|
.withDefault(DEFAULT_TABLE_INSERT);
|
||||||
|
|
||||||
|
@ -31,7 +31,6 @@
|
|||||||
import io.debezium.connector.mysql.MySqlConnector;
|
import io.debezium.connector.mysql.MySqlConnector;
|
||||||
import io.debezium.connector.mysql.MySqlConnectorConfig;
|
import io.debezium.connector.mysql.MySqlConnectorConfig;
|
||||||
import io.debezium.connector.mysql.MySqlTestConnection;
|
import io.debezium.connector.mysql.MySqlTestConnection;
|
||||||
import io.debezium.connector.mysql.UniqueDatabase;
|
|
||||||
import io.debezium.embedded.AbstractConnectorTest;
|
import io.debezium.embedded.AbstractConnectorTest;
|
||||||
import io.debezium.jdbc.JdbcConfiguration;
|
import io.debezium.jdbc.JdbcConfiguration;
|
||||||
import io.debezium.junit.SkipWhenDatabaseVersion;
|
import io.debezium.junit.SkipWhenDatabaseVersion;
|
||||||
@ -46,8 +45,6 @@
|
|||||||
*/
|
*/
|
||||||
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
|
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
|
||||||
public class JdbcOffsetBackingStoreIT extends AbstractConnectorTest {
|
public class JdbcOffsetBackingStoreIT extends AbstractConnectorTest {
|
||||||
private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test")
|
|
||||||
.withDbHistoryPath(SCHEMA_HISTORY_PATH);
|
|
||||||
private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("schema-history.db").toAbsolutePath();
|
private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("schema-history.db").toAbsolutePath();
|
||||||
|
|
||||||
private static final String USER = "debezium";
|
private static final String USER = "debezium";
|
||||||
@ -161,7 +158,7 @@ private MySqlTestConnection testConnection() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldStartCorrectlyWithJDBCOffsetStorage() throws InterruptedException, IOException {
|
public void shouldStartCorrectlyWithJdbcOffsetStorage() throws InterruptedException, IOException {
|
||||||
String masterPort = System.getProperty("database.port", "3306");
|
String masterPort = System.getProperty("database.port", "3306");
|
||||||
String replicaPort = System.getProperty("database.replica.port", "3306");
|
String replicaPort = System.getProperty("database.replica.port", "3306");
|
||||||
boolean replicaIsMaster = masterPort.equals(replicaPort);
|
boolean replicaIsMaster = masterPort.equals(replicaPort);
|
||||||
|
@ -24,7 +24,6 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore;
|
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore;
|
||||||
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Ismail simsek
|
* @author Ismail simsek
|
||||||
@ -45,16 +44,16 @@ public void setup() throws IOException {
|
|||||||
store = new JdbcOffsetBackingStore();
|
store = new JdbcOffsetBackingStore();
|
||||||
props = new HashMap<>();
|
props = new HashMap<>();
|
||||||
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "dummy");
|
props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "dummy");
|
||||||
props.put(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_JDBC_URL.name(), "jdbc:sqlite:" + dbFile.getAbsolutePath());
|
props.put("offset.storage.jdbc.url", "jdbc:sqlite:" + dbFile.getAbsolutePath());
|
||||||
props.put(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_USER.name(), "user");
|
props.put("offset.storage.jdbc.user", "user");
|
||||||
props.put(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX + JdbcOffsetBackingStoreConfig.PROP_PASSWORD.name(), "pass");
|
props.put("offset.storage.jdbc.password", "pass");
|
||||||
props.put(JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name(), "offsets_jdbc");
|
props.put("offset.storage.jdbc.offset.table.name", "offsets_jdbc");
|
||||||
props.put(JdbcOffsetBackingStoreConfig.PROP_TABLE_DDL.name(), "CREATE TABLE %s(id VARCHAR(36) NOT NULL, " +
|
props.put("offset.storage.jdbc.offset.table.ddl", "CREATE TABLE %s (id VARCHAR(36) NOT NULL, " +
|
||||||
"offset_key VARCHAR(1255), offset_val VARCHAR(1255)," +
|
"offset_key VARCHAR(1255), offset_val VARCHAR(1255)," +
|
||||||
"record_insert_ts TIMESTAMP NOT NULL," +
|
"record_insert_ts TIMESTAMP NOT NULL," +
|
||||||
"record_insert_seq INTEGER NOT NULL" +
|
"record_insert_seq INTEGER NOT NULL" +
|
||||||
")");
|
")");
|
||||||
props.put(JdbcOffsetBackingStoreConfig.PROP_TABLE_SELECT.name(), "SELECT id, offset_key, offset_val FROM %s " +
|
props.put("offset.storage.jdbc.offset.table.select", "SELECT id, offset_key, offset_val FROM offsets_jdbc " +
|
||||||
"ORDER BY record_insert_ts, record_insert_seq");
|
"ORDER BY record_insert_ts, record_insert_seq");
|
||||||
props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
|
props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
|
||||||
props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
|
props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
|
||||||
|
Loading…
Reference in New Issue
Block a user