DBZ-824 Fix tests with invalid field names
This commit is contained in:
parent
6205f1d6ea
commit
1de53607dc
@ -68,7 +68,7 @@ public void shouldParseWhitespaceChars() throws Exception {
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
SourceRecords actualRecords = consumeRecordsByTopic(2);
|
||||
SourceRecords actualRecords = consumeRecordsByTopic(2, false);
|
||||
Assertions.assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset")).hasSize(1);
|
||||
Assertions.assertThat(actualRecords.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Prop")).hasSize(1);
|
||||
|
||||
@ -119,6 +119,7 @@ public void shouldParseSpecialChars() throws Exception {
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo\\.UAT WAG CZ\\$Fixed Asset.*")
|
||||
.with(SqlServerConnectorConfig.SANITIZE_FIELD_NAMES, true)
|
||||
.build();
|
||||
|
||||
connection.execute(
|
||||
@ -138,7 +139,7 @@ public void shouldParseSpecialChars() throws Exception {
|
||||
.optional()
|
||||
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
|
||||
.field("id", Schema.INT32_SCHEMA)
|
||||
.field("my col$a", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.build());
|
||||
assertSchemaMatchesStruct(
|
||||
(Struct) record.key(),
|
||||
@ -158,7 +159,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
|
||||
.optional()
|
||||
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
|
||||
.field("id", Schema.INT32_SCHEMA)
|
||||
.field("my col$a", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.build());
|
||||
assertSchemaMatchesStruct(
|
||||
(Struct) record.key(),
|
||||
@ -181,7 +182,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0
|
||||
.optional()
|
||||
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two.Value")
|
||||
.field("id", Schema.INT32_SCHEMA)
|
||||
.field("my col$", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("my_col_", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("Description", Schema.STRING_SCHEMA)
|
||||
.build());
|
||||
assertSchemaMatchesStruct(
|
||||
@ -202,7 +203,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0
|
||||
.optional()
|
||||
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two.Value")
|
||||
.field("id", Schema.INT32_SCHEMA)
|
||||
.field("my col$", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("my_col_", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("Description", Schema.STRING_SCHEMA)
|
||||
.build());
|
||||
assertSchemaMatchesStruct(
|
||||
@ -211,7 +212,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two").get(0
|
||||
.optional()
|
||||
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset_Two.Value")
|
||||
.field("id", Schema.INT32_SCHEMA)
|
||||
.field("my col$", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("my_col_", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("Description", Schema.STRING_SCHEMA)
|
||||
.build());
|
||||
Assertions.assertThat(((Struct) record.value()).getStruct("after").getString("Description")).isEqualTo("c1");
|
||||
@ -232,7 +233,7 @@ record = records.recordsForTopic("server1.dbo.UAT_WAG_CZ_Fixed_Asset").get(0);
|
||||
.optional()
|
||||
.name("server1.dbo.UAT_WAG_CZ_Fixed_Asset.Value")
|
||||
.field("id", Schema.INT32_SCHEMA)
|
||||
.field("my col$a", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field("my_col_a", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.build());
|
||||
assertSchemaMatchesStruct(
|
||||
(Struct) record.key(),
|
||||
|
@ -465,6 +465,20 @@ protected SourceRecords consumeRecordsByTopic(int numRecords) throws Interrupted
|
||||
return records;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to consume and capture exactly the specified number of records from the connector.
|
||||
*
|
||||
* @param numRecords the number of records that should be consumed
|
||||
* @param true if the record serialization should be tested
|
||||
* @return the collector into which the records were captured; never null
|
||||
* @throws InterruptedException if the thread was interrupted while waiting for a record to be returned
|
||||
*/
|
||||
protected SourceRecords consumeRecordsByTopic(int numRecords, boolean assertRecords) throws InterruptedException {
|
||||
SourceRecords records = new SourceRecords();
|
||||
consumeRecords(numRecords, 3, records::add, assertRecords);
|
||||
return records;
|
||||
}
|
||||
|
||||
protected class SourceRecords {
|
||||
private final List<SourceRecord> records = new ArrayList<>();
|
||||
private final Map<String, List<SourceRecord>> recordsByTopic = new HashMap<>();
|
||||
|
Loading…
Reference in New Issue
Block a user