DBZ-4488: Reuse the ErrorHandler between retries

This commit is contained in:
Jacob Gminder 2023-03-15 12:13:10 -04:00 committed by Jiri Pechanec
parent 84d0d580bc
commit 5e615ff796
3 changed files with 23 additions and 15 deletions

View File

@ -99,8 +99,12 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();
int retries = (errorHandler == null) ? 0 : errorHandler.getRetries();
errorHandler = new SqlServerErrorHandler(connectorConfig, queue, retries, connectorConfig.getMaxRetries());
if (errorHandler == null) {
errorHandler = new SqlServerErrorHandler(connectorConfig, queue, connectorConfig.getMaxRetries());
}
else {
errorHandler.initialize(connectorConfig, queue, connectorConfig.getMaxRetries());
}
final SqlServerEventMetadataProvider metadataProvider = new SqlServerEventMetadataProvider();

View File

@ -20,8 +20,8 @@
*/
public class SqlServerErrorHandler extends ErrorHandler {
public SqlServerErrorHandler(SqlServerConnectorConfig connectorConfig, ChangeEventQueue<?> queue, int retries, int maxRetries) {
super(SqlServerConnector.class, connectorConfig, queue, retries, maxRetries);
public SqlServerErrorHandler(SqlServerConnectorConfig connectorConfig, ChangeEventQueue<?> queue, int maxRetries) {
super(SqlServerConnector.class, connectorConfig, queue, maxRetries);
}
@Override

View File

@ -23,27 +23,24 @@ public class ErrorHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ErrorHandler.class);
private final ChangeEventQueue<?> queue;
private final AtomicReference<Throwable> producerThrowable;
private final CommonConnectorConfig connectorConfig;
private final int maxRetries;
private ChangeEventQueue<?> queue;
private AtomicReference<Throwable> producerThrowable;
private CommonConnectorConfig connectorConfig;
private int maxRetries;
private int retries;
public ErrorHandler(Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig,
ChangeEventQueue<?> queue) {
this(connectorType, connectorConfig, queue, 0, -1);
this(connectorType, connectorConfig, queue, -1);
}
/**
* Allows a connector that supports setting maximum retries to set the current retries attempts and the maximum retries
*/
public ErrorHandler(Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig,
ChangeEventQueue<?> queue, int retries, int maxRetries) {
this.connectorConfig = connectorConfig;
this.queue = queue;
this.producerThrowable = new AtomicReference<>();
this.retries = retries;
this.maxRetries = maxRetries;
ChangeEventQueue<?> queue, int maxRetries) {
this.initialize(connectorConfig, queue, maxRetries);
this.retries = 0;
}
public void setProducerThrowable(Throwable producerThrowable) {
@ -67,6 +64,13 @@ public void setProducerThrowable(Throwable producerThrowable) {
}
}
public void initialize(CommonConnectorConfig connectorConfig, ChangeEventQueue<?> queue, int maxRetries) {
this.connectorConfig = connectorConfig;
this.queue = queue;
this.producerThrowable = new AtomicReference<>();
this.maxRetries = maxRetries;
}
public Throwable getProducerThrowable() {
return producerThrowable.get();
}