DBZ-1101 Wait for snapshot when necessary

This commit is contained in:
Jiri Pechanec 2019-01-25 09:48:49 +01:00 committed by Gunnar Morling
parent 9dc2c525c9
commit 52c2cf07dd
5 changed files with 80 additions and 9 deletions

View File

@ -51,10 +51,10 @@ public void before() throws SQLException {
"CREATE TABLE tablenumb (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
"CREATE TABLE tablenumc (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)",
"CREATE TABLE tablenumd (id int IDENTITY(1,1) primary key, cola DECIMAL(8, 4),colb DECIMAL, colc numeric(8,1), cold numeric)");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.enableTableCdc(connection, "tableb");
TestHelper.enableTableCdc(connection, "tablec");
TestHelper.enableTableCdc(connection, "tabled");
TestHelper.enableTableCdc(connection, "tablenuma");
TestHelper.enableTableCdc(connection, "tablenumb");
TestHelper.enableTableCdc(connection, "tablenumc");
TestHelper.enableTableCdc(connection, "tablenumd");
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
@ -84,6 +84,7 @@ public void decimalModeConfigString() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
connection.execute("INSERT INTO tablenuma VALUES (111.1111, 1111111, 1111111.1, 1111111 );");
final SourceRecords records = consumeRecordsByTopic(1);
@ -115,6 +116,7 @@ public void decimalModeConfigDouble() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
connection.execute("INSERT INTO tablenumb VALUES (222.2222, 22222, 22222.2, 2222222 );");
final SourceRecords records = consumeRecordsByTopic(1);
@ -145,6 +147,7 @@ public void decimalModeConfigPrecise() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
connection.execute("INSERT INTO tablenumc VALUES (333.3333, 3333, 3333.3, 33333333 );");
final SourceRecords records = consumeRecordsByTopic(1);

View File

@ -189,6 +189,7 @@ public void takeSchemaOnlySnapshotAndStartStreaming() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
testStreaming();
}
@ -252,8 +253,9 @@ public void takeSchemaOnlySnapshotAndSendHeartbeat() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
final SourceRecord record = consumeRecord();
Assertions.assertThat(record).isNotNull();
Assertions.assertThat(record.topic()).startsWith("__debezium-heartbeat");
}

View File

@ -64,6 +64,7 @@ public void addTable() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
@ -136,6 +137,7 @@ public void removeTable() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
@ -191,6 +193,7 @@ private void addColumnToTable(boolean pauseAfterCaptureChange) throws Exception
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
@ -315,6 +318,7 @@ public void removeColumnFromTable() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
@ -406,6 +410,7 @@ public void readHistoryAfterRestart() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
@ -479,6 +484,7 @@ public void renameColumn() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
@ -574,6 +580,7 @@ public void changeColumn() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
TestHelper.waitForSnapshotToBeCompleted();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;

View File

@ -54,6 +54,7 @@ public void before() throws SQLException {
initializeConnectorTestFramework();
Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
Testing.Print.enable();
}
@After
@ -69,12 +70,15 @@ public void createAndDelete() throws Exception {
final int TABLES = 2;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
@ -143,12 +147,15 @@ public void update() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
connection.setAutoCommit(false);
final String[] tableBInserts = new String[RECORDS_PER_TABLE];
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
@ -200,12 +207,15 @@ public void streamChangesWhileStopped() throws Exception {
final int ID_START = 10;
final int ID_RESTART = 100;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
@ -362,10 +372,16 @@ public void whitelistTable() throws Exception {
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tableb")
.build();
connection.execute(
"INSERT INTO tableb VALUES(1, 'b')"
);
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
@ -391,13 +407,19 @@ public void blacklistTable() throws Exception {
final int TABLES = 1;
final int ID_START = 10;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_BLACKLIST, "dbo.tablea")
.build();
connection.execute(
"INSERT INTO tableb VALUES(1, 'b')"
);
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
@ -435,6 +457,9 @@ public void blacklistColumn() throws Exception {
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
consumeRecordsByTopic(1);
connection.execute("INSERT INTO blacklist_column_table_a VALUES(10, 'some_name', 120)");
connection.execute("INSERT INTO blacklist_column_table_b VALUES(11, 'some_name', 447)");

View File

@ -6,10 +6,17 @@
package io.debezium.connector.sqlserver.util;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Objects;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -18,7 +25,9 @@
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Clock;
import io.debezium.util.IoUtil;
import io.debezium.util.Metronome;
import io.debezium.util.Testing;
/**
@ -210,4 +219,29 @@ public static void disableTableCdc(SqlServerConnection connection, String name)
String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
connection.execute(disableCdcForTableStmt);
}
public static void waitForSnapshotToBeCompleted() throws InterruptedException {
int waitForSeconds = 60;
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
final Metronome metronome = Metronome.sleeper(Duration.ofSeconds(1), Clock.system());
while (true) {
if (waitForSeconds-- <= 0) {
Assert.fail("Snapshot was not completed on time");
}
try {
final boolean completed = (boolean)mbeanServer.getAttribute(new ObjectName("debezium.sql_server:type=connector-metrics,context=snapshot,server=server1"), "SnapshotCompleted");
if (completed) {
break;
}
}
catch (InstanceNotFoundException e) {
// Metrics has not started yet
}
catch (Exception e) {
throw new IllegalStateException(e);
}
metronome.pause();
}
}
}