DBZ-5489 add test

This commit is contained in:
tjwornjs 2022-08-08 13:06:32 +09:00 committed by Jiri Pechanec
parent ca009d8492
commit 484a4390aa

View File

@ -11,6 +11,8 @@
import java.time.Instant;
import java.util.Set;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -376,6 +378,38 @@ public void shouldNotStoreNonCapturedTable() {
assertTableExcluded("non_captured.nct");
}
@Test
public void addCommentToSchemaTest() {
final Configuration config = DATABASE.defaultConfig()
.with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, false)
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "captured")
.with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
.with("include.schema.comments", true)
.build();
mysql = getSchema(config);
mysql.initializeStorage();
final MySqlPartition partition = initializePartition(connectorConfig, config);
final MySqlOffsetContext offset = initializeOffset(connectorConfig);
// Set up the server ...
offset.setBinlogStartPoint("binlog.001", 400);
mysql.parseStreamingDdl(partition, IoUtil.readClassPathResource("ddl/mysql-schema-captured.ddl"), "db1",
offset, Instant.now()).forEach(x -> mysql.applySchemaChange(x));
mysql.close();
assertTableSchemaComments("captured.ct", "id", "");
assertTableSchemaComments("captured.ct", "code", "order code");
}
protected void assertTableSchemaComments(String tableName, String column, String comments){
TableId tableId = TableId.parse(tableName);
TableSchema tableSchema = mysql.schemaFor(tableId);
Schema valueSchema = tableSchema.valueSchema();
Field columnField = valueSchema.field(column);
assertThat(columnField.schema().doc()).isEqualTo(comments);
}
protected void assertTableIncluded(String fullyQualifiedTableName) {
TableId tableId = TableId.parse(fullyQualifiedTableName);
TableSchema tableSchema = mysql.schemaFor(tableId);