DBZ-1368 Support toasted placeholder in pgoutput

This commit is contained in:
Jiri Pechanec 2019-09-13 12:25:31 +02:00 committed by Gunnar Morling
parent 442408be02
commit ab31147593
6 changed files with 77 additions and 17 deletions

View File

@ -144,9 +144,9 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
// based on the replication message without toasted columns for now
List<ReplicationMessage.Column> columnsWithoutToasted = columns.stream().filter(Predicates.not(ReplicationMessage.Column::isToastedColumn)).collect(Collectors.toList());
// JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT
Object[] values = new Object[columnsWithoutToasted.size() < schemaColumns.size() ? schemaColumns.size() : columnsWithoutToasted.size()];
Object[] values = new Object[schemaColumns.size()];
for (ReplicationMessage.Column column: columnsWithoutToasted) {
for (ReplicationMessage.Column column: columns) {
//DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names
final String columnName = Strings.unquoteIdentifierPart(column.getName());
final Column tableColumn = table.columnWithName(columnName);

View File

@ -42,6 +42,7 @@
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.util.Strings;
/**
* The configuration properties for the {@link PostgresConnector}
@ -747,6 +748,16 @@ public static SchemaRefreshMode parse(String value) {
"The default is set to 0 ms, which disables tracking xmin.")
.withValidation(Field::isNonNegativeLong);
public static final Field TOASTED_VALUE_PLACEHOLDER = Field.create("toasted.value.placeholder")
.withDisplayName("Toasted value placeholder")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withDefault("__DEBEZIUM_TOASTED_VALUE__")
.withImportance(Importance.MEDIUM)
.withDescription("Specify the constant that will be provided by Debezium to indicate that " +
"the original value is a toasted value not provided by the database." +
"If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets.");
/**
* The set of {@link Field}s defined as part of this configuration.
*/
@ -764,7 +775,7 @@ public static SchemaRefreshMode parse(String value) {
SSL_ROOT_CERT, SSL_CLIENT_KEY, SNAPSHOT_LOCK_TIMEOUT_MS, SSL_SOCKET_FACTORY,
STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, INCLUDE_UNKNOWN_DATATYPES,
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, SCHEMA_REFRESH_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE,
XMIN_FETCH_INTERVAL, SNAPSHOT_MODE_CLASS, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION);
XMIN_FETCH_INTERVAL, TOASTED_VALUE_PLACEHOLDER, SNAPSHOT_MODE_CLASS, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION);
private final HStoreHandlingMode hStoreHandlingMode;
private final SnapshotMode snapshotMode;
@ -883,6 +894,14 @@ protected Duration xminFetchInterval() {
return Duration.ofMillis(getConfig().getLong(PostgresConnectorConfig.XMIN_FETCH_INTERVAL));
}
protected byte[] toastedValuePlaceholder() {
final String placeholder = getConfig().getString(TOASTED_VALUE_PLACEHOLDER);
if (placeholder.startsWith("hex:")) {
Strings.hexStringToByteArray(placeholder.substring(4));
}
return placeholder.getBytes();
}
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
switch (version) {
@ -901,7 +920,8 @@ protected static ConfigDef configDef() {
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE, Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION);
Heartbeat.HEARTBEAT_TOPICS_PREFIX, CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION,
TOASTED_VALUE_PLACEHOLDER);
Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE,
CommonConnectorConfig.SNAPSHOT_DELAY_MS, CommonConnectorConfig.SNAPSHOT_FETCH_SIZE,
SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, HSTORE_HANDLING_MODE,

View File

@ -69,8 +69,17 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
}
private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, TypeRegistry typeRegistry, Charset databaseCharset) {
PostgresValueConverter valueConverter = new PostgresValueConverter(databaseCharset, config.getDecimalMode(), config.getTemporalPrecisionMode(),
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry, config.hStoreHandlingMode());
PostgresValueConverter valueConverter = new PostgresValueConverter(
databaseCharset,
config.getDecimalMode(),
config.getTemporalPrecisionMode(),
ZoneOffset.UTC,
null,
config.includeUnknownDatatypes(),
typeRegistry,
config.hStoreHandlingMode(),
config.toastedValuePlaceholder()
);
return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(LOGGER), config.getSourceInfoStructMaker().schema(), config.getSanitizeFieldNames());
}

View File

@ -123,16 +123,21 @@ public class PostgresValueConverter extends JdbcValueConverters {
private final JsonFactory jsonFactory;
private final String toastPlaceholderString;
private final byte[] toastPlaceholderBinary;
protected PostgresValueConverter(Charset databaseCharset, DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset,
BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry,
HStoreHandlingMode hStoreMode) {
HStoreHandlingMode hStoreMode, byte[] toastPlaceholder) {
super(decimalMode, temporalPrecisionMode, defaultOffset, null, bigIntUnsignedMode);
this.databaseCharset = databaseCharset;
this.jsonFactory = new JsonFactory();
this.includeUnknownDatatypes = includeUnknownDatatypes;
this.typeRegistry = typeRegistry;
this.hStoreMode = hStoreMode;
this.toastPlaceholderBinary = toastPlaceholder;
this.toastPlaceholderString = new String(toastPlaceholder);
}
@Override
@ -321,7 +326,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case PgOid.INT4RANGE_OID:
case PgOid.NUM_RANGE_OID:
case PgOid.INT8RANGE_OID:
return data -> super.convertString(column, fieldDefn, data);
return data -> convertString(column, fieldDefn, data);
case PgOid.POINT:
return data -> convertPoint(column, fieldDefn, data);
case PgOid.MONEY:
@ -788,9 +793,29 @@ protected int getTimePrecision(Column column) {
*/
@Override
protected Object convertBinary(Column column, Field fieldDefn, Object data) {
if (data instanceof ToastedReplicationMessageColumn.ToastedValue) {
return toastPlaceholderBinary;
}
if (data instanceof PgArray) {
data = ((PgArray) data).toString();
}
return super.convertBinary(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);
}
/**
* Replaces toasted value with a placeholder
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a Kafka Connect type
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
@Override
protected Object convertString(Column column, Field fieldDefn, Object data) {
if (data instanceof ToastedReplicationMessageColumn.ToastedValue) {
return toastPlaceholderString;
}
return super.convertString(column, fieldDefn, data);
}
}

View File

@ -18,6 +18,10 @@
*/
public class ToastedReplicationMessageColumn extends AbstractReplicationMessageColumn {
public static enum ToastedValue {
TOAST
};
public ToastedReplicationMessageColumn(String columnName, PostgresType type, String typeWithModifiers, boolean optional, boolean hasMetadata) {
super(columnName, type, typeWithModifiers, optional, hasMetadata);
}
@ -29,6 +33,6 @@ public boolean isToastedColumn() {
@Override
public Object getValue(PostgresStreamingChangeEventSource.PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
throw new UnsupportedOperationException("A toasted column does not supply a value");
return ToastedValue.TOAST;
}
}

View File

@ -1090,6 +1090,8 @@ public void shouldRefreshSchemaOnUnchangedToastedDataWhenSchemaChanged() throws
@Test
@FixFor("DBZ-842")
public void shouldNotPropagateUnchangedToastedData() throws Exception {
final String toastedValuePlaceholder = "__DEBEZIUM_TOASTED_VALUE__";
startConnector(config -> config
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST)
);
@ -1140,13 +1142,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception {
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "")
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "")
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
@ -1155,13 +1157,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception {
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "")
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "")
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
), consumer.remove(), Envelope.FieldName.AFTER);
}