DBZ-1875 Test stabilization;

* Snapshot metrics are present with snapshot never
* Skip extra empty TX

Co-authored-by: Gunnar Morling <gunnar.morling@googlemail.com>
This commit is contained in:
Jiri Pechanec 2020-03-13 11:40:00 +01:00 committed by GitHub
parent 4ffed1fa46
commit b114520f5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 10 deletions

View File

@ -14,6 +14,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
@ -126,15 +127,7 @@ public void testStreamingOnlyMetrics() throws Exception {
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build());
// Check snapshot metrics do not exist
try {
mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen");
Assert.fail("Expected Snapshot Metrics to not exist");
}
catch (InstanceNotFoundException e) {
// expected
}
assertSnapshotNotExecutedMetrics();
assertStreamingMetrics();
}
@ -154,6 +147,29 @@ private void assertSnapshotMetrics() throws Exception {
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(true);
}
private void assertSnapshotNotExecutedMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
Awaitility.await("Waiting for snapshot metrics to appear").atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
try {
mBeanServer.getObjectInstance(getSnapshotMetricsObjectName());
return true;
}
catch (InstanceNotFoundException e) {
return false;
}
});
// Check snapshot metrics
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalTableCount")).isEqualTo(0);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "MonitoredTables")).isEqualTo(new String[]{});
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "TotalNumberOfEventsSeen")).isEqualTo(0L);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "RemainingTableCount")).isEqualTo(0);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotRunning")).isEqualTo(false);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotAborted")).isEqualTo(false);
Assertions.assertThat(mBeanServer.getAttribute(getSnapshotMetricsObjectName(), "SnapshotCompleted")).isEqualTo(false);
}
private void assertStreamingMetrics() throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();

View File

@ -7,10 +7,13 @@
package io.debezium.connector.postgresql;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
@ -80,7 +83,19 @@ public void transactionMetadata() throws InterruptedException {
TestHelper.execute(INSERT_STMT);
// BEGIN, 2 * data, END
final List<SourceRecord> records = consumeRecordsByTopic(4).allRecordsInOrder();
final List<SourceRecord> records = new ArrayList<>();
// Database sometimes insert an empty transaction, we must skip those
Awaitility.await("Skip empty transactions and find the data").atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords() * 3)).until(() -> {
final List<SourceRecord> candidate = consumeRecordsByTopic(2).allRecordsInOrder();
if (candidate.get(1).topic().contains("transaction")) {
// empty transaction, should be skipped
return false;
}
records.addAll(candidate);
records.addAll(consumeRecordsByTopic(2).allRecordsInOrder());
return true;
});
Assertions.assertThat(records).hasSize(4);
final String txId = assertBeginTransaction(records.get(0));