DBZ-4727 Add snapshot paused JMX metrics

Add two JMX metrics:
* `snapshotPaused` - indicates whether the incremental snapshot is
  currently paused
* `snapshotPausedDurationInSeconds` - total time for which the incremental
  snapshot was paused. The time adds up - if the snapshot was paused
  e.g. two times, the `snapshotPausedDurationInSeconds` is the sum of
  these two paused times.
This commit is contained in:
Vojtech Juranek 2022-07-19 13:20:31 +02:00 committed by Chris Cranford
parent ed504173e1
commit c7d8353d7b
12 changed files with 270 additions and 1 deletions

View File

@ -107,6 +107,7 @@ public void pauseSnapshot(MongoDbPartition partition, OffsetContext offsetContex
context = (IncrementalSnapshotContext<CollectionId>) offsetContext.getIncrementalSnapshotContext();
if (context.snapshotRunning() && !context.isSnapshotPaused()) {
context.pauseSnapshot();
progressListener.snapshotPaused(partition);
}
}
@ -115,6 +116,7 @@ public void resumeSnapshot(MongoDbPartition partition, OffsetContext offsetConte
context = (IncrementalSnapshotContext<CollectionId>) offsetContext.getIncrementalSnapshotContext();
if (context.snapshotRunning() && context.isSnapshotPaused()) {
context.resumeSnapshot();
progressListener.snapshotResumed(partition);
window.clear();
context.revertChunk();
readChunk(partition);

View File

@ -9,11 +9,16 @@
import static org.junit.Assert.fail;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.junit.Assert;
import org.junit.Test;
/**
@ -100,6 +105,8 @@ public void testSnapshotOnlyMetrics() throws Exception {
assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(new String[]{ "rs0.dbit.restaurants" });
assertThat(mBeanServer.getAttribute(objectName, "LastEvent")).isNotNull();
assertThat(mBeanServer.getAttribute(objectName, "NumberOfDisconnects")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
@Test
@ -141,4 +148,60 @@ public void testStreamingOnlyMetrics() throws Exception {
assertThat(mBeanServer.getAttribute(objectName, "NumberOfDisconnects")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "NumberOfPrimaryElections")).isEqualTo(0L);
}
/**
 * Runs an incremental snapshot that is paused and later resumed through signal documents,
 * then checks that the {@code SnapshotPausedDurationInSeconds} snapshot metric is greater than zero.
 */
@Test
public void testPauseResumeSnapshotMetrics() throws Exception {
    final String idField = "_id";
    final int recordCount = 1_000;

    // No initial snapshot; signals are read from dbit.debezium_signal.
    // NOTE(review): the chunk size of 1 presumably keeps the incremental snapshot running long
    // enough for the pause signal to take effect - confirm.
    this.config = TestHelper.getConfiguration()
            .edit()
            .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER)
            .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
            .with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, "dbit.debezium_signal")
            .with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)
            .build();
    this.context = new MongoDbTaskContext(config);

    // Seed the collection that the incremental snapshot will read.
    TestHelper.cleanDatabase(primary(), "dbit");
    final Document[] documents = new Document[recordCount];
    for (int index = 0; index < recordCount; index++) {
        documents[index] = new Document().append(idField, index + 1).append("aa", index);
    }
    insertDocumentsInTx("dbit", "numbers", documents);

    // Start connector.
    start(MongoDbConnector.class, config);
    assertConnectorIsRunning();
    waitForStreamingRunning("mongodb", "mongo1");

    // Start incremental snapshot.
    final Document executeSignal = Document.parse(
            "{\"type\": \"execute-snapshot\", \"payload\": \"{\\\"data-collections\\\": [\\\" dbit.numbers \\\"]}\"}");
    insertDocuments("dbit", "debezium_signal", new Document[]{ executeSignal });

    // Pause incremental snapshot.
    final Document pauseSignal = Document.parse("{\"type\": \"pause-snapshot\", \"payload\": \"{}\"}");
    insertDocuments("dbit", "debezium_signal", new Document[]{ pauseSignal });

    // The metric is reported in whole seconds, so stay paused for more than one second.
    Thread.sleep(1500);

    // Resume incremental snapshot.
    final Document resumeSignal = Document.parse("{\"type\": \"resume-snapshot\", \"payload\": \"{}\"}");
    insertDocuments("dbit", "debezium_signal", new Document[]{ resumeSignal });

    // Consume incremental snapshot records.
    final List<SourceRecord> consumed = new ArrayList<>();
    consumeRecords(recordCount, record -> {
        consumed.add(record);
    });
    Assert.assertTrue(consumed.size() >= recordCount);

    final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    final ObjectName snapshotMetrics = getSnapshotMetricsObjectName("mongodb", "mongo1");
    final long pausedSeconds = (Long) server.getAttribute(snapshotMetrics, "SnapshotPausedDurationInSeconds");
    Assert.assertTrue(pausedSeconds > 0);
}
}

View File

@ -11,6 +11,8 @@
import java.nio.file.Path;
import java.sql.Connection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
@ -18,12 +20,14 @@
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
@ -127,6 +131,61 @@ public void testSnapshotOnlyMetrics() throws Exception {
assertStreamingMetricsExist();
}
/**
 * Runs an initial snapshot followed by an incremental snapshot that is paused and resumed
 * through rows inserted into the signal table, then checks that the
 * {@code SnapshotPausedDurationInSeconds} snapshot metric is greater than zero.
 */
@Test
public void testPauseResumeSnapshotMetrics() throws Exception {
    final int NUM_RECORDS = 1_000;
    final String TABLE_NAME = DATABASE.qualifiedTableName("simple");
    final String SIGNAL_TABLE_NAME = DATABASE.qualifiedTableName("debezium_signal");

    // A single statement is reused for all inserts and closed together with the connection.
    // NOTE(review): the loop starts at 1, so NUM_RECORDS - 1 rows are inserted; confirm the
    // record-count assertion below accounts for that.
    try (Connection connection = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection();
            java.sql.Statement statement = connection.createStatement()) {
        for (int i = 1; i < NUM_RECORDS; i++) {
            statement.execute(String.format("INSERT INTO %s (val) VALUES (%d);", TABLE_NAME, i));
        }
    }

    // Start connector.
    start(MySqlConnector.class,
            DATABASE.defaultConfig()
                    .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
                    .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                    .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
                    .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, String.format("%s, %s", TABLE_NAME, SIGNAL_TABLE_NAME))
                    .with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, Boolean.TRUE)
                    .with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1)
                    .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_TABLE_NAME)
                    .build());

    assertConnectorIsRunning();
    waitForSnapshotToBeCompleted();
    waitForStreamingToStart();

    // Consume initial snapshot records.
    List<SourceRecord> records = new ArrayList<>();
    consumeRecords(NUM_RECORDS, record -> {
        records.add(record);
    });

    try (Connection connection = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection();
            java.sql.Statement statement = connection.createStatement()) {
        // Start incremental snapshot. All three signals target the same, explicitly named signal table.
        statement.execute(String.format(
                "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"data-collections\": [\"%s\"]}')", SIGNAL_TABLE_NAME, TABLE_NAME));

        // Pause incremental snapshot.
        statement.execute(String.format("INSERT INTO %s VALUES('test-pause', 'pause-snapshot', '')", SIGNAL_TABLE_NAME));

        // The metric is reported in whole seconds, so stay paused for more than one second.
        Thread.sleep(1500);

        // Resume incremental snapshot.
        statement.execute(String.format("INSERT INTO %s VALUES('test-resume', 'resume-snapshot', '')", SIGNAL_TABLE_NAME));
    }

    // Consume incremental snapshot records.
    consumeRecords(NUM_RECORDS, record -> {
        records.add(record);
    });
    Assert.assertTrue(records.size() >= 2 * NUM_RECORDS);
    assertSnapshotPauseNotZero();
}
@Test
public void testSnapshotAndStreamingMetrics() throws Exception {
// Setup
@ -210,6 +269,17 @@ private void assertSnapshotMetricsExist() throws Exception {
}
}
/**
 * Asserts that the {@code SnapshotPausedDurationInSeconds} attribute of the snapshot metrics
 * MBean is greater than zero; fails the test if the MBean is not registered.
 */
private void assertSnapshotPauseNotZero() throws Exception {
    final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    try {
        final Object attribute = server.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds");
        final long pausedSeconds = (Long) attribute;
        Assert.assertTrue(pausedSeconds > 0);
    }
    catch (InstanceNotFoundException missingBean) {
        Assert.fail("Snapshot Metrics should exist");
    }
}
private void assertSnapshotMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
@ -230,6 +300,8 @@ private void assertSnapshotMetrics() throws Exception {
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
private void assertStreamingMetrics(long events) throws Exception {

View File

@ -1 +1,7 @@
CREATE TABLE simple (pk integer auto_increment, val integer not null, primary key(pk));
CREATE TABLE simple (pk integer auto_increment, val integer not null, primary key(pk));
-- Signal table with three text columns (id, type, data).
-- NOTE(review): presumably the table that the pause/resume snapshot metrics test inserts
-- 'execute-snapshot' / 'pause-snapshot' / 'resume-snapshot' rows into - confirm.
CREATE TABLE debezium_signal (
id varchar(64),
type varchar(32),
data varchar(2048)
);

View File

@ -154,6 +154,8 @@ private void assertSnapshotMetrics() throws Exception {
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPaused")).isEqualTo(false);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotPausedDurationInSeconds")).isEqualTo(0L);
}
private void assertSnapshotNotExecutedMetrics() throws Exception {

View File

@ -40,6 +40,11 @@ public boolean getSnapshotRunning() {
return snapshotMeter.getSnapshotRunning();
}
/**
 * Returns the paused flag reported by {@code snapshotMeter}.
 */
@Override
public boolean getSnapshotPaused() {
return snapshotMeter.getSnapshotPaused();
}
@Override
public boolean getSnapshotCompleted() {
return snapshotMeter.getSnapshotCompleted();
@ -55,6 +60,11 @@ public long getSnapshotDurationInSeconds() {
return snapshotMeter.getSnapshotDurationInSeconds();
}
/**
 * Returns the paused duration, in seconds, reported by {@code snapshotMeter}.
 */
@Override
public long getSnapshotPausedDurationInSeconds() {
return snapshotMeter.getSnapshotPausedDurationInSeconds();
}
@Override
public String[] getCapturedTables() {
return snapshotMeter.getCapturedTables();
@ -72,6 +82,14 @@ void snapshotStarted() {
snapshotMeter.snapshotStarted();
}
/**
 * Forwards the "snapshot paused" notification to {@code snapshotMeter}.
 */
void snapshotPaused() {
snapshotMeter.snapshotPaused();
}
/**
 * Forwards the "snapshot resumed" notification to {@code snapshotMeter}.
 */
void snapshotResumed() {
snapshotMeter.snapshotResumed();
}
/**
 * Forwards the "snapshot completed" notification to {@code snapshotMeter}.
 */
void snapshotCompleted() {
snapshotMeter.snapshotCompleted();
}

View File

@ -38,6 +38,16 @@ public void snapshotStarted(SqlServerPartition partition) {
onPartitionEvent(partition, SqlServerSnapshotPartitionMetrics::snapshotStarted);
}
/**
 * Passes the pause notification for the given partition to {@code onPartitionEvent}, which is
 * handed {@link SqlServerSnapshotPartitionMetrics#snapshotPaused()} as the action to apply.
 *
 * @param partition the partition whose snapshot was paused
 */
@Override
public void snapshotPaused(SqlServerPartition partition) {
onPartitionEvent(partition, SqlServerSnapshotPartitionMetrics::snapshotPaused);
}
/**
 * Passes the resume notification for the given partition to {@code onPartitionEvent}, which is
 * handed {@link SqlServerSnapshotPartitionMetrics#snapshotResumed()} as the action to apply.
 *
 * @param partition the partition whose snapshot was resumed
 */
@Override
public void snapshotResumed(SqlServerPartition partition) {
onPartitionEvent(partition, SqlServerSnapshotPartitionMetrics::snapshotResumed);
}
@Override
public void monitoredDataCollectionsDetermined(SqlServerPartition partition, Iterable<? extends DataCollectionId> dataCollectionIds) {
onPartitionEvent(partition, bean -> bean.monitoredDataCollectionsDetermined(dataCollectionIds));

View File

@ -15,6 +15,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.pipeline.metrics.traits.SnapshotMetricsMXBean;
import io.debezium.relational.TableId;
@ -26,12 +29,17 @@
*/
@ThreadSafe
public class SnapshotMeter implements SnapshotMetricsMXBean {
private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotMeter.class);
private final AtomicBoolean snapshotRunning = new AtomicBoolean();
private final AtomicBoolean snapshotPaused = new AtomicBoolean();
private final AtomicBoolean snapshotCompleted = new AtomicBoolean();
private final AtomicBoolean snapshotAborted = new AtomicBoolean();
private final AtomicLong startTime = new AtomicLong();
private final AtomicLong stopTime = new AtomicLong();
private final AtomicLong startPauseTime = new AtomicLong();
private final AtomicLong stopPauseTime = new AtomicLong();
private final AtomicLong pauseDuration = new AtomicLong();
private final ConcurrentMap<String, Long> rowsScanned = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> remainingTables = new ConcurrentHashMap<>();
@ -65,6 +73,11 @@ public boolean getSnapshotRunning() {
return this.snapshotRunning.get();
}
/**
 * Returns the current value of the {@code snapshotPaused} flag.
 */
@Override
public boolean getSnapshotPaused() {
return this.snapshotPaused.get();
}
@Override
public boolean getSnapshotCompleted() {
return this.snapshotCompleted.get();
@ -88,6 +101,12 @@ public long getSnapshotDurationInSeconds() {
return (stopMillis - startMillis) / 1000L;
}
/**
 * Returns the accumulated pause time in whole seconds.
 * <p>
 * The value is derived from {@code pauseDuration} (milliseconds) using integer division, so
 * fractions of a second are dropped. A negative accumulated value is reported as zero.
 */
@Override
public long getSnapshotPausedDurationInSeconds() {
    return Math.max(pauseDuration.get(), 0L) / 1000L;
}
@Override
public String[] getCapturedTables() {
return capturedTables.toArray(new String[0]);
@ -107,16 +126,50 @@ public void dataCollectionSnapshotCompleted(DataCollectionId dataCollectionId, l
/**
 * Marks a snapshot as running: clears the paused, completed and aborted flags, stamps the
 * start time with the current clock value, and zeroes the stop time and all pause bookkeeping.
 */
public void snapshotStarted() {
    snapshotRunning.set(true);
    snapshotPaused.set(false);
    snapshotCompleted.set(false);
    snapshotAborted.set(false);

    startTime.set(clock.currentTimeInMillis());
    stopTime.set(0L);

    // Pause bookkeeping starts from scratch for every snapshot.
    startPauseTime.set(0L);
    stopPauseTime.set(0L);
    pauseDuration.set(0L);
}
/**
 * Marks the snapshot as paused (and therefore not running), clears the completed and aborted
 * flags, and records the current clock value as the start of the pause.
 */
public void snapshotPaused() {
    snapshotRunning.set(false);
    snapshotPaused.set(true);
    snapshotCompleted.set(false);
    snapshotAborted.set(false);

    // A new pause interval begins now; its end is not known yet.
    startPauseTime.set(clock.currentTimeInMillis());
    stopPauseTime.set(0L);
}
/**
 * Marks the snapshot as running again and adds the pause that just ended to the accumulated
 * pause duration.
 * <p>
 * {@code startPauseTime} is {@code 0} whenever this meter has not recorded a pause (it is
 * zeroed by {@link #snapshotStarted()} and {@code reset()}). In that case nothing is added to
 * the total; otherwise the whole epoch-based clock value would be counted as pause time.
 * A negative interval (clock moved backwards) is likewise counted as zero.
 */
public void snapshotResumed() {
    snapshotRunning.set(true);
    snapshotPaused.set(false);
    snapshotCompleted.set(false);
    snapshotAborted.set(false);

    final long now = clock.currentTimeInMillis();
    stopPauseTime.set(now);

    final long pauseStart = startPauseTime.get();
    final long lastPauseMillis = pauseStart > 0L ? Math.max(now - pauseStart, 0L) : 0L;

    // Accumulate atomically; a negative running total is treated as zero before adding.
    pauseDuration.updateAndGet(total -> Math.max(total, 0L) + lastPauseMillis);
}
/**
 * Marks the snapshot as completed: clears the aborted, running and paused flags and stamps the
 * stop time with the current clock value.
 */
public void snapshotCompleted() {
    snapshotCompleted.set(true);
    snapshotAborted.set(false);
    snapshotRunning.set(false);
    snapshotPaused.set(false);

    final long finishedAt = clock.currentTimeInMillis();
    stopTime.set(finishedAt);
}
@ -124,6 +177,7 @@ public void snapshotAborted() {
this.snapshotCompleted.set(false);
this.snapshotAborted.set(true);
this.snapshotRunning.set(false);
this.snapshotPaused.set(false);
this.stopTime.set(clock.currentTimeInMillis());
}
@ -179,10 +233,14 @@ private String arrayToString(Object[] array) {
public void reset() {
snapshotRunning.set(false);
snapshotPaused.set(false);
snapshotCompleted.set(false);
snapshotAborted.set(false);
startTime.set(0);
stopTime.set(0);
startPauseTime.set(0);
stopPauseTime.set(0);
pauseDuration.set(0);
rowsScanned.clear();
remainingTables.clear();
capturedTables.clear();

View File

@ -55,6 +55,11 @@ public boolean getSnapshotRunning() {
return snapshotMeter.getSnapshotRunning();
}
/**
 * Returns the paused flag reported by {@code snapshotMeter}.
 */
@Override
public boolean getSnapshotPaused() {
return snapshotMeter.getSnapshotPaused();
}
@Override
public boolean getSnapshotCompleted() {
return snapshotMeter.getSnapshotCompleted();
@ -70,6 +75,11 @@ public long getSnapshotDurationInSeconds() {
return snapshotMeter.getSnapshotDurationInSeconds();
}
/**
 * Returns the paused duration, in seconds, reported by {@code snapshotMeter}.
 */
@Override
public long getSnapshotPausedDurationInSeconds() {
return snapshotMeter.getSnapshotPausedDurationInSeconds();
}
@Override
public String[] getCapturedTables() {
return snapshotMeter.getCapturedTables();
@ -90,6 +100,16 @@ public void snapshotStarted(P partition) {
snapshotMeter.snapshotStarted();
}
/**
 * Forwards the "snapshot paused" notification to {@code snapshotMeter}.
 *
 * @param partition not used by this implementation
 */
@Override
public void snapshotPaused(P partition) {
snapshotMeter.snapshotPaused();
}
/**
 * Forwards the "snapshot resumed" notification to {@code snapshotMeter}.
 *
 * @param partition not used by this implementation
 */
@Override
public void snapshotResumed(P partition) {
snapshotMeter.snapshotResumed();
}
@Override
public void snapshotCompleted(P partition) {
snapshotMeter.snapshotCompleted();

View File

@ -18,12 +18,16 @@ public interface SnapshotMetricsMXBean extends SchemaMetricsMXBean {
boolean getSnapshotRunning();
boolean getSnapshotPaused();
boolean getSnapshotCompleted();
boolean getSnapshotAborted();
long getSnapshotDurationInSeconds();
long getSnapshotPausedDurationInSeconds();
Map<String, Long> getRowsScanned();
String getChunkId();

View File

@ -107,6 +107,7 @@ public void pauseSnapshot(P partition, OffsetContext offsetContext) throws Inter
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
if (context.snapshotRunning() && !context.isSnapshotPaused()) {
context.pauseSnapshot();
progressListener.snapshotPaused(partition);
}
}
@ -115,6 +116,7 @@ public void resumeSnapshot(P partition, OffsetContext offsetContext) throws Inte
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
if (context.snapshotRunning() && context.isSnapshotPaused()) {
context.resumeSnapshot();
progressListener.snapshotResumed(partition);
readChunk(partition);
}
}

View File

@ -18,6 +18,10 @@ public interface SnapshotProgressListener<P extends Partition> {
void snapshotStarted(P partition);
void snapshotPaused(P partition);
void snapshotResumed(P partition);
void monitoredDataCollectionsDetermined(P partition, Iterable<? extends DataCollectionId> dataCollectionIds);
void snapshotCompleted(P partition);
@ -39,6 +43,14 @@ static <P extends Partition> SnapshotProgressListener<P> NO_OP() {
public void snapshotStarted(P partition) {
}
@Override
public void snapshotPaused(P partition) {
// Intentionally empty: the no-op listener ignores pause notifications.
}
@Override
public void snapshotResumed(P partition) {
// Intentionally empty: the no-op listener ignores resume notifications.
}
@Override
public void rowsScanned(P partition, TableId tableId, long numRows) {
// Intentionally empty: the no-op listener ignores row-count notifications.
}