DBZ-6715 Make variables final
This commit is contained in:
parent
23adf86379
commit
e4b34e771c
@ -707,19 +707,19 @@ public void run() {
|
||||
}
|
||||
|
||||
// Instantiate the connector ...
|
||||
SourceConnector connector = instantiateConnector(connectorClassName);
|
||||
Map<String, String> connectorConfig = getConnectorConfig(connector, connectorClassName);
|
||||
final SourceConnector connector = instantiateConnector(connectorClassName);
|
||||
final Map<String, String> connectorConfig = getConnectorConfig(connector, connectorClassName);
|
||||
|
||||
// Instantiate the offset store ...
|
||||
OffsetBackingStore offsetStore = initializeOffsetStore(connectorConfig);
|
||||
final OffsetBackingStore offsetStore = initializeOffsetStore(connectorConfig);
|
||||
|
||||
// Set up the offset commit policy ...
|
||||
setOffsetCommitPolicy();
|
||||
|
||||
// Set up offset reader and writer
|
||||
Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));
|
||||
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter);
|
||||
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, keyConverter, valueConverter);
|
||||
final Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));
|
||||
final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter);
|
||||
final OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, keyConverter, valueConverter);
|
||||
initializeConnector(connector, offsetReader);
|
||||
|
||||
try {
|
||||
@ -728,8 +728,8 @@ public void run() {
|
||||
connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
|
||||
|
||||
// Create source connector task
|
||||
List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
|
||||
Class<? extends Task> taskClass = connector.taskClass();
|
||||
final List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
|
||||
final Class<? extends Task> taskClass = connector.taskClass();
|
||||
task = createSourceTask(connector, taskConfigs, taskClass);
|
||||
|
||||
try {
|
||||
@ -741,7 +741,7 @@ public void run() {
|
||||
// Clean-up allocated resources
|
||||
stopSourceTask();
|
||||
// Mask the passwords ...
|
||||
Configuration config = Configuration.from(taskConfigs.get(0)).withMaskedPasswords();
|
||||
final Configuration config = Configuration.from(taskConfigs.get(0)).withMaskedPasswords();
|
||||
String msg = "Unable to initialize and start connector's task class '" + taskClass.getName() + "' with config: "
|
||||
+ config;
|
||||
failRun(msg, t);
|
||||
@ -751,7 +751,7 @@ public void run() {
|
||||
Throwable handlerError = null, retryError = null;
|
||||
try {
|
||||
timeOfLastCommitMillis = clock.currentTimeInMillis();
|
||||
RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);
|
||||
final RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);
|
||||
while (runningThread.get() != null) {
|
||||
List<SourceRecord> changeRecords = null;
|
||||
try {
|
||||
@ -863,7 +863,7 @@ private Map<String, String> getConnectorConfig(final SourceConnector connector,
|
||||
/**
|
||||
* Determines, which offset backing store should be used, instantiate it and start the offset store.
|
||||
*/
|
||||
private OffsetBackingStore initializeOffsetStore(Map<String, String> connectorConfig) throws EmbeddedEngineRuntimeException {
|
||||
private OffsetBackingStore initializeOffsetStore(final Map<String, String> connectorConfig) throws EmbeddedEngineRuntimeException {
|
||||
final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
|
||||
OffsetBackingStore offsetStore = null;
|
||||
try {
|
||||
|
Loading…
Reference in New Issue
Block a user