DBZ-7770 Ability to disable log position validation on startup for MongoDb conenctor

This commit is contained in:
Jakub Cechacek 2024-04-11 09:23:29 +02:00 committed by Jiri Pechanec
parent 6e9235215f
commit 9242d45240

View File

@ -134,7 +134,7 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
schemaNameAdjuster,
signalProcessor);
validate(taskContext.getConnection(dispatcher, previousOffsets.getTheOnlyPartition()), previousOffsets,
validate(connectorConfig, taskContext.getConnection(dispatcher, previousOffsets.getTheOnlyPartition()), previousOffsets,
snapshotterService.getSnapshotter());
NotificationService<MongoDbPartition, MongoDbOffsetContext> notificationService = new NotificationService<>(getNotificationChannels(),
@ -263,7 +263,8 @@ protected Configuration withMaskedSensitiveOptions(Configuration config) {
return super.withMaskedSensitiveOptions(config).withMasked(MongoDbConnectorConfig.CONNECTION_STRING.name());
}
private void validate(MongoDbConnection mongoDbConnection, Offsets<MongoDbPartition, MongoDbOffsetContext> previousOffsets, Snapshotter snapshotter) {
private void validate(MongoDbConnectorConfig connectorConfig, MongoDbConnection mongoDbConnection, Offsets<MongoDbPartition, MongoDbOffsetContext> previousOffsets,
Snapshotter snapshotter) {
for (Map.Entry<MongoDbPartition, MongoDbOffsetContext> previousOffset : previousOffsets) {
@ -287,25 +288,27 @@ private void validate(MongoDbConnection mongoDbConnection, Offsets<MongoDbPartit
return;
}
boolean logPositionAvailable = mongoDbConnection.validateLogPosition(offset, taskContext);
if (connectorConfig.isLogPositionCheckEnabled()) {
boolean logPositionAvailable = mongoDbConnection.validateLogPosition(offset, taskContext);
if (!logPositionAvailable) {
LOGGER.warn("Last recorded offset is no longer available on the server.");
if (!logPositionAvailable) {
LOGGER.warn("Last recorded offset is no longer available on the server.");
if (snapshotter.shouldSnapshotOnDataError()) {
if (snapshotter.shouldSnapshotOnDataError()) {
LOGGER.info("The last recorded offset is no longer available but we are in {} snapshot mode. " +
"Attempting to snapshot data to fill the gap.",
snapshotter.name());
LOGGER.info("The last recorded offset is no longer available but we are in {} snapshot mode. " +
"Attempting to snapshot data to fill the gap.",
snapshotter.name());
previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
return;
return;
}
LOGGER.warn("The connector is trying to read change stream starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed if you want to recover. " +
"If not the connector will streaming from the last available position in the log");
}
LOGGER.warn("The connector is trying to read change stream starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed if you want to recover. " +
"If not the connector will streaming from the last available position in the log");
}
}
}