DBZ-7964 (quickfix) Only reset error retry counter if poll() successfully produced records
This commit is contained in:
parent
19c3fb365b
commit
cac16464b4
@ -182,7 +182,7 @@ public List<SourceRecord> doPoll() throws InterruptedException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resetErrorHandlerRetriesIfNeeded() {
|
||||
protected void resetErrorHandlerRetriesIfNeeded(List<SourceRecord> records) {
|
||||
// Reset the retries if all partitions have streamed without exceptions at least once after a restart
|
||||
if (coordinator.getErrorHandler().getRetries() > 0 && ((SqlServerChangeEventSourceCoordinator) coordinator).firstStreamingIterationCompletedSuccessfully()) {
|
||||
coordinator.getErrorHandler().resetRetries();
|
||||
|
@ -289,7 +289,7 @@ public final List<SourceRecord> poll() throws InterruptedException {
|
||||
final List<SourceRecord> records = doPoll();
|
||||
logStatistics(records);
|
||||
|
||||
resetErrorHandlerRetriesIfNeeded();
|
||||
resetErrorHandlerRetriesIfNeeded(records);
|
||||
|
||||
return records;
|
||||
}
|
||||
@ -340,12 +340,12 @@ private void updateLastOffset(Map<String, ?> partition, Map<String, ?> lastOffse
|
||||
* Should be called to reset the error handler's retry counter upon a successful poll or when known
|
||||
* that the connector task has recovered from a previous failure state.
|
||||
*/
|
||||
protected void resetErrorHandlerRetriesIfNeeded() {
|
||||
protected void resetErrorHandlerRetriesIfNeeded(List<SourceRecord> records) {
|
||||
// When a connector throws a retriable error, the task is not re-created and instead the previous
|
||||
// error handler is passed into the new error handler, propagating the retry count. This method
|
||||
// allows resetting that counter when a successful poll iteration step happens so that when a
|
||||
// allows resetting that counter when a successful poll iteration step contains new records so that when a
|
||||
// future failure is thrown, the maximum retry count can be utilized.
|
||||
if (coordinator.getErrorHandler().getRetries() > 0) {
|
||||
if (!records.isEmpty() && coordinator.getErrorHandler().getRetries() > 0) {
|
||||
coordinator.getErrorHandler().resetRetries();
|
||||
}
|
||||
}
|
||||
|
@ -267,7 +267,7 @@ protected List<SourceRecord> doPoll() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resetErrorHandlerRetriesIfNeeded() {
|
||||
protected void resetErrorHandlerRetriesIfNeeded(List<SourceRecord> records) {
|
||||
// do nothing as we don't have a coordinator mocked
|
||||
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ protected List<SourceRecord> doPoll() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void resetErrorHandlerRetriesIfNeeded() {
|
||||
protected void resetErrorHandlerRetriesIfNeeded(List<SourceRecord> records) {
|
||||
// do nothing as we don't have a coordinator mocked
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user