DBZ-1892 Added test

This commit is contained in:
Jiri Pechanec 2020-03-23 14:11:50 +01:00 committed by Gunnar Morling
parent 252beb0785
commit 5111149e5d

View File

@ -13,7 +13,9 @@
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -33,26 +35,37 @@ public void testSnapshotTable() throws Exception {
int tableSize = 5; int tableSize = 5;
context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;");
context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;");
context.getSchemaHolder().refreshSchemas(); context.getSchemaHolder().refreshSchemas();
for (int i = 0; i < tableSize; i++) { for (int i = 0; i < tableSize; i++) {
context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i));
context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table2") + "(a, b) VALUES (?, ?)", i + 10, String.valueOf(i + 10));
} }
ChangeEventQueue<Event> queue = context.getQueue(); ChangeEventQueue<Event> queue = context.getQueue();
assertEquals(queue.totalCapacity(), queue.remainingCapacity()); assertEquals(queue.totalCapacity(), queue.remainingCapacity());
snapshotProcessor.process(); snapshotProcessor.process();
assertEquals(tableSize, queue.totalCapacity() - queue.remainingCapacity()); assertEquals(2 * tableSize, queue.totalCapacity() - queue.remainingCapacity());
final List<ChangeRecord> table1 = new ArrayList<>();
final List<ChangeRecord> table2 = new ArrayList<>();
for (Event event : queue.poll()) { for (Event event : queue.poll()) {
ChangeRecord record = (ChangeRecord) event; ChangeRecord record = (ChangeRecord) event;
Assert.assertEquals(record.getEventType(), Event.EventType.CHANGE_EVENT); Assert.assertEquals(record.getEventType(), Event.EventType.CHANGE_EVENT);
Assert.assertEquals(record.getOp(), Record.Operation.INSERT); Assert.assertEquals(record.getOp(), Record.Operation.INSERT);
assertEquals(record.getSource().cluster, DatabaseDescriptor.getClusterName()); assertEquals(record.getSource().cluster, DatabaseDescriptor.getClusterName());
assertTrue(record.getSource().snapshot); assertTrue(record.getSource().snapshot);
assertEquals(record.getSource().keyspaceTable.name(), keyspaceTable("cdc_table")); final String tableName = record.getSource().keyspaceTable.name();
if (tableName.equals(keyspaceTable("cdc_table"))) {
table1.add(record);
}
else {
table2.add(record);
}
Assert.assertEquals(record.getSource().offsetPosition, OffsetPosition.defaultOffsetPosition()); Assert.assertEquals(record.getSource().offsetPosition, OffsetPosition.defaultOffsetPosition());
} }
assertEquals(tableSize, table1.size());
assertEquals(tableSize, table2.size());
deleteTestKeyspaceTables(); deleteTestKeyspaceTables();
deleteTestOffsets(context); deleteTestOffsets(context);
context.cleanUp(); context.cleanUp();