diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalSnapshotChangeEventSource.java index 16f2de071..42016d90f 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalSnapshotChangeEventSource.java @@ -10,6 +10,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.time.Duration; import java.util.HashSet; import java.util.Set; @@ -28,6 +29,8 @@ import io.debezium.schema.SchemaChangeEvent; import io.debezium.util.Clock; import io.debezium.util.Strings; +import io.debezium.util.Threads; +import io.debezium.util.Threads.Timer; /** * Base class for {@link SnapshotChangeEventSource} for relational databases with a schema history. @@ -43,6 +46,11 @@ public abstract class HistorizedRelationalSnapshotChangeEventSource implements S private static final Logger LOGGER = LoggerFactory.getLogger(HistorizedRelationalSnapshotChangeEventSource.class); + /** + * Interval for showing a log statement with the progress while scanning a single table. + */ + private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000); + private final RelationalDatabaseConnectorConfig connectorConfig; private final OffsetContext previousOffset; private final JdbcConnection jdbcConnection; @@ -234,7 +242,7 @@ private boolean createDataEvents(ChangeEventSourceContext sourceContext, Snapsho LOGGER.debug("Scanning table {}", tableId); - if (!createDataEventsForTable(snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId))) { + if (!createDataEventsForTable(sourceContext, snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId))) { return false; } } @@ -249,7 +257,7 @@ private boolean createDataEvents(ChangeEventSourceContext sourceContext, Snapsho /** * Dispatches the data change events for the records of a single table. */ - private boolean createDataEventsForTable(SnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver, + private boolean createDataEventsForTable(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver, Table table) throws InterruptedException { long exportStart = clock.currentTimeInMillis(); @@ -264,18 +272,24 @@ private boolean createDataEventsForTable(SnapshotContext snapshotContext, Snapsh Column[] columns = getColumnsForResultSet(table, rs); final int numColumns = table.columns().size(); int rows = 0; + Timer logTimer = getTableScanLogTimer(); while (rs.next()) { + if (!sourceContext.isRunning()) { + return false; + } + rows++; final Object[] row = new Object[numColumns]; for (int i = 0; i < numColumns; i++) { row[i] = getColumnValue(rs, i + 1, columns[i]); } - if (rows % 10_000 == 0) { + if (logTimer.expired()) { long stop = clock.currentTimeInMillis(); LOGGER.info("\t Exported {} records for table '{}' after {}", rows, table.id(), Strings.duration(stop - exportStart)); + logTimer = getTableScanLogTimer(); } dispatcher.dispatchSnapshotEvent(table.id(), getChangeRecordEmitter(snapshotContext, row), @@ -292,6 +306,10 @@ private boolean createDataEventsForTable(SnapshotContext snapshotContext, Snapsh return true; } + private Timer getTableScanLogTimer() { + return Threads.timer(clock, LOG_INTERVAL); + } + /** * Returns a {@link ChangeRecordEmitter} producing the change records for the given table row. */