DBZ-1763 Add unit test for schema name.

This commit is contained in:
Bingqin Zhou 2020-02-03 16:49:19 -08:00 committed by Jiri Pechanec
parent 892463dd95
commit d8f330ebbd

View File

@ -19,24 +19,48 @@ public class SchemaProcessorTest extends EmbeddedCassandraConnectorTestBase {
public void testProcess() throws Exception {
CassandraConnectorContext context = generateTaskContext();
SchemaProcessor schemaProcessor = new SchemaProcessor(context);
SchemaHolder.KeyValueSchema keyValueSchema;
String namespacePrefix = "io.debezium.connector.cassandra" + "."
+ EmbeddedCassandraConnectorTestBase.TEST_KAFKA_TOPIC_PREFIX + "."
+ EmbeddedCassandraConnectorTestBase.TEST_KEYSPACE;
String expectedKeySchemaName;
String expectedValueSchemaName;
assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size());
context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table1") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;");
schemaProcessor.process();
assertEquals(0, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size());
assertNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")));
keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"));
assertNull(keyValueSchema);
context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table1") + " WITH cdc = true;");
schemaProcessor.process();
assertEquals(1, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size());
assertNotNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")));
keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"));
assertNotNull(keyValueSchema);
expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key";
assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name());
expectedValueSchemaName = namespacePrefix + "." + "table1" + "." + "Value";
assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name());
context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;");
schemaProcessor.process();
assertEquals(2, context.getSchemaHolder().getCdcEnabledTableMetadataSet().size());
assertNotNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1")));
assertNotNull(context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2")));
keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table1"));
assertNotNull(keyValueSchema);
expectedKeySchemaName = namespacePrefix + "." + "table1" + "." + "Key";
assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name());
expectedValueSchemaName = namespacePrefix + "." + "table1" + "." + "Value";
assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name());
keyValueSchema = context.getSchemaHolder().getOrUpdateKeyValueSchema(new KeyspaceTable(TEST_KEYSPACE, "table2"));
assertNotNull(keyValueSchema);
expectedKeySchemaName = namespacePrefix + "." + "table2" + "." + "Key";
assertEquals(expectedKeySchemaName, keyValueSchema.keySchema().name());
expectedValueSchemaName = namespacePrefix + "." + "table2" + "." + "Value";
assertEquals(expectedValueSchemaName, keyValueSchema.valueSchema().name());
context.getCassandraClient().execute("ALTER TABLE " + keyspaceTable("table2") + " ADD c text");
schemaProcessor.process();