Merge pull request #203 from rhauch/dbz-195
DBZ-195 Added tests to try to replicate a reported issue
This commit is contained in:
commit
db1aacc5f9
@ -249,6 +249,11 @@ protected void pollComplete(List<SourceRecord> batch) {
|
||||
* @throws InterruptedException if interrupted while waiting for the queue to have room for this record
|
||||
*/
|
||||
protected void enqueueRecord(SourceRecord record) throws InterruptedException {
|
||||
if (record != null) this.records.put(record);
|
||||
if (record != null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Enqueuing source record: {}", record);
|
||||
}
|
||||
this.records.put(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -301,6 +301,17 @@ END;
|
||||
$$
|
||||
DELIMITER ;
|
||||
|
||||
-- DBZ-195 handle numeric values
|
||||
CREATE TABLE dbz_195_numvalues (
|
||||
id int auto_increment NOT NULL,
|
||||
`search_version_read` int(11) NOT NULL DEFAULT '0', -- (11) is the display width
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE=InnoDB AUTO_INCREMENT=4972 DEFAULT CHARSET=utf8;
|
||||
|
||||
INSERT INTO dbz_195_numvalues VALUES (default,0);
|
||||
INSERT INTO dbz_195_numvalues VALUES (default,-2147483648);
|
||||
INSERT INTO dbz_195_numvalues VALUES (default,2147483647);
|
||||
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
-- DATABASE: json_test
|
||||
-- ----------------------------------------------------------------------------------------------------------------
|
||||
|
@ -261,6 +261,7 @@ public void shouldValidateAcceptableConfiguration() {
|
||||
"regression_test.dbz_123_bitvaluetest",
|
||||
"regression_test.dbz_104_customers",
|
||||
"regression_test.dbz_147_decimalvalues",
|
||||
"regression_test.dbz_195_numvalues",
|
||||
"json_test.dbz_126_jsontable");
|
||||
|
||||
// Now set the whitelist to two databases ...
|
||||
|
@ -504,8 +504,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
||||
// Consume all of the events due to startup and initialization of the database
|
||||
// ---------------------------------------------------------------------------------------------------------------
|
||||
// Testing.Debug.enable();
|
||||
int numTables = 9;
|
||||
int numDataRecords = 16;
|
||||
int numTables = 10;
|
||||
int numDataRecords = 19;
|
||||
int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
|
||||
int numSetVariables = 1;
|
||||
SourceRecords records = consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords);
|
||||
@ -521,6 +521,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
||||
assertThat(records.recordsForTopic("regression.regression_test.dbz_123_bitvaluetest").size()).isEqualTo(2);
|
||||
assertThat(records.recordsForTopic("regression.regression_test.dbz_104_customers").size()).isEqualTo(4);
|
||||
assertThat(records.recordsForTopic("regression.regression_test.dbz_147_decimalvalues").size()).isEqualTo(1);
|
||||
assertThat(records.recordsForTopic("regression.regression_test.dbz_195_numvalues").size()).isEqualTo(3);
|
||||
assertThat(records.topics().size()).isEqualTo(numTables + 1);
|
||||
assertThat(records.databaseNames().size()).isEqualTo(2);
|
||||
assertThat(records.databaseNames()).containsOnly("regression_test", "");
|
||||
@ -646,6 +647,18 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
|
||||
assertThat(decimalValue).isInstanceOf(BigDecimal.class);
|
||||
BigDecimal bigValue = (BigDecimal) decimalValue;
|
||||
assertThat(bigValue.doubleValue()).isEqualTo(12345.67, Delta.delta(0.01));
|
||||
} else if (record.topic().endsWith("dbz_195_numvalues")) {
|
||||
Struct after = value.getStruct(Envelope.FieldName.AFTER);
|
||||
Object searchVersion = after.get("search_version_read");
|
||||
assertThat(searchVersion).isInstanceOf(Integer.class);
|
||||
Integer intValue = (Integer) searchVersion;
|
||||
if (intValue.intValue() < 0) {
|
||||
assertThat(intValue.intValue()).isEqualTo(-2147483648);
|
||||
} else if (intValue.intValue() > 0) {
|
||||
assertThat(intValue.intValue()).isEqualTo(2147483647);
|
||||
} else {
|
||||
assertThat(intValue.intValue()).isEqualTo(0);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -189,8 +189,8 @@ protected Function<Object[], Object> createKeyGenerator(Schema schema, TableId c
|
||||
result.put(fields[i], value);
|
||||
} catch (DataException e) {
|
||||
Column col = columns.get(i);
|
||||
LOGGER.error("Failed to properly convert key value for '" + columnSetName + "." + col.name() + "' of type "
|
||||
+ col.typeName() + ":", e);
|
||||
LOGGER.error("Failed to properly convert key value for '{}.{}' of type {} for row {}:",
|
||||
columnSetName, col.name(), col.typeName(), row, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -231,12 +231,12 @@ protected Function<Object[], Struct> createValueGenerator(Schema schema, TableId
|
||||
result.put(fields[i], value);
|
||||
} catch (DataException|IllegalArgumentException e) {
|
||||
Column col = columns.get(i);
|
||||
LOGGER.error("Failed to properly convert data value for '" + tableId + "." + col.name() + "' of type "
|
||||
+ col.typeName() + ":", e);
|
||||
LOGGER.error("Failed to properly convert data value for '{}.{}' of type {} for row {}:",
|
||||
tableId, col.name(), col.typeName(), row, e);
|
||||
}
|
||||
} else if (traceMessage.getAndSet(false)) {
|
||||
Column col = columns.get(i);
|
||||
LOGGER.trace("Excluding '" + tableId + "." + col.name() + "' of type " + col.typeName());
|
||||
LOGGER.trace("Excluding '{}.{}' of type {}", tableId, col.name(), col.typeName());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
Loading…
Reference in New Issue
Block a user