DBZ-4488 Refactor the code

This commit is contained in:
Jiri Pechanec 2023-03-14 13:25:06 +01:00
parent 9d5b6df0f7
commit 84d0d580bc
4 changed files with 14 additions and 16 deletions

View File

@ -42,7 +42,7 @@ public class SqlServerChangeEventSourceCoordinator extends ChangeEventSourceCoor
private final Clock clock; private final Clock clock;
private final Duration pollInterval; private final Duration pollInterval;
private AtomicBoolean firstStreamingIterationCompletedSuccessfully; private final AtomicBoolean firstStreamingIterationCompletedSuccessfully = new AtomicBoolean(false);
public SqlServerChangeEventSourceCoordinator(Offsets<SqlServerPartition, SqlServerOffsetContext> previousOffsets, ErrorHandler errorHandler, public SqlServerChangeEventSourceCoordinator(Offsets<SqlServerPartition, SqlServerOffsetContext> previousOffsets, ErrorHandler errorHandler,
Class<? extends SourceConnector> connectorType, Class<? extends SourceConnector> connectorType,
@ -54,7 +54,6 @@ public SqlServerChangeEventSourceCoordinator(Offsets<SqlServerPartition, SqlServ
Clock clock) { Clock clock) {
super(previousOffsets, errorHandler, connectorType, connectorConfig, changeEventSourceFactory, super(previousOffsets, errorHandler, connectorType, connectorConfig, changeEventSourceFactory,
changeEventSourceMetricsFactory, eventDispatcher, schema); changeEventSourceMetricsFactory, eventDispatcher, schema);
this.firstStreamingIterationCompletedSuccessfully = new AtomicBoolean(false);
this.clock = clock; this.clock = clock;
this.pollInterval = connectorConfig.getPollInterval(); this.pollInterval = connectorConfig.getPollInterval();
} }

View File

@ -264,7 +264,7 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
.withWidth(Width.MEDIUM) .withWidth(Width.MEDIUM)
.withImportance(Importance.LOW) .withImportance(Importance.LOW)
.withDefault(DEFAULT_MAX_RETRIES) .withDefault(DEFAULT_MAX_RETRIES)
.withValidation(Field::isPositiveInteger) .withValidation(Field::isInteger)
.withDescription( .withDescription(
"The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries)."); "The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
@ -324,7 +324,8 @@ public static SnapshotIsolationMode parse(String value, String defaultValue) {
SCHEMA_NAME_ADJUSTMENT_MODE, SCHEMA_NAME_ADJUSTMENT_MODE,
INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE, INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE, INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES) INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
MAX_RETRIES)
.excluding( .excluding(
SCHEMA_INCLUDE_LIST, SCHEMA_INCLUDE_LIST,
SCHEMA_EXCLUDE_LIST) SCHEMA_EXCLUDE_LIST)

View File

@ -2877,7 +2877,7 @@ private void shouldStopRetriableRestartsAtConfiguredMaximum(SqlRunnable scenario
scenario.run(); scenario.run();
final String message1 = "1 of 1 retries will be attempted"; final String message1 = "1 of 1 retries will be attempted";
final String message2 = "The maximum number of retries: 1 has been attempted"; final String message2 = "The maximum number of 1 retries has been attempted";
Awaitility.await() Awaitility.await()
.alias("Checking for maximum restart messages") .alias("Checking for maximum restart messages")
.pollInterval(100, TimeUnit.MILLISECONDS) .pollInterval(100, TimeUnit.MILLISECONDS)

View File

@ -31,11 +31,7 @@ public class ErrorHandler {
public ErrorHandler(Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig, public ErrorHandler(Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig,
ChangeEventQueue<?> queue) { ChangeEventQueue<?> queue) {
this.connectorConfig = connectorConfig; this(connectorType, connectorConfig, queue, 0, -1);
this.queue = queue;
this.producerThrowable = new AtomicReference<>();
this.maxRetries = -1;
this.retries = 0;
} }
/** /**
@ -121,21 +117,23 @@ protected boolean isCustomRetriable(Throwable throwable) {
* @return true if maxRetries is -1 or retries < maxRetries * @return true if maxRetries is -1 or retries < maxRetries
*/ */
protected boolean hasMoreRetries() { protected boolean hasMoreRetries() {
boolean doRetry = maxRetries == -1 || retries < maxRetries; boolean doRetry = unlimitedRetries() || retries < maxRetries;
if (doRetry) { if (doRetry) {
retries++; retries++;
LOGGER.info("{} of {} retries will be attempted", retries, LOGGER.warn("Retry {} of {} retries will be attempted", retries,
maxRetries); unlimitedRetries() ? "unlimited" : maxRetries);
} }
else { else {
String errorMsg = String.format( LOGGER.error("The maximum number of {} retries has been attempted", maxRetries);
"The maximum number of retries: %d has been attempted", maxRetries);
LOGGER.error(errorMsg);
} }
return doRetry; return doRetry;
} }
private boolean unlimitedRetries() {
return maxRetries == -1;
}
public int getRetries() { public int getRetries() {
return retries; return retries;
} }