DBZ-4360 Correctly parse default values with trailing spaces
This commit is contained in:
parent
d7a9784c82
commit
707dac863c
@ -63,7 +63,7 @@ public Optional<Object> parseDefaultValue(Column column, String defaultValue) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Object rawDefaultValue = mapper.parse(column, defaultValue);
|
Object rawDefaultValue = mapper.parse(column, defaultValue != null ? defaultValue.trim() : defaultValue);
|
||||||
Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, column);
|
Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, column);
|
||||||
if (convertedDefaultValue instanceof Struct) {
|
if (convertedDefaultValue instanceof Struct) {
|
||||||
// Workaround for KAFKA-12694
|
// Workaround for KAFKA-12694
|
||||||
|
@ -41,6 +41,7 @@ public class OracleDefaultValueIT extends AbstractConnectorTest {
|
|||||||
|
|
||||||
private OracleConnection connection;
|
private OracleConnection connection;
|
||||||
private Consumer<Configuration.Builder> configUpdater;
|
private Consumer<Configuration.Builder> configUpdater;
|
||||||
|
private Configuration config;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void before() throws Exception {
|
public void before() throws Exception {
|
||||||
@ -313,7 +314,29 @@ public void shouldHandleDefaultValueFromSequencesAsNoDefault() throws Exception
|
|||||||
AssertionType.FIELD_NO_DEFAULT));
|
AssertionType.FIELD_NO_DEFAULT));
|
||||||
|
|
||||||
shouldHandleDefaultValuesCommon(columnDefinitions);
|
shouldHandleDefaultValuesCommon(columnDefinitions);
|
||||||
assertThat(logInterceptor.countOccurrences("Cannot parse column default value")).isEqualTo(4);
|
assertThat(logInterceptor.countOccurrences("Cannot parse column default value")).isEqualTo(6);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@FixFor("DBZ-4360")
|
||||||
|
public void shouldHandleDefaultValuesWhereSqlMayContainsTrailingSpaces() throws Exception {
|
||||||
|
LogInterceptor logInterceptor = new LogInterceptor();
|
||||||
|
List<ColumnDefinition> columnDefinitions = Arrays.asList(
|
||||||
|
new ColumnDefinition("val_num", "number(15)",
|
||||||
|
"null ", "null ",
|
||||||
|
null, null,
|
||||||
|
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||||
|
new ColumnDefinition("val_num2", "number(15)",
|
||||||
|
"2 ", "3 ",
|
||||||
|
2L, 3L,
|
||||||
|
AssertionType.FIELD_DEFAULT_EQUAL),
|
||||||
|
new ColumnDefinition("val_char", "char(3)",
|
||||||
|
"'No' ", "'NO' ",
|
||||||
|
"No ", "NO ",
|
||||||
|
AssertionType.FIELD_DEFAULT_EQUAL));
|
||||||
|
|
||||||
|
shouldHandleDefaultValuesCommon(columnDefinitions);
|
||||||
|
assertThat(logInterceptor.countOccurrences("Cannot parse column default value")).isEqualTo(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getOracleIntervalYearMonth(int years, int month) {
|
private long getOracleIntervalYearMonth(int years, int month) {
|
||||||
@ -343,6 +366,20 @@ private void shouldHandleDefaultValuesCommon(List<ColumnDefinition> columnDefini
|
|||||||
testDefaultValuesCreateTableAndSnapshot(columnDefinitions);
|
testDefaultValuesCreateTableAndSnapshot(columnDefinitions);
|
||||||
testDefaultValuesAlterTableModifyExisting(columnDefinitions);
|
testDefaultValuesAlterTableModifyExisting(columnDefinitions);
|
||||||
testDefaultValuesAlterTableAdd(columnDefinitions);
|
testDefaultValuesAlterTableAdd(columnDefinitions);
|
||||||
|
TestDefaultValuesByRestartAndLoadingHistoryTopic();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Restarts the connector and verifies when the database history topic is loaded that we can parse
|
||||||
|
* all the loaded history statements without failures.
|
||||||
|
*/
|
||||||
|
private void TestDefaultValuesByRestartAndLoadingHistoryTopic() throws Exception {
|
||||||
|
stopConnector();
|
||||||
|
|
||||||
|
start(OracleConnector.class, config);
|
||||||
|
assertConnectorIsRunning();
|
||||||
|
|
||||||
|
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -389,7 +426,8 @@ private void testDefaultValuesCreateTableAndSnapshot(List<ColumnDefinition> colu
|
|||||||
// Insert snapshot record
|
// Insert snapshot record
|
||||||
connection.execute("INSERT INTO default_value_test (id) values (1)");
|
connection.execute("INSERT INTO default_value_test (id) values (1)");
|
||||||
|
|
||||||
Configuration config = TestHelper.defaultConfig()
|
// store config so it can be used by other methods
|
||||||
|
config = TestHelper.defaultConfig()
|
||||||
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DEFAULT_VALUE_TEST")
|
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DEFAULT_VALUE_TEST")
|
||||||
.apply(configUpdater)
|
.apply(configUpdater)
|
||||||
.build();
|
.build();
|
||||||
|
Loading…
Reference in New Issue
Block a user