DBZ-1368 Support toasted placeholder in protobuf

This commit is contained in:
Jiri Pechanec 2019-09-13 13:14:52 +02:00 committed by Gunnar Morling
parent ae0bf61bb4
commit 9dd5a16160
2 changed files with 16 additions and 13 deletions

View File

@ -29,6 +29,7 @@
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.ToastedReplicationMessageColumn;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.AbstractReplicationMessageColumn;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
@ -106,6 +107,9 @@ private List<ReplicationMessage.Column> transform(List<PgProto.DatumMessage> mes
final Optional<PgProto.TypeInfo> typeInfo = Optional.ofNullable(hasTypeMetadata() && typeInfoList != null ? typeInfoList.get(index) : null);
final String columnName = Strings.unquoteIdentifierPart(datum.getColumnName());
final PostgresType type = typeRegistry.get((int) datum.getColumnType());
if (datum.hasDatumMissing()) {
return new ToastedReplicationMessageColumn(columnName, type, typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null), typeInfo.map(PgProto.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata());
}
return new AbstractReplicationMessageColumn(columnName, type, typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null), typeInfo.map(PgProto.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()) {
@Override
@ -141,8 +145,7 @@ public boolean isLastEventForLsn() {
*/
public Object getValue(PgProto.DatumMessage datumMessage, PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
if (datumMessage.hasDatumMissing()) {
LOGGER.trace("No value received for unchanged TOASTed column {}", datumMessage.getColumnName());
return null;
return ToastedReplicationMessageColumn.ToastedValue.TOAST;
}
int columnType = (int) datumMessage.getColumnType();

View File

@ -70,6 +70,8 @@ public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {
@Rule
public TestRule conditionalFail = new ConditionalFail();
private static final String TOASTED_VALUE_PLACEHOLDER = "__DEBEZIUM_TOASTED_VALUE__";
@Before
public void before() throws Exception {
// ensure the slot is deleted for each test
@ -1090,8 +1092,6 @@ public void shouldRefreshSchemaOnUnchangedToastedDataWhenSchemaChanged() throws
@Test
@FixFor("DBZ-842")
public void shouldNotPropagateUnchangedToastedData() throws Exception {
final String toastedValuePlaceholder = "__DEBEZIUM_TOASTED_VALUE__";
startConnector(config -> config
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST)
);
@ -1142,13 +1142,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception {
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
@ -1157,13 +1157,13 @@ public void shouldNotPropagateUnchangedToastedData() throws Exception {
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
), consumer.remove(), Envelope.FieldName.AFTER);
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, toastedValuePlaceholder),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, toastedValuePlaceholder)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER),
new SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
), consumer.remove(), Envelope.FieldName.AFTER);
}
@ -1415,7 +1415,7 @@ private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(Postg
assertRecordSchemaAndValues(Arrays.asList(
new SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
new SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 20),
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, null)
new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, TOASTED_VALUE_PLACEHOLDER)
), updatedRecord, Envelope.FieldName.AFTER);
}
else {