DBZ-1685 Document and add support for column.mask.with for PostgreSQL connector

Chris Cranford 2020-01-13 12:38:35 -05:00 committed by Gunnar Morling
parent 9fc1eada11
commit 1d41d53636
3 changed files with 85 additions and 0 deletions

@@ -871,6 +871,24 @@ public static SchemaRefreshMode parse(String value) {
"the original value is a toasted value not provided by the database." + "the original value is a toasted value not provided by the database." +
"If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets."); "If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets.");
/**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
* have their values masked by the specified number of asterisk ('*') characters.
*
* @param length the number of asterisks that should appear in place of the column's string values written in source records;
* must be positive
* @return the field; never null
*/
public static final Field MASK_COLUMN(int length) {
    if (length <= 0) {
        throw new IllegalArgumentException("The mask length must be positive");
    }
    return Field.create("column.mask.with." + length + ".chars")
            .withValidation(Field::isInteger)
            .withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that should "
                    + "be masked with " + length + " asterisk ('*') characters.");
}
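
For context, a field produced this way plugs into a connector configuration like any statically defined option; a minimal usage sketch, assuming Debezium's io.debezium.config.Configuration builder API (the regex value is borrowed from the test below):

// Sketch only: MASK_COLUMN(5) yields the property key "column.mask.with.5.chars"
Field maskColumns = PostgresConnectorConfig.MASK_COLUMN(5);
Configuration config = Configuration.create()
        .with(maskColumns, "s2.a.bb") // comma-separated regexes matching fully-qualified column names
        .build();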
/**
 * The set of {@link Field}s defined as part of this configuration.
 */

@@ -1305,6 +1305,69 @@ public void shouldCreatePublicationWhenReplicationSlotExists() throws Exception
});
}
@Test
@FixFor("DBZ-1685")
public void shouldConsumeEventsWithMaskedColumns() throws Exception {
    TestHelper.execute(SETUP_TABLES_STMT);
    Configuration.Builder configBuilder = TestHelper.defaultConfig()
            .with(PostgresConnectorConfig.MASK_COLUMN(5), "s2.a.bb");
    start(PostgresConnector.class, configBuilder.build());
    assertConnectorIsRunning();

    // snapshot: the initial row in s2.a should already have its 'bb' value masked
    SourceRecords actualRecords = consumeRecordsByTopic(2);
    assertThat(actualRecords.allRecordsInOrder().size()).isEqualTo(2);

    List<SourceRecord> recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
    assertThat(recordsForTopicS2.size()).isEqualTo(1);

    SourceRecord record = recordsForTopicS2.remove(0);
    VerifyRecord.isValidRead(record, PK_FIELD, 1);

    Struct value = (Struct) record.value();
    if (value.getStruct("after") != null) {
        assertThat(value.getStruct("after").getString("bb")).isEqualTo("*****");
    }

    // insert and verify inserts
    TestHelper.execute("INSERT INTO s2.a (aa,bb) VALUES (1, 'test');");

    actualRecords = consumeRecordsByTopic(1);
    assertThat(actualRecords.topics().size()).isEqualTo(1);

    recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
    assertThat(recordsForTopicS2.size()).isEqualTo(1);

    record = recordsForTopicS2.remove(0);
    VerifyRecord.isValidInsert(record, PK_FIELD, 2);

    value = (Struct) record.value();
    if (value.getStruct("after") != null) {
        assertThat(value.getStruct("after").getString("bb")).isEqualTo("*****");
    }

    // update and verify update
    TestHelper.execute("UPDATE s2.a SET aa=2, bb='hello' WHERE pk=2;");

    actualRecords = consumeRecordsByTopic(1);
    assertThat(actualRecords.topics().size()).isEqualTo(1);

    recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
    assertThat(recordsForTopicS2.size()).isEqualTo(1);

    record = recordsForTopicS2.remove(0);
    VerifyRecord.isValidUpdate(record, PK_FIELD, 2);

    value = (Struct) record.value();
    if (value.getStruct("before") != null) {
        assertThat(value.getStruct("before").getString("bb")).isEqualTo("*****");
    }
    if (value.getStruct("after") != null) {
        assertThat(value.getStruct("after").getString("bb")).isEqualTo("*****");
    }

    stopConnector();
}
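
Because each length produces a distinct property key, several mask lengths can coexist in a single configuration, as the documentation change below notes. A hypothetical combination (the second pattern and its column names are invented for illustration):

// Sketch only: two independent masking rules with different lengths
Configuration.Builder builder = TestHelper.defaultConfig()
        .with(PostgresConnectorConfig.MASK_COLUMN(5), "s2.a.bb")
        .with(PostgresConnectorConfig.MASK_COLUMN(10), "s1\\.a\\.(ssn|card_no)"); // hypothetical sensitive columns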
private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchSize) {
    String insertStmt = "INSERT INTO text_table(j, jb, x, u) " +
            "VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +

@@ -1627,6 +1627,10 @@ Debezium will instead use the publication as defined.
When `true` the delete operations are represented by a delete event and a subsequent tombstone event. When `false` only a delete event is sent. +
Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record has been deleted.
|`column.mask.with._length_.chars`
|_n/a_
|An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be replaced in change event messages with a field value consisting of the specified number of asterisk (`*`) characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer. Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_.
|`column.propagate.source.type` 0.8.0 and later
|_n/a_
|An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages.