diff --git a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java index d50878513..de95acca9 100644 --- a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java +++ b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/SnapshotProcessorTest.java @@ -13,7 +13,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,26 +35,37 @@ public void testSnapshotTable() throws Exception { int tableSize = 5; context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); + context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;"); context.getSchemaHolder().refreshSchemas(); for (int i = 0; i < tableSize; i++) { context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", i, String.valueOf(i)); + context.getCassandraClient().execute("INSERT INTO " + keyspaceTable("cdc_table2") + "(a, b) VALUES (?, ?)", i + 10, String.valueOf(i + 10)); } ChangeEventQueue queue = context.getQueue(); assertEquals(queue.totalCapacity(), queue.remainingCapacity()); snapshotProcessor.process(); - assertEquals(tableSize, queue.totalCapacity() - queue.remainingCapacity()); + assertEquals(2 * tableSize, queue.totalCapacity() - queue.remainingCapacity()); + final List table1 = new ArrayList<>(); + final List table2 = new ArrayList<>(); for (Event event : queue.poll()) { ChangeRecord record = (ChangeRecord) event; Assert.assertEquals(record.getEventType(), Event.EventType.CHANGE_EVENT); Assert.assertEquals(record.getOp(), Record.Operation.INSERT); assertEquals(record.getSource().cluster, DatabaseDescriptor.getClusterName()); assertTrue(record.getSource().snapshot); - assertEquals(record.getSource().keyspaceTable.name(), keyspaceTable("cdc_table")); + final String tableName = record.getSource().keyspaceTable.name(); + if (tableName.equals(keyspaceTable("cdc_table"))) { + table1.add(record); + } + else { + table2.add(record); + } Assert.assertEquals(record.getSource().offsetPosition, OffsetPosition.defaultOffsetPosition()); } - + assertEquals(tableSize, table1.size()); + assertEquals(tableSize, table2.size()); deleteTestKeyspaceTables(); deleteTestOffsets(context); context.cleanUp();