From 1d41d53636f95730e622e589fc31c732679cd923 Mon Sep 17 00:00:00 2001
From: Chris Cranford
Date: Mon, 13 Jan 2020 12:38:35 -0500
Subject: [PATCH] DBZ-1685 Document and add support for column.mask.with for PostgreSQL connector

---
 .../postgresql/PostgresConnectorConfig.java   | 20 ++++++
 .../postgresql/PostgresConnectorIT.java       | 65 +++++++++++++++++++
 .../ROOT/pages/connectors/postgresql.adoc     |  4 ++
 3 files changed, 89 insertions(+)

diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
index 6de723a6e..5ec980478 100755
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
@@ -871,6 +871,26 @@ public static SchemaRefreshMode parse(String value) {
                     "the original value is a toasted value not provided by the database." +
                     "If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets.");
 
+    /**
+     * Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
+     * have their values masked by the specified number of asterisk ('*') characters.
+     *
+     * @param length the number of asterisks that should appear in place of the column's string values written in source records;
+     *            must be positive
+     * @return the field; never null
+     * @throws IllegalArgumentException if {@code length} is zero or negative
+     */
+    public static final Field MASK_COLUMN(int length) {
+        if (length <= 0) {
+            throw new IllegalArgumentException("The mask length must be positive");
+        }
+        // The property value is a list of column-name regular expressions, not a number, so validate it as such.
+        return Field.create("column.mask.with." + length + ".chars")
+                .withValidation(Field::isListOfRegex)
+                .withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that should "
+                        + "be masked with " + length + " asterisk ('*') characters.");
+    }
+
     /**
      * The set of {@link Field}s defined as part of this configuration.
      */
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
index 2f9e35284..954e3382b 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
@@ -1305,6 +1305,71 @@ public void shouldCreatePublicationWhenReplicationSlotExists() throws Exception
         });
     }
 
+    @Test
+    @FixFor("DBZ-1685")
+    public void shouldConsumeEventsWithMaskedColumns() throws Exception {
+        TestHelper.execute(SETUP_TABLES_STMT);
+        Configuration.Builder configBuilder = TestHelper.defaultConfig()
+                .with(PostgresConnectorConfig.MASK_COLUMN(5), "s2.a.bb");
+        start(PostgresConnector.class, configBuilder.build());
+        assertConnectorIsRunning();
+
+        // consume the initial records and verify the read event for s2.a
+        SourceRecords actualRecords = consumeRecordsByTopic(2);
+        assertThat(actualRecords.allRecordsInOrder().size()).isEqualTo(2);
+
+        List<SourceRecord> recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
+        assertThat(recordsForTopicS2.size()).isEqualTo(1);
+
+        SourceRecord record = recordsForTopicS2.remove(0);
+        VerifyRecord.isValidRead(record, PK_FIELD, 1);
+
+        Struct value = (Struct) record.value();
+        Struct after = value.getStruct("after");
+        assertThat(after).isNotNull();
+        assertThat(after.getString("bb")).isEqualTo("*****");
+
+        // insert and verify inserts
+        TestHelper.execute("INSERT INTO s2.a (aa,bb) VALUES (1, 'test');");
+
+        actualRecords = consumeRecordsByTopic(1);
+        assertThat(actualRecords.topics().size()).isEqualTo(1);
+
+        recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
+        assertThat(recordsForTopicS2.size()).isEqualTo(1);
+
+        record = recordsForTopicS2.remove(0);
+        VerifyRecord.isValidInsert(record, PK_FIELD, 2);
+
+        value = (Struct) record.value();
+        after = value.getStruct("after");
+        assertThat(after).isNotNull();
+        assertThat(after.getString("bb")).isEqualTo("*****");
+
+        // update and verify update
+        TestHelper.execute("UPDATE s2.a SET aa=2, bb='hello' WHERE pk=2;");
+
+        actualRecords = consumeRecordsByTopic(1);
+        assertThat(actualRecords.topics().size()).isEqualTo(1);
+
+        recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
+        assertThat(recordsForTopicS2.size()).isEqualTo(1);
+
+        record = recordsForTopicS2.remove(0);
+        VerifyRecord.isValidUpdate(record, PK_FIELD, 2);
+
+        value = (Struct) record.value();
+        // NOTE(review): "before" is only checked when present; presumably it depends on the table's replica identity - confirm
+        if (value.getStruct("before") != null) {
+            assertThat(value.getStruct("before").getString("bb")).isEqualTo("*****");
+        }
+        after = value.getStruct("after");
+        assertThat(after).isNotNull();
+        assertThat(after.getString("bb")).isEqualTo("*****");
+
+        stopConnector();
+    }
+
     private CompletableFuture batchInsertRecords(long recordsCount, int batchSize) {
         String insertStmt = "INSERT INTO text_table(j, jb, x, u) " +
                 "VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +
diff --git a/documentation/modules/ROOT/pages/connectors/postgresql.adoc b/documentation/modules/ROOT/pages/connectors/postgresql.adoc
index d44c5e7d7..dfc801e4b 100644
--- a/documentation/modules/ROOT/pages/connectors/postgresql.adoc
+++ b/documentation/modules/ROOT/pages/connectors/postgresql.adoc
@@ -1627,6 +1627,10 @@ Debezium will instead use the publication as defined.
 When `true` the delete operations are represented by a delete event and a subsequent tombstone event. When `false` only a delete event is sent.
+ Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record got deleted. +|`column.mask.with._length_.chars` +|_n/a_ +|An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be replaced in the change event message values with a field value consisting of the specified number of asterisk (`*`) characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer or zero. Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_. + |`column.propagate.source.type` 0.8.0 and later |_n/a_ |An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages.