From f274c83e375a63f112e2bef78a93f4b9664cecb1 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 16 Jul 2018 10:48:28 +0200 Subject: [PATCH] DBZ-720 Signaling snapshot cancellation via InterruptedException; Also skipping schema snapshotting when earlier offset with completed snapshot was found --- .../oracle/OracleSnapshotChangeEventSource.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index abe8f62fa..d5dd5aaaa 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -57,7 +57,7 @@ protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) { // found a previous offset and the earlier snapshot has completed if (previousOffset != null && !previousOffset.isSnapshotRunning()) { - snapshotData = false; + snapshotSchema = false; snapshotData = false; } else { @@ -84,13 +84,13 @@ protected Set getAllTableIds(SnapshotContext ctx) throws Exception { } @Override - protected boolean lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException { + protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { ((OracleSnapshotContext)snapshotContext).preSchemaSnapshotSavepoint = jdbcConnection.connection().setSavepoint("dbz_schema_snapshot"); try (Statement statement = jdbcConnection.connection().createStatement()) { for (TableId tableId : snapshotContext.capturedTables) { if (!sourceContext.isRunning()) { - return false; + throw new InterruptedException("Interrupted while locking table " + tableId); } LOGGER.debug("Locking table {}", tableId); @@ -98,8 +98,6 @@ protected boolean lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceCon statement.execute("LOCK TABLE " + tableId.schema() + "." + tableId.table() + " IN EXCLUSIVE MODE"); } } - - return true; } @Override @@ -186,7 +184,7 @@ private Optional getLatestTableDdlScn(SnapshotContext ctx) throws SQLExcep } @Override - protected boolean readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException { + protected void readTableStructure(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext) throws SQLException, InterruptedException { Set schemas = snapshotContext.capturedTables.stream() .map(TableId::schema) .collect(Collectors.toSet()); @@ -196,7 +194,7 @@ protected boolean readTableStructure(ChangeEventSourceContext sourceContext, Sna // would take much longer that way for (String schema : schemas) { if (!sourceContext.isRunning()) { - return false; + throw new InterruptedException("Interrupted while reading structure of schema " + schema); } jdbcConnection.readSchema( @@ -208,8 +206,6 @@ protected boolean readTableStructure(ChangeEventSourceContext sourceContext, Sna false ); } - - return true; } @Override