DBZ-720 Signaling snapshot cancellation via InterruptedException;
Also skipping schema snapshotting when earlier offset with completed snapshot was found
This commit is contained in:
parent
55bfcdda27
commit
f274c83e37
@ -57,7 +57,7 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
|
||||
|
||||
// found a previous offset and the earlier snapshot has completed
|
||||
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
|
||||
snapshotData = false;
|
||||
snapshotSchema = false;
|
||||
snapshotData = false;
|
||||
}
|
||||
else {
|
||||
@ -84,13 +84,13 @@ protected Set<TableId> getAllTableIds(SnapshotContext ctx) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException {
|
||||
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
|
||||
((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot");
|
||||
|
||||
try (Statement statement = jdbcConnection.connection().createStatement()) {
|
||||
for (TableId tableId : snapshotContext.capturedTables) {
|
||||
if (!sourceContext.isRunning()) {
|
||||
return false;
|
||||
throw new InterruptedException("Interrupted while locking table " + tableId);
|
||||
}
|
||||
|
||||
LOGGER.debug("Locking table {}", tableId);
|
||||
@ -98,8 +98,6 @@ protected boolean lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceCon
|
||||
statement.execute("LOCK TABLE " + tableId.schema() + "." + tableId.table() + " IN EXCLUSIVE MODE");
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -186,7 +184,7 @@ private Optional<Long> getLatestTableDdlScn(SnapshotContext ctx) throws SQLExcep
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException {
|
||||
protected void readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException {
|
||||
Set<String> schemas = snapshotContext.capturedTables.stream()
|
||||
.map(TableId::schema)
|
||||
.collect(Collectors.toSet());
|
||||
@ -196,7 +194,7 @@ protected boolean readTableStructure(ChangeEventSourceContext sourceContext, Sna
|
||||
// would take much longer that way
|
||||
for (String schema : schemas) {
|
||||
if (!sourceContext.isRunning()) {
|
||||
return false;
|
||||
throw new InterruptedException("Interrupted while reading structure of schema " + schema);
|
||||
}
|
||||
|
||||
jdbcConnection.readSchema(
|
||||
@ -208,8 +206,6 @@ protected boolean readTableStructure(ChangeEventSourceContext sourceContext, Sna
|
||||
false
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user