DBZ-7759 Use MongoException in error handling

This commit is contained in:
ani-sha 2024-04-11 12:49:09 +05:30 committed by Jiri Pechanec
parent 1b7481d357
commit aef55a1ccd

View File

@ -22,7 +22,6 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.mongodb.MongoChangeStreamException;
import com.mongodb.MongoException; import com.mongodb.MongoException;
import com.mongodb.ServerAddress; import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor; import com.mongodb.ServerCursor;
@ -227,13 +226,13 @@ private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>>
private Optional<ResumableChangeStreamEvent<TResult>> fetchEvent(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) { private Optional<ResumableChangeStreamEvent<TResult>> fetchEvent(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) {
var beforeEventPollTime = clock.currentTimeAsInstant(); var beforeEventPollTime = clock.currentTimeAsInstant();
ChangeStreamDocument<TResult> document; ChangeStreamDocument<TResult> document = null;
try { try {
document = cursor.tryNext(); document = cursor.tryNext();
} }
catch (MongoException e) { catch (MongoException e) {
running.set(false); running.set(false);
throw new MongoChangeStreamException("Error while fetching change stream event" + e.getMessage()); LOGGER.error("Error while fetching change stream event", e);
} }
metrics.onSourceEventPolled(document, clock, beforeEventPollTime); metrics.onSourceEventPolled(document, clock, beforeEventPollTime);
throttleIfNeeded(document); throttleIfNeeded(document);
@ -307,19 +306,14 @@ public BufferingChangeStreamCursor<TResult> start() {
@Override @Override
public ResumableChangeStreamEvent<TResult> tryNext() { public ResumableChangeStreamEvent<TResult> tryNext() {
var event = pollWithDelay(); var event = pollWithDelay();
try {
if (event != null) { if (event != null) {
lastResumeToken = event.resumeToken; lastResumeToken = event.resumeToken;
} }
else { else {
if (!fetcher.isRunning() && fetcher.isEmpty()) { if (!fetcher.isRunning() && fetcher.isEmpty()) {
throw new MongoChangeStreamException("Failed to fetch event from change stream cursor, cursor is closed"); throw new MongoException("Fetcher thread has stopped and buffer is empty");
} }
} }
}
catch (MongoException e) {
throw new MongoException(e.getMessage());
}
return event; return event;
} }