DBZ-1162 fix to ensure hstore snapshots behave the same as streams

Josh Stanfield 2019-02-25 10:45:25 -07:00 committed by Jiri Pechanec
parent 50a7c568fa
commit e4b7b90637
8 changed files with 87 additions and 4 deletions

View File

@ -432,18 +432,32 @@ private String asHstoreString(byte[] data) {
private Object convertHstoreToJsonString(Column column, Field fieldDefn, Object data){
return convertValue(column, fieldDefn, data, "{}", (r) -> {
logger.trace("in ANON: value from data object: *** {} ***", data.toString());
logger.trace("in ANON: object type is: *** {} ***", data.getClass());
if (data instanceof String) {
r.deliver(changePlainStringRepresentationToJsonStringRepresentation(((String) data)));
}
else if (data instanceof byte[]) {
r.deliver(changePlainStringRepresentationToJsonStringRepresentation(asHstoreString((byte[]) data)));
}
else if (data instanceof java.util.HashMap) {
r.deliver(convertMapToJsonStringRepresentation((Map<String, String>)data));
}
});
}
private String changePlainStringRepresentationToJsonStringRepresentation(String text){
logger.trace("text value is: " + text);
try {
Map<String, String> map = HStoreConverter.fromString(text);
return convertMapToJsonStringRepresentation(map);
}
catch(Exception e) {
throw new RuntimeException("Couldn't serialize hstore value into JSON: " + text, e);
}
}
private String convertMapToJsonStringRepresentation(Map<String, String> map) {
StringWriter writer = new StringWriter();
try (JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer)) {
jsonGenerator.writeStartObject();
@ -455,7 +469,7 @@ private String changePlainStringRepresentationToJsonStringRepresentation(String
return writer.getBuffer().toString();
}
catch(Exception e) {
throw new RuntimeException("Couldn't serialize hstore value into JSON: " + text, e);
throw new RuntimeException("Couldn't serialize hstore value into JSON: " + map.toString(), e);
}
}
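
For context (not part of this commit): the branches above have to produce identical JSON whether the hstore value arrives as its textual representation (streaming), as bytes, or as an already-parsed java.util.HashMap (snapshot). A minimal standalone sketch of that equivalence, assuming the PostgreSQL JDBC driver and Jackson core are on the classpath; class and variable names are illustrative only:

import java.io.StringWriter;
import java.util.Map;

import org.postgresql.util.HStoreConverter;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

public class HstoreToJsonSketch {

    private static final JsonFactory JSON_FACTORY = new JsonFactory();

    public static void main(String[] args) throws Exception {
        // Streaming hands the connector the textual hstore representation ...
        String streamed = "\"key\"=>\"val\"";
        // ... while the snapshot path may already receive a parsed Map.
        Map<String, String> snapshotted = HStoreConverter.fromString(streamed);

        // Both paths must converge on the same JSON string: {"key":"val"}
        System.out.println(toJson(HStoreConverter.fromString(streamed)));
        System.out.println(toJson(snapshotted));
    }

    private static String toJson(Map<String, String> map) throws Exception {
        StringWriter writer = new StringWriter();
        try (JsonGenerator generator = JSON_FACTORY.createGenerator(writer)) {
            generator.writeStartObject();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                generator.writeStringField(entry.getKey(), entry.getValue());
            }
            generator.writeEndObject();
        }
        return writer.toString();
    }
}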

View File

@ -331,6 +331,10 @@ private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaDa
final String columnTypeName = metaData.getColumnTypeName(colIdx);
final PostgresType type = taskContext.schema().getTypeRegistry().get(columnTypeName);
logger.trace("Type of incoming data is: " + String.valueOf(type.getOid()));
logger.trace("ColumnTypeName is: " + columnTypeName);
logger.trace("Type toString: " + type.toString());
if (type.isArrayType()) {
Array array = rs.getArray(colIdx);
@ -357,7 +361,11 @@ private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaDa
return value.isPresent() ? value.get() : new SpecialValueDecimal(rs.getBigDecimal(colIdx));
default:
- return rs.getObject(colIdx);
+ Object x = rs.getObject(colIdx);
+ if(x != null) {
+ logger.trace("rs getobject returns class: {}; rs getObject toString is: {}", x.getClass(), x.toString());
+ }
+ return x;
}
}
catch (SQLException e) {
@ -373,11 +381,15 @@ protected void generateReadRecord(TableId tableId, Object[] rowData) {
if (rowData.length == 0) {
return;
}
logger.trace("tableId value is: {}", tableId.toString());
TableSchema tableSchema = schema().schemaFor(tableId);
assert tableSchema != null;
Object key = tableSchema.keyFromColumnData(rowData);
Struct value = tableSchema.valueFromColumnData(rowData);
//note this is different than implementation in Streams producer. See DBZ-1163
if (key == null || value == null) {
logger.trace("key: {}; value: {}; One is null", String.valueOf(key), String.valueOf(value) );
return;
}
Schema keySchema = tableSchema.keySchema();
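
Related background (not part of this commit): the default branch above logs the runtime class because the PostgreSQL JDBC driver typically materializes hstore columns read via ResultSet#getObject as a Map, while logical decoding emits the textual form. A tiny sketch to observe that locally; the connection URL and credentials are placeholders and a local database with the hstore extension is assumed:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HstoreGetObjectSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; requires the hstore extension installed.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 'a=>1,b=>2'::hstore AS hs")) {
            while (rs.next()) {
                Object value = rs.getObject("hs");
                // Typically prints a java.util.Map implementation, which is why the
                // snapshot converter needs the HashMap branch added in this commit.
                System.out.println(value.getClass() + " -> " + value);
            }
        }
    }
}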

View File

@ -289,6 +289,7 @@ protected void generateCreateRecord(TableId tableId, Object[] rowData, BlockingC
TableSchema tableSchema = schema().schemaFor(tableId);
assert tableSchema != null;
Object key = tableSchema.keyFromColumnData(rowData);
logger.trace("key value is: {}", String.valueOf(key));
Struct value = tableSchema.valueFromColumnData(rowData);
if (value == null) {
logger.warn("no values found for table '{}' from create message at '{}'; skipping record" , tableId, sourceInfo);

View File

@ -424,4 +424,40 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
assertSourceInfo(record);
}
}
@Test
@FixFor("DBZ-1162")
public void shouldGenerateSnapshotsForHstores() throws Exception {
// PostGIS must not be used
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
PostgresConnectorConfig config = new PostgresConnectorConfig(
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE,PostgresConnectorConfig.HStoreHandlingMode.JSON)
.build());
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
TestHelper.getSchema(config),
selector
);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
TestConsumer consumer = testConsumer(1, "public", "Quoted_\"");
//insert data for each of different supported types
TestHelper.execute(INSERT_HSTORE_TYPE_STMT);
//then start the producer and validate all records are there
snapshotProducer.start(consumer, e -> {});
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
final Map<String, List<SchemaAndValueField>> expectedValuesByTopicName = Collect.hashMapOf("public.hstore_table", schemaAndValueFieldForJsonEncodedHStoreType());
consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName));
}
}
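
The INSERT_HSTORE_TYPE_STMT constant and the schemaAndValueFieldForJsonEncodedHStoreType() helper are defined elsewhere in the test code and are not shown in this diff. Purely as a hypothetical illustration of the behaviour being verified, with table name, column name and values as placeholders rather than the real fixtures:

public class HstoreSnapshotExpectationSketch {
    public static void main(String[] args) {
        // A row containing an hstore value is inserted before the snapshot starts ...
        String hypotheticalInsert =
                "INSERT INTO hstore_table (hs) VALUES ('\"key\" => \"val\"'::hstore)";
        // ... and with hstore.handling.mode=json the snapshot record is expected to
        // carry the column as a JSON-encoded string, matching the streaming side.
        String expectedFieldValue = "{\"key\":\"val\"}";
        System.out.println(hypotheticalInsert);
        System.out.println(expectedFieldValue);
    }
}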

View File

@ -12,4 +12,4 @@ log4j.logger.io.debezium=INFO
log4j.logger.io.debezium.embedded.EmbeddedEngine$EmbeddedConfig=WARN
#log4j.logger.io.debezium.embedded.EmbeddedEngine=DEBUG
log4j.logger.io.debezium.core=DEBUG
- log4j.logger.io.debezium.connector.sqlserver=TRACE
+ log4j.logger.io.debezium.connector.sqlserver=DEBUG

View File

@ -1180,9 +1180,12 @@ protected Object convertValue(Column column, Field fieldDefn, Object data, Objec
final Object schemaDefault = fieldDefn.schema().defaultValue();
return schemaDefault != null ? schemaDefault : fallback;
}
logger.trace("Value from data object: *** {} ***", data.toString());
final ResultReceiver r = ResultReceiver.create();
callback.convert(r);
logger.trace("Callback toString: {}", callback.toString());
logger.trace("Value from ResultReceiver: {}", r.toString());
return r.hasReceived() ? r.get() : handleUnknownData(column, fieldDefn, data);
}
}
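
Aside (not part of this commit): the hasReceived() check above appears to be where the snapshot path diverged before this fix; a HashMap-typed hstore value fell through every branch of the conversion callback, nothing was delivered, and the value was handed to handleUnknownData. A self-contained sketch of that deliver/receive contract, using a local stand-in type rather than the real io.debezium ResultReceiver; only the create()/deliver()/hasReceived()/get() shape visible in the diff is assumed:

public class ReceiverPatternSketch {

    // Local stand-in mirroring the calls visible in the diff; not the real class.
    static final class SimpleReceiver {
        private Object value;
        private boolean received;

        static SimpleReceiver create() {
            return new SimpleReceiver();
        }

        void deliver(Object v) {
            this.value = v;
            this.received = true;
        }

        boolean hasReceived() {
            return received;
        }

        Object get() {
            return value;
        }
    }

    public static void main(String[] args) {
        Object data = new java.util.HashMap<String, String>(); // snapshot-style hstore value

        SimpleReceiver r = SimpleReceiver.create();
        // Before this commit the callback only handled String and byte[] inputs,
        // so a HashMap left the receiver empty ...
        if (data instanceof String || data instanceof byte[]) {
            r.deliver(data);
        }
        // ... and the caller then fell back to its unknown-data handling.
        System.out.println(r.hasReceived() ? r.get() : "unhandled -> handleUnknownData()");
    }
}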

View File

@ -16,6 +16,10 @@
import io.debezium.data.SchemaUtil;
import io.debezium.schema.DataCollectionSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Defines the Kafka Connect {@link Schema} functionality associated with a given {@link Table table definition}, and which can
* be used to send rows of data that match the table definition to Kafka Connect.
@ -49,6 +53,8 @@
@Immutable
public class TableSchema implements DataCollectionSchema {
private static final Logger logger = LoggerFactory.getLogger(TableSchema.class);
private final TableId id;
private final Schema keySchema;
private final Envelope envelopeSchema;
@ -121,6 +127,8 @@ public Envelope getEnvelopeSchema() {
* @return the key, or null if the {@code columnData}
*/
public Object keyFromColumnData(Object[] columnData) {
logger.trace("columnData from current stack: {}", String.valueOf(columnData));
logger.trace("key from column data stack: " , new Throwable());
return columnData == null ? null : keyGenerator.apply(columnData);
}

View File

@ -208,7 +208,16 @@ protected Function<Object[], Struct> createValueGenerator(Schema schema, TableId
Struct result = new Struct(schema);
for (int i = 0; i != numFields; ++i) {
Object value = row[recordIndexes[i]];
ValueConverter converter = converters[i];
if (converter != null) {
LOGGER.trace("converter for value object: *** {} ***", converter.toString());
}
else {
LOGGER.trace("converter is null...");
}
if (converter != null) {
try {
value = converter.convert(value);