diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcConfig.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcConfig.java index 6da63f8b7..e7566aaac 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcConfig.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcConfig.java @@ -23,7 +23,7 @@ public class JdbcConfig extends WorkerConfig { .define(OFFSET_STORAGE_JDBC_URL.name(), ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, - "JDBC database URI") + "JDBC database URL") .define(OFFSET_STORAGE_JDBC_USER.name(), ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcOffsetBackingStore.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcOffsetBackingStore.java index b8588a760..6c8298ae1 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcOffsetBackingStore.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/JdbcOffsetBackingStore.java @@ -43,7 +43,7 @@ public class JdbcOffsetBackingStore implements OffsetBackingStore { public static final Field OFFSET_STORAGE_JDBC_URL = Field.create("offset.storage.jdbc.url") - .withDescription("URI of the database which will be used to record the database history") + .withDescription("URL of the database which will be used to record the database history") .withValidation(Field::isRequired); public static final Field OFFSET_STORAGE_JDBC_USER = Field.create("offset.storage.jdbc.user") @@ -84,7 +84,7 @@ public class JdbcOffsetBackingStore implements OffsetBackingStore { protected ExecutorService executor; private final AtomicInteger recordInsertSeq = new AtomicInteger(0); private Connection conn; - private String jdbcUri; + private String jdbcUrl; private String offsetStorageTableName; @@ -103,13 +103,13 @@ public ByteBuffer toByteBuffer(String data) { public void configure(WorkerConfig config) { try { - jdbcUri = config.getString("offset.storage.jdbc.url"); + jdbcUrl = config.getString("offset.storage.jdbc.url"); offsetStorageTableName = config.getString(OFFSET_STORAGE_TABLE_NAME.name()); - conn = DriverManager.getConnection(jdbcUri, config.getString(OFFSET_STORAGE_JDBC_USER.name()), config.getString(OFFSET_STORAGE_JDBC_PASSWORD.name())); + conn = DriverManager.getConnection(jdbcUrl, config.getString(OFFSET_STORAGE_JDBC_USER.name()), config.getString(OFFSET_STORAGE_JDBC_PASSWORD.name())); conn.setAutoCommit(false); } catch (Exception e) { - throw new IllegalStateException("Failed to connect JDBC offset backing store: " + jdbcUri, e); + throw new IllegalStateException("Failed to connect JDBC offset backing store: " + jdbcUrl, e); } } @@ -118,13 +118,13 @@ public synchronized void start() { executor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory( this.getClass().getSimpleName() + "-%d", false)); - LOG.info("Starting JdbcOffsetBackingStore db {}", jdbcUri); + LOG.info("Starting JdbcOffsetBackingStore db {}", jdbcUrl); try { initializeTable(); } catch (SQLException e) { - throw new IllegalStateException("Failed to create JDBC offset table: " + jdbcUri, e); + throw new IllegalStateException("Failed to create JDBC offset table: " + jdbcUrl, e); } load(); } @@ -187,7 +187,7 @@ private void load() { data = tmpData; } catch (SQLException e) { - LOG.error("Failed recover records from database: {}", jdbcUri, e); + LOG.error("Failed recover records from database: {}", jdbcUrl, e); } } diff --git a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java index 340583e0c..8b75ca4dd 100644 --- a/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java +++ b/debezium-storage/debezium-storage-jdbc/src/main/java/io/debezium/storage/jdbc/history/JdbcSchemaHistory.java @@ -99,7 +99,7 @@ public final class JdbcSchemaHistory extends AbstractSchemaHistory { private final AtomicInteger recordInsertSeq = new AtomicInteger(0); private Connection conn; - private String jdbcUri; + private String jdbcUrl; @Override public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { @@ -114,12 +114,12 @@ public void configure(Configuration config, HistoryRecordComparator comparator, super.configure(config, comparator, listener, useCatalogBeforeSchema); try { - jdbcUri = config.getString(JDBC_URL.name()); + jdbcUrl = config.getString(JDBC_URL.name()); conn = DriverManager.getConnection(config.getString(JDBC_URL.name()), config.getString(JDBC_USER.name()), config.getString(JDBC_PASSWORD.name())); conn.setAutoCommit(false); } catch (SQLException e) { - throw new IllegalStateException("Failed to connect " + jdbcUri); + throw new IllegalStateException("Failed to connect " + jdbcUrl); } } @@ -137,7 +137,7 @@ public void start() { } } catch (Exception e) { - throw new SchemaHistoryException("Unable to create history table " + jdbcUri + ": " + e.getMessage(), e); + throw new SchemaHistoryException("Unable to create history table " + jdbcUrl + ": " + e.getMessage(), e); } } }); @@ -269,7 +269,7 @@ public boolean exists() { @Override public String toString() { - return "Jdbc database: " + (jdbcUri != null ? jdbcUri : "(unstarted)"); + return "Jdbc database: " + (jdbcUrl != null ? jdbcUrl : "(unstarted)"); } @Override diff --git a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java index 632b70279..cb250986e 100644 --- a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java +++ b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.java @@ -123,7 +123,7 @@ protected Configuration.Builder schemaHistory(Configuration.Builder builder) { .with(JDBC_PASSWORD, "pass"); } - private Configuration.Builder config(String jdbcUri) { + private Configuration.Builder config(String jdbcUrl) { final Configuration.Builder builder = Configuration.create() .with(MySqlConnectorConfig.HOSTNAME, container.getHost()) @@ -138,7 +138,7 @@ private Configuration.Builder config(String jdbcUri) { .with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX) .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) - .with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUri) + .with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUrl) .with(OFFSET_STORAGE_JDBC_USER.name(), "user") .with(OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass") .with(OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc") @@ -170,34 +170,34 @@ public void shouldStartCorrectlyWithJDBCOffsetStorage() throws SQLException, Int } File dbFile = File.createTempFile("test-", "db"); - String jdbcUri = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath()); + String jdbcUrl = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath()); // Use the DB configuration to define the connector's configuration to use the "replica" // which may be the same as the "master" ... - Configuration config = config(jdbcUri).build(); + Configuration config = config(jdbcUrl).build(); // Start the connector ... start(MySqlConnector.class, config); waitForStreamingRunning("mysql", TOPIC_PREFIX); consumeRecordsByTopic(4); - validateIfDataIsCreatedInJDBCDatabase(jdbcUri, "user", "pass", "offsets_jdbc"); + validateIfDataIsCreatedInJDBCDatabase(jdbcUrl, "user", "pass", "offsets_jdbc"); } /** * Function to validate the offset storage data that is created * in Database. * - * @param jdbcUri + * @param jdbcUrl * @param jdbcUser * @param jdbcPassword */ - private void validateIfDataIsCreatedInJDBCDatabase(String jdbcUri, String jdbcUser, + private void validateIfDataIsCreatedInJDBCDatabase(String jdbcUrl, String jdbcUser, String jdbcPassword, String jdbcTableName) { Connection connection = null; try { // create a database connection - connection = DriverManager.getConnection(jdbcUri, jdbcUser, jdbcPassword); + connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); Statement statement = connection.createStatement(); statement.setQueryTimeout(30); // set timeout to 30 sec. diff --git a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryIT.java b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryIT.java index ffea2289a..8151222f5 100644 --- a/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryIT.java +++ b/debezium-storage/debezium-storage-jdbc/src/test/java/io/debezium/storage/jdbc/history/JdbcSchemaHistoryIT.java @@ -120,7 +120,7 @@ protected Configuration.Builder schemaHistory(Configuration.Builder builder) { private Configuration.Builder config() throws IOException { File dbFile = File.createTempFile("test-", "db"); - String jdbcUri = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath()); + String jdbcUrl = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath()); final Builder builder = Configuration.create() .with(MySqlConnectorConfig.HOSTNAME, container.getHost()) @@ -135,7 +135,7 @@ private Configuration.Builder config() throws IOException { .with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX) .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) - .with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUri) + .with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUrl) .with(OFFSET_STORAGE_JDBC_USER.name(), "user") .with(OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass") .with(OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc");