|
|
|
@ -27,9 +27,8 @@
|
|
|
|
|
import org.apache.kafka.connect.data.Struct;
|
|
|
|
|
import org.apache.kafka.connect.source.SourceConnector;
|
|
|
|
|
import org.apache.kafka.connect.source.SourceRecord;
|
|
|
|
|
import org.assertj.core.api.Assertions;
|
|
|
|
|
import org.awaitility.Awaitility;
|
|
|
|
|
import org.fest.assertions.Assertions;
|
|
|
|
|
import org.fest.assertions.MapAssert;
|
|
|
|
|
import org.junit.Test;
|
|
|
|
|
|
|
|
|
|
import io.debezium.config.CommonConnectorConfig;
|
|
|
|
@ -366,7 +365,7 @@ public void snapshotOnly() throws Exception {
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -382,7 +381,7 @@ public void invalidTablesInTheList() throws Exception {
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -410,7 +409,7 @@ public void inserts() throws Exception {
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT * 2;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -442,7 +441,7 @@ public void updates() throws Exception {
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount,
|
|
|
|
|
x -> x.getValue() >= 2000, null);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i + 2000));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i + 2000));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -491,7 +490,7 @@ public void updatesWithRestart() throws Exception {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i + 2000));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i + 2000));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -512,7 +511,7 @@ public void updatesLargeChunk() throws Exception {
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount,
|
|
|
|
|
x -> x.getValue() >= 2000, null);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i + 2000));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i + 2000));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -546,7 +545,7 @@ public void snapshotOnlyWithRestart() throws Exception {
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -575,7 +574,7 @@ public void snapshotPreceededBySchemaChange() throws Exception {
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT;
|
|
|
|
|
Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Initiate a schema change to the table immediately before the adhoc-snapshot
|
|
|
|
@ -590,7 +589,7 @@ public void snapshotPreceededBySchemaChange() throws Exception {
|
|
|
|
|
|
|
|
|
|
dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -603,7 +602,7 @@ public void snapshotWithRegexDataCollections() throws Exception {
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -655,7 +654,7 @@ public void stopCurrentIncrementalSnapshotWithoutCollectionsAndTakeNewNewIncreme
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT * 2;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -707,7 +706,7 @@ public void stopCurrentIncrementalSnapshotWithAllCollectionsAndTakeNewNewIncreme
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT * 2;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -758,7 +757,7 @@ public void removeNotYetCapturedCollectionFromInProgressIncrementalSnapshot() th
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT * 2;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount, topicToConsume);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -809,7 +808,7 @@ public void removeStartedCapturedCollectionFromInProgressIncrementalSnapshot() t
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT * 2;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount, topicToConsume);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -837,7 +836,7 @@ public void shouldSnapshotNewlyAddedTableToIncludeListAfterRestart() throws Exce
|
|
|
|
|
final int expectedRecordCount = ROW_COUNT;
|
|
|
|
|
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
|
|
|
|
for (int i = 0; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stopConnector();
|
|
|
|
@ -878,7 +877,7 @@ public void testPauseDuringSnapshot() throws Exception {
|
|
|
|
|
if ((expectedRecordCount - beforeResume) > 0) {
|
|
|
|
|
Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount - beforeResume);
|
|
|
|
|
for (int i = beforeResume + 1; i < expectedRecordCount; i++) {
|
|
|
|
|
Assertions.assertThat(dbChanges).includes(MapAssert.entry(i + 1, i));
|
|
|
|
|
Assertions.assertThat(dbChanges).contains(Assertions.entry(i + 1, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|