DBZ-2410 Reprocess error commitLogs.

This commit is contained in:
Bingqin Zhou 2020-08-06 16:47:12 -07:00 committed by Jiri Pechanec
parent dc095a0fb5
commit 3c873ab181
4 changed files with 27 additions and 2 deletions

View File

@ -21,4 +21,7 @@ public void onSuccessTransfer(File file) {
public void onErrorTransfer(File file) {
CommitLogUtil.deleteCommitLog(file);
}
@Override
public void getErrorCommitLogs() {}
}

View File

@ -177,6 +177,13 @@ public static Optional<SnapshotMode> fromText(String text) {
public static final Field COMMIT_LOG_POST_PROCESSING_ENABLED = Field.create("commit.log.post.processing.enabled")
.withType(Type.BOOLEAN).withDefault(DEFAULT_COMMIT_LOG_POST_PROCESSING_ENABLED);
/**
* Determine if CommitLogProcessor should re-process error commitLogFiles.
*/
public static final boolean DEFAULT_COMMIT_LOG_ERROR_REPROCESSING_ENABLED = false;
public static final Field COMMIT_LOG_ERROR_REPROCESSING_ENABLED = Field.create("commit.log.error.reprocessing.enabled")
.withType(Type.BOOLEAN).withDefault(DEFAULT_COMMIT_LOG_ERROR_REPROCESSING_ENABLED);
/**
* The fully qualified {@link CommitLogTransfer} class used to transfer commit logs.
* The default option will delete all commit log files after processing (successful or otherwise).
@ -345,6 +352,10 @@ public boolean postProcessEnabled() {
return this.getConfig().getBoolean(COMMIT_LOG_POST_PROCESSING_ENABLED);
}
public boolean errorCommitLogReprocessEnabled() {
return this.getConfig().getBoolean(COMMIT_LOG_ERROR_REPROCESSING_ENABLED);
}
public CommitLogTransfer getCommitLogTransfer() {
try {
String clazz = this.getConfig().getString(COMMIT_LOG_TRANSFER_CLASS);

View File

@ -40,6 +40,8 @@ public class CommitLogProcessor extends AbstractProcessor {
private final boolean latestOnly;
private final CommitLogProcessorMetrics metrics = new CommitLogProcessorMetrics();
private boolean initial = true;
private final boolean errorCommitLogReprocessEnabled;
private final CommitLogTransfer commitLogTransfer;
public CommitLogProcessor(CassandraConnectorContext context) throws IOException {
super(NAME, 0);
@ -63,6 +65,8 @@ void handleEvent(WatchEvent event, Path path) throws IOException {
}
};
latestOnly = context.getCassandraConnectorConfig().latestCommitLogOnly();
errorCommitLogReprocessEnabled = context.getCassandraConnectorConfig().errorCommitLogReprocessEnabled();
commitLogTransfer = context.getCassandraConnectorConfig().getCommitLogTransfer();
}
@Override
@ -82,7 +86,9 @@ public void process() throws IOException, InterruptedException {
processLastModifiedCommitLog();
throw new InterruptedException();
}
if (errorCommitLogReprocessEnabled) {
commitLogTransfer.getErrorCommitLogs();
}
if (initial) {
LOGGER.info("Reading existing commit logs in {}", cdcDir);
File[] commitLogFiles = CommitLogUtil.getCommitLogs(cdcDir);
@ -115,7 +121,7 @@ void processCommitLog(File file) throws IOException {
}
LOGGER.info("Successfully processed commit log {}", file.getName());
}
catch (IOException e) {
catch (Exception e) {
if (!latestOnly) {
queue.enqueue(new EOFEvent(file, false));
}

View File

@ -35,4 +35,9 @@ default void destroy() throws Exception {
* Transfer a commit log that has not been successfully processed.
*/
void onErrorTransfer(File file);
/**
* Get all error commitLog files into cdc_raw directory for re-processing.
*/
void getErrorCommitLogs();
}