DBZ-3621 Changed references to JDBC URI to JDBC URL
This commit is contained in:
parent
3f8b2a8d49
commit
d33472eeb8
@ -23,7 +23,7 @@ public class JdbcConfig extends WorkerConfig {
|
||||
.define(OFFSET_STORAGE_JDBC_URL.name(),
|
||||
ConfigDef.Type.STRING,
|
||||
ConfigDef.Importance.HIGH,
|
||||
"JDBC database URI")
|
||||
"JDBC database URL")
|
||||
.define(OFFSET_STORAGE_JDBC_USER.name(),
|
||||
ConfigDef.Type.STRING,
|
||||
ConfigDef.Importance.HIGH,
|
||||
|
@ -43,7 +43,7 @@
|
||||
public class JdbcOffsetBackingStore implements OffsetBackingStore {
|
||||
|
||||
public static final Field OFFSET_STORAGE_JDBC_URL = Field.create("offset.storage.jdbc.url")
|
||||
.withDescription("URI of the database which will be used to record the database history")
|
||||
.withDescription("URL of the database which will be used to record the database history")
|
||||
.withValidation(Field::isRequired);
|
||||
|
||||
public static final Field OFFSET_STORAGE_JDBC_USER = Field.create("offset.storage.jdbc.user")
|
||||
@ -84,7 +84,7 @@ public class JdbcOffsetBackingStore implements OffsetBackingStore {
|
||||
protected ExecutorService executor;
|
||||
private final AtomicInteger recordInsertSeq = new AtomicInteger(0);
|
||||
private Connection conn;
|
||||
private String jdbcUri;
|
||||
private String jdbcUrl;
|
||||
|
||||
private String offsetStorageTableName;
|
||||
|
||||
@ -103,13 +103,13 @@ public ByteBuffer toByteBuffer(String data) {
|
||||
public void configure(WorkerConfig config) {
|
||||
|
||||
try {
|
||||
jdbcUri = config.getString("offset.storage.jdbc.url");
|
||||
jdbcUrl = config.getString("offset.storage.jdbc.url");
|
||||
offsetStorageTableName = config.getString(OFFSET_STORAGE_TABLE_NAME.name());
|
||||
conn = DriverManager.getConnection(jdbcUri, config.getString(OFFSET_STORAGE_JDBC_USER.name()), config.getString(OFFSET_STORAGE_JDBC_PASSWORD.name()));
|
||||
conn = DriverManager.getConnection(jdbcUrl, config.getString(OFFSET_STORAGE_JDBC_USER.name()), config.getString(OFFSET_STORAGE_JDBC_PASSWORD.name()));
|
||||
conn.setAutoCommit(false);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to connect JDBC offset backing store: " + jdbcUri, e);
|
||||
throw new IllegalStateException("Failed to connect JDBC offset backing store: " + jdbcUrl, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -118,13 +118,13 @@ public synchronized void start() {
|
||||
executor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory(
|
||||
this.getClass().getSimpleName() + "-%d", false));
|
||||
|
||||
LOG.info("Starting JdbcOffsetBackingStore db {}", jdbcUri);
|
||||
LOG.info("Starting JdbcOffsetBackingStore db {}", jdbcUrl);
|
||||
try {
|
||||
initializeTable();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
|
||||
throw new IllegalStateException("Failed to create JDBC offset table: " + jdbcUri, e);
|
||||
throw new IllegalStateException("Failed to create JDBC offset table: " + jdbcUrl, e);
|
||||
}
|
||||
load();
|
||||
}
|
||||
@ -187,7 +187,7 @@ private void load() {
|
||||
data = tmpData;
|
||||
}
|
||||
catch (SQLException e) {
|
||||
LOG.error("Failed recover records from database: {}", jdbcUri, e);
|
||||
LOG.error("Failed recover records from database: {}", jdbcUrl, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,7 +99,7 @@ public final class JdbcSchemaHistory extends AbstractSchemaHistory {
|
||||
private final AtomicInteger recordInsertSeq = new AtomicInteger(0);
|
||||
|
||||
private Connection conn;
|
||||
private String jdbcUri;
|
||||
private String jdbcUrl;
|
||||
|
||||
@Override
|
||||
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
|
||||
@ -114,12 +114,12 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
|
||||
super.configure(config, comparator, listener, useCatalogBeforeSchema);
|
||||
|
||||
try {
|
||||
jdbcUri = config.getString(JDBC_URL.name());
|
||||
jdbcUrl = config.getString(JDBC_URL.name());
|
||||
conn = DriverManager.getConnection(config.getString(JDBC_URL.name()), config.getString(JDBC_USER.name()), config.getString(JDBC_PASSWORD.name()));
|
||||
conn.setAutoCommit(false);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new IllegalStateException("Failed to connect " + jdbcUri);
|
||||
throw new IllegalStateException("Failed to connect " + jdbcUrl);
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,7 +137,7 @@ public void start() {
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new SchemaHistoryException("Unable to create history table " + jdbcUri + ": " + e.getMessage(), e);
|
||||
throw new SchemaHistoryException("Unable to create history table " + jdbcUrl + ": " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -269,7 +269,7 @@ public boolean exists() {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Jdbc database: " + (jdbcUri != null ? jdbcUri : "(unstarted)");
|
||||
return "Jdbc database: " + (jdbcUrl != null ? jdbcUrl : "(unstarted)");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -123,7 +123,7 @@ protected Configuration.Builder schemaHistory(Configuration.Builder builder) {
|
||||
.with(JDBC_PASSWORD, "pass");
|
||||
}
|
||||
|
||||
private Configuration.Builder config(String jdbcUri) {
|
||||
private Configuration.Builder config(String jdbcUrl) {
|
||||
|
||||
final Configuration.Builder builder = Configuration.create()
|
||||
.with(MySqlConnectorConfig.HOSTNAME, container.getHost())
|
||||
@ -138,7 +138,7 @@ private Configuration.Builder config(String jdbcUri) {
|
||||
.with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)
|
||||
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
|
||||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
|
||||
.with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUri)
|
||||
.with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUrl)
|
||||
.with(OFFSET_STORAGE_JDBC_USER.name(), "user")
|
||||
.with(OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass")
|
||||
.with(OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc")
|
||||
@ -170,34 +170,34 @@ public void shouldStartCorrectlyWithJDBCOffsetStorage() throws SQLException, Int
|
||||
}
|
||||
|
||||
File dbFile = File.createTempFile("test-", "db");
|
||||
String jdbcUri = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
|
||||
String jdbcUrl = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
|
||||
|
||||
// Use the DB configuration to define the connector's configuration to use the "replica"
|
||||
// which may be the same as the "master" ...
|
||||
Configuration config = config(jdbcUri).build();
|
||||
Configuration config = config(jdbcUrl).build();
|
||||
|
||||
// Start the connector ...
|
||||
start(MySqlConnector.class, config);
|
||||
waitForStreamingRunning("mysql", TOPIC_PREFIX);
|
||||
|
||||
consumeRecordsByTopic(4);
|
||||
validateIfDataIsCreatedInJDBCDatabase(jdbcUri, "user", "pass", "offsets_jdbc");
|
||||
validateIfDataIsCreatedInJDBCDatabase(jdbcUrl, "user", "pass", "offsets_jdbc");
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to validate the offset storage data that is created
|
||||
* in Database.
|
||||
*
|
||||
* @param jdbcUri
|
||||
* @param jdbcUrl
|
||||
* @param jdbcUser
|
||||
* @param jdbcPassword
|
||||
*/
|
||||
private void validateIfDataIsCreatedInJDBCDatabase(String jdbcUri, String jdbcUser,
|
||||
private void validateIfDataIsCreatedInJDBCDatabase(String jdbcUrl, String jdbcUser,
|
||||
String jdbcPassword, String jdbcTableName) {
|
||||
Connection connection = null;
|
||||
try {
|
||||
// create a database connection
|
||||
connection = DriverManager.getConnection(jdbcUri, jdbcUser, jdbcPassword);
|
||||
connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
|
||||
Statement statement = connection.createStatement();
|
||||
statement.setQueryTimeout(30); // set timeout to 30 sec.
|
||||
|
||||
|
@ -120,7 +120,7 @@ protected Configuration.Builder schemaHistory(Configuration.Builder builder) {
|
||||
private Configuration.Builder config() throws IOException {
|
||||
File dbFile = File.createTempFile("test-", "db");
|
||||
|
||||
String jdbcUri = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
|
||||
String jdbcUrl = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
|
||||
|
||||
final Builder builder = Configuration.create()
|
||||
.with(MySqlConnectorConfig.HOSTNAME, container.getHost())
|
||||
@ -135,7 +135,7 @@ private Configuration.Builder config() throws IOException {
|
||||
.with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)
|
||||
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
|
||||
.with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUri)
|
||||
.with(OFFSET_STORAGE_JDBC_URL.name(), jdbcUrl)
|
||||
.with(OFFSET_STORAGE_JDBC_USER.name(), "user")
|
||||
.with(OFFSET_STORAGE_JDBC_PASSWORD.name(), "pass")
|
||||
.with(OFFSET_STORAGE_TABLE_NAME.name(), "offsets_jdbc");
|
||||
|
Loading…
Reference in New Issue
Block a user