DBZ-720 Making scan of single table interruptable;

Also better logging.
This commit is contained in:
Gunnar Morling 2018-07-13 11:12:33 +02:00 committed by Jiri Pechanec
parent 648d695c08
commit c5e2d5f9e7

View File

@ -10,6 +10,7 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
@ -28,6 +29,8 @@
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
/**
* Base class for {@link SnapshotChangeEventSource} for relational databases with a schema history.
@ -43,6 +46,11 @@ public abstract class HistorizedRelationalSnapshotChangeEventSource implements S
private static final Logger LOGGER = LoggerFactory.getLogger(HistorizedRelationalSnapshotChangeEventSource.class);
/**
* Interval for showing a log statement with the progress while scanning a single table.
*/
private static final Duration LOG_INTERVAL = Duration.ofMillis(10_000);
private final RelationalDatabaseConnectorConfig connectorConfig;
private final OffsetContext previousOffset;
private final JdbcConnection jdbcConnection;
@ -234,7 +242,7 @@ private boolean createDataEvents(ChangeEventSourceContext sourceContext, Snapsho
LOGGER.debug("Scanning table {}", tableId);
if (!createDataEventsForTable(snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId))) {
if (!createDataEventsForTable(sourceContext, snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId))) {
return false;
}
}
@ -249,7 +257,7 @@ private boolean createDataEvents(ChangeEventSourceContext sourceContext, Snapsho
/**
* Dispatches the data change events for the records of a single table.
*/
private boolean createDataEventsForTable(SnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver,
private boolean createDataEventsForTable(ChangeEventSourceContext sourceContext, SnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver,
Table table) throws InterruptedException {
long exportStart = clock.currentTimeInMillis();
@ -264,18 +272,24 @@ private boolean createDataEventsForTable(SnapshotContext snapshotContext, Snapsh
Column[] columns = getColumnsForResultSet(table, rs);
final int numColumns = table.columns().size();
int rows = 0;
Timer logTimer = getTableScanLogTimer();
while (rs.next()) {
if (!sourceContext.isRunning()) {
return false;
}
rows++;
final Object[] row = new Object[numColumns];
for (int i = 0; i < numColumns; i++) {
row[i] = getColumnValue(rs, i + 1, columns[i]);
}
if (rows % 10_000 == 0) {
if (logTimer.expired()) {
long stop = clock.currentTimeInMillis();
LOGGER.info("\t Exported {} records for table '{}' after {}", rows, table.id(),
Strings.duration(stop - exportStart));
logTimer = getTableScanLogTimer();
}
dispatcher.dispatchSnapshotEvent(table.id(), getChangeRecordEmitter(snapshotContext, row),
@ -292,6 +306,10 @@ private boolean createDataEventsForTable(SnapshotContext snapshotContext, Snapsh
return true;
}
private Timer getTableScanLogTimer() {
return Threads.timer(clock, LOG_INTERVAL);
}
/**
* Returns a {@link ChangeRecordEmitter} producing the change records for the given table row.
*/