DBZ-3953 Mark wal2json plugin as deprecated

Jiri Pechanec 2022-01-21 10:05:25 +01:00 committed by Gunnar Morling
parent 664d989e42
commit f1b03c176e
3 changed files with 42 additions and 64 deletions

View File

@@ -396,6 +396,7 @@ public boolean supportsLogicalDecodingMessage() {
return false;
}
},
@Deprecated
WAL2JSON_STREAMING("wal2json_streaming") {
@Override
public MessageDecoder messageDecoder(MessageDecoderContext config) {
@@ -427,6 +428,7 @@ public boolean supportsLogicalDecodingMessage() {
return false;
}
},
@Deprecated
WAL2JSON_RDS_STREAMING("wal2json_rds_streaming") {
@Override
public MessageDecoder messageDecoder(MessageDecoderContext config) {
@@ -463,6 +465,7 @@ public boolean supportsLogicalDecodingMessage() {
return false;
}
},
@Deprecated
WAL2JSON("wal2json") {
@Override
public MessageDecoder messageDecoder(MessageDecoderContext config) {
@@ -494,6 +497,7 @@ public boolean supportsLogicalDecodingMessage() {
return true;
}
},
@Deprecated
WAL2JSON_RDS("wal2json_rds") {
@Override
public MessageDecoder messageDecoder(MessageDecoderContext config) {
@@ -672,13 +676,10 @@ public static SchemaRefreshMode parse(String value) {
.withEnum(LogicalDecoder.class, LogicalDecoder.DECODERBUFS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation(PostgresConnectorConfig::validatePluginName)
.withDescription("The name of the Postgres logical decoding plugin installed on the server. " +
"Supported values are '" + LogicalDecoder.DECODERBUFS.getValue()
+ "', '" + LogicalDecoder.WAL2JSON.getValue()
+ "', '" + LogicalDecoder.PGOUTPUT.getValue()
+ "', '" + LogicalDecoder.WAL2JSON_STREAMING.getValue()
+ "', '" + LogicalDecoder.WAL2JSON_RDS.getValue()
+ "' and '" + LogicalDecoder.WAL2JSON_RDS_STREAMING.getValue()
+ "' and '" + LogicalDecoder.PGOUTPUT.getValue()
+ "'. " +
"Defaults to '" + LogicalDecoder.DECODERBUFS.getValue() + "'.");
@@ -1313,6 +1314,15 @@ private static int validateToastedValuePlaceholder(Configuration config, Field f
return 0;
}
private static int validatePluginName(Configuration config, Field field, Field.ValidationOutput problems) {
final String pluginName = config.getString(PLUGIN_NAME);
if (!Strings.isNullOrEmpty(pluginName) && pluginName.startsWith("wal2json")) {
LOGGER.warn("Logical decoder '{}' is deprecated and will be removed in future versions",
pluginName);
}
return 0;
}
private static int validateLogicalDecodingMessageExcludeList(Configuration config, Field field, Field.ValidationOutput problems) {
String includeList = config.getString(LOGICAL_DECODING_MESSAGE_PREFIX_INCLUDE_LIST);
String excludeList = config.getString(LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST);

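The `validatePluginName` check added above is warn-only: a `wal2json*` plug-in name still passes validation, it just logs a deprecation warning. A minimal standalone sketch of that behaviour, using plain SLF4J instead of Debezium's `Configuration`/`Field`/`ValidationOutput` types:

[source,java]
----
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PluginNameDeprecationSketch {

    private static final Logger LOGGER = LoggerFactory.getLogger(PluginNameDeprecationSketch.class);

    // Mirrors validatePluginName(): any wal2json variant logs a warning, validation still passes.
    static int validatePluginName(String pluginName) {
        if (pluginName != null && !pluginName.isEmpty() && pluginName.startsWith("wal2json")) {
            LOGGER.warn("Logical decoder '{}' is deprecated and will be removed in future versions", pluginName);
        }
        return 0; // zero problems reported; the deprecation is warn-only
    }

    public static void main(String[] args) {
        validatePluginName("wal2json_rds_streaming"); // logs the deprecation warning
        validatePluginName("pgoutput");               // no warning
    }
}
----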
View File

@@ -11,6 +11,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.Before;
@@ -92,6 +93,7 @@ public void shouldProcessFromStreaming() throws Exception {
waitForStreamingToStart();
TestHelper.execute(STATEMENTS);
TestHelper.execute("ALTER TABLE nopk.t3 REPLICA IDENTITY FULL");
final int expectedRecordsCount = 1 + 1 + 1;
@@ -103,6 +105,21 @@ public void shouldProcessFromStreaming() throws Exception {
Assertions.assertThat(recordsByTopic.get("test_server.nopk.t2").get(0).keySchema().field("pk")).isNotNull();
Assertions.assertThat(recordsByTopic.get("test_server.nopk.t2").get(0).keySchema().fields()).hasSize(1);
Assertions.assertThat(recordsByTopic.get("test_server.nopk.t3").get(0).keySchema()).isNull();
TestHelper.execute("UPDATE nopk.t3 SET val = 300 WHERE pk = 3;");
TestHelper.execute("DELETE FROM nopk.t3;");
consumer.expects(2);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
final Map<String, List<SourceRecord>> recordsByTopic2 = recordsByTopic(2, consumer);
final SourceRecord update = recordsByTopic2.get("test_server.nopk.t3").get(0);
final SourceRecord delete = recordsByTopic2.get("test_server.nopk.t3").get(1);
Assertions.assertThat(update.keySchema()).isNull();
Assertions.assertThat(delete.keySchema()).isNull();
Assertions.assertThat(((Struct) update.value()).getStruct("before").get("val")).isEqualTo(30);
Assertions.assertThat(((Struct) update.value()).getStruct("after").get("val")).isEqualTo(300);
Assertions.assertThat(((Struct) delete.value()).getStruct("before").get("val")).isEqualTo(300);
}
@Test

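The `ALTER TABLE nopk.t3 REPLICA IDENTITY FULL` statement added to the test is what lets the decoder supply complete `before` images for `UPDATE` and `DELETE` events on a table without a primary key. A rough JDBC sketch that applies and verifies the setting outside the test harness (hypothetical connection settings; `nopk.t3` is the table from the test):

[source,java]
----
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReplicaIdentityCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection settings; adjust to your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement stmt = conn.createStatement()) {

            // Without a primary key, FULL replica identity is needed to get before-images.
            stmt.execute("ALTER TABLE nopk.t3 REPLICA IDENTITY FULL");

            // pg_class.relreplident: 'f' = full, 'd' = default, 'n' = nothing, 'i' = index.
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT relreplident FROM pg_class WHERE oid = 'nopk.t3'::regclass")) {
                rs.next();
                System.out.println("replica identity: " + rs.getString(1));
            }
        }
    }
}
----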
View File

@@ -58,7 +58,7 @@ The PostgreSQL connector contains two main parts that work together to read and
ifdef::community[]
* A logical decoding output plug-in. You might need to install the output plug-in that you choose to use. You must configure a replication slot that uses your chosen output plug-in before running the PostgreSQL server. The plug-in can be one of the following:
** link:https://github.com/debezium/postgres-decoderbufs[`decoderbufs`] is based on Protobuf and maintained by the {prodname} community.
** link:https://github.com/eulerto/wal2json[`wal2json`] is based on JSON and maintained by the wal2json community.
** link:https://github.com/eulerto/wal2json[`wal2json`] is based on JSON and maintained by the wal2json community (deprecated, scheduled for removal).
** `pgoutput` is the standard logical decoding output plug-in in PostgreSQL 10+. It is maintained by the PostgreSQL community, and used by PostgreSQL itself for link:https://www.postgresql.org/docs/current/logical-replication-architecture.html[logical replication]. This plug-in is always present so no additional libraries need to be installed. The {prodname} connector interprets the raw replication event stream directly into change events.
* Java code (the actual Kafka Connect connector) that reads the changes produced by the chosen logical decoding output plug-in. It uses PostgreSQL's link:https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], by means of the PostgreSQL link:https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
@@ -1889,16 +1889,6 @@ There is no safe way for {prodname} to read the missing value out-of-bands direc
Similarly, when receiving a `DELETE` event, no TOAST columns, if any, are in the `before` field.
As {prodname} cannot safely provide the column value in this case, the connector returns a placeholder value as defined by the connector configuration property, `unavailable.value.placeholder`.
ifdef::community[]
[IMPORTANT]
====
There is a problem related to Amazon RDS instances. The `wal2json` plug-in has evolved over the time and there were releases that provided out-of-band toasted values. Amazon supports different versions of the plug-in for different PostgreSQL versions. See https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_PostgreSQL.html[Amazon's documentation] to obtain version to version mapping. For consistent toasted values handling:
* Use the `pgoutput` plug-in for PostgreSQL 10+ instances.
* Set `include-unchanged-toast=0` for older versions of the `wal2json` plug-in by using the `slot.stream.params` configuration option.
====
endif::community[]
[id="postgresql-default-values"]
=== Default values
If a default value is specified for a column in the database schema, the PostgreSQL connector will attempt to propagate this value to the Kafka schema whenever possible. Most common data types are supported, including:
@@ -2012,7 +2002,7 @@ It is possible to capture changes in a PostgreSQL database that is running in li
It is link:https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_WorkingWithParamGroups.html[automatically changed] when the `rds.logical_replication` parameter is set to `1`.
If the `wal_level` is not set to `logical` after you make the preceding change, it is probably because the instance has to be restarted after the parameter group change.
Restarts occur during your maintenance window, or you can initiate a restart manually.
* Set the {prodname} `plugin.name` parameter to `wal2json`. You can skip this on PostgreSQL 10+ if you plan to use `pgoutput` logical replication stream support.
* Set the {prodname} `plugin.name` parameter to `pgoutput`.
* Initiate logical replication from an AWS account that has the `rds_replication` role.
The role grants permissions to manage logical slots and to stream data using logical slots.
By default, only the master user account on AWS has the `rds_replication` role on Amazon RDS.
@@ -2021,24 +2011,10 @@ It is possible to capture changes in a PostgreSQL database that is running in li
To enable accounts other than the master account to create an initial snapshot, you must grant `SELECT` permission to the accounts on the tables to be captured.
For more information about security for PostgreSQL logical replication, see the link:https://www.postgresql.org/docs/current/logical-replication-security.html[PostgreSQL documentation].
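Putting the preceding RDS settings together, a minimal connector configuration might look like the following sketch. The endpoint, credentials, slot, and server name are illustrative placeholders; the keys are the standard PostgreSQL connector configuration properties, shown here built with plain `java.util.Properties`:

[source,java]
----
import java.util.Properties;

public class RdsConnectorConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("plugin.name", "pgoutput");                 // no plug-in installation needed on RDS
        props.setProperty("database.hostname", "mydb.abc123.us-east-1.rds.amazonaws.com"); // hypothetical endpoint
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "debezium");               // account with the rds_replication role
        props.setProperty("database.password", "secret");
        props.setProperty("database.dbname", "postgres");
        props.setProperty("database.server.name", "rds_server");      // logical name used to prefix topic names
        props.setProperty("slot.name", "debezium");

        // Print the assembled configuration; in practice this is what you would register with Kafka Connect.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
----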
[IMPORTANT]
====
Ensure that you use the latest versions of PostgreSQL 9.6, 10 or 11 on Amazon RDS.
Otherwise, older versions of the `wal2json` plug-in might be installed.
See https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_PostgreSQL.html[the official documentation] for the exact `wal2json` versions installed on Amazon RDS.
In the case of an older version, replication messages received from the database might not contain complete information about type constraints such as length or scale or `NULL`/`NOT NULL`. This might cause creation of messages with an inconsistent schema for a short period of time when there are changes to a column's definition.
As of January 2019, the following PostgreSQL versions on RDS come with an up-to-date version of `wal2json` and thus should be used:
* PostgreSQL 9.6: 9.6.10 and newer
* PostgreSQL 10: 10.5 and newer
* PostgreSQL 11: any version
====
[[postgresql-on-azure]]
==== PostgreSQL on Azure
It is possible to use {prodname} with link:https://docs.microsoft.com/azure/postgresql/[Azure Database for PostgreSQL], which has support for the `wal2json` and `pgoutput` plug-ins, both of which are supported by {prodname} as well.
It is possible to use {prodname} with link:https://docs.microsoft.com/azure/postgresql/[Azure Database for PostgreSQL], which has support for the `pgoutput` logical decoding plug-in, which is supported by {prodname}.
Set the Azure replication support to `logical`. You can use the link:https://docs.microsoft.com/en-us/azure/postgresql/concepts-logical#using-azure-cli[Azure CLI] or the link:https://docs.microsoft.com/en-us/azure/postgresql/concepts-logical#using-azure-portal[Azure Portal] to configure this. For example, to use the Azure CLI, here are the link:https://docs.microsoft.com/cli/azure/postgres/server?view=azure-cli-latest[`az postgres server`] commands that you need to execute:
@@ -2066,19 +2042,11 @@ While using the `pgoutput` plug-in, it is recommended that you configure `filter
See {link-prefix}:{link-postgresql-plugins}[Logical Decoding Output Plug-in Installation for PostgreSQL] for more detailed instructions for setting up and testing logical decoding plug-ins.
====
[NOTE]
====
As of {prodname} 0.10, the connector supports PostgreSQL 10+ logical replication streaming by using `pgoutput`.
This means that a logical decoding output plug-in is no longer necessary and changes can be emitted directly from the replication stream by the connector.
====
As of PostgreSQL 9.4, the only way to read changes to the write-ahead log is to install a logical decoding output plug-in. Plug-ins are written in C, compiled, and installed on the machine that runs the PostgreSQL server. Plug-ins use a number of PostgreSQL-specific APIs, as described by the link:https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[PostgreSQL documentation].
The PostgreSQL connector works with one of {prodname}'s supported logical decoding plug-ins to encode the changes in either link:https://github.com/google/protobuf[Protobuf format] or link:http://www.json.org/[JSON] format.
See the documentation for your chosen plug-in to learn more about the plug-in's requirements, limitations, and how to compile it.
* link:https://github.com/debezium/postgres-decoderbufs/blob/main/README.md[`protobuf`]
* link:https://github.com/eulerto/wal2json/blob/master/README.md[`wal2json`]
The PostgreSQL connector works with one of {prodname}'s supported logical decoding plug-ins to encode the changes in either link:https://github.com/google/protobuf[Protobuf format] or link:https://github.com/postgres/postgres/blob/master/src/backend/replication/pgoutput/pgoutput.c[pgoutput] format.
The `pgoutput` plug-in comes out of the box with the PostgreSQL server.
For more details about the Protobuf plug-in, see its link:https://github.com/debezium/postgres-decoderbufs/blob/main/README.md[documentation] to learn about the plug-in's requirements, limitations, and how to compile it.
For simplicity, {prodname} also provides a container image based on the upstream PostgreSQL server image, on top of which it compiles and installs the plug-ins. You can link:https://github.com/debezium/docker-images/tree/main/postgres/13[use this image] as an example of the detailed steps required for the installation.
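One way to confirm that a server can stream changes through `pgoutput` before deploying the connector is to create and immediately drop a logical replication slot by hand. A rough JDBC sketch, assuming a hypothetical slot name and connection settings and a role with the `REPLICATION` privilege (or superuser):

[source,java]
----
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PgoutputSlotCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection settings; adjust to your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
             Statement stmt = conn.createStatement()) {

            // Creating the slot fails if the named output plug-in is not available on the server.
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT slot_name, lsn FROM pg_create_logical_replication_slot('pgoutput_check', 'pgoutput')")) {
                rs.next();
                System.out.println("created slot " + rs.getString("slot_name") + " at " + rs.getString("lsn"));
            }

            // Drop the test slot again so it does not retain WAL.
            stmt.execute("SELECT pg_drop_replication_slot('pgoutput_check')");
        }
    }
}
----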
@@ -2093,14 +2061,6 @@ The {prodname} logical decoding plug-ins have been installed and tested on only
Plug-in behavior is not completely the same for all cases.
These differences have been identified:
* The `wal2json` and `decoderbufs` plug-ins emit events for tables without primary keys.
* The `wal2json` plug-in does not support special values, such as `NaN` or `infinity`, for floating point types.
* The `wal2json` plug-in should be used with the `schema.refresh.mode` connector configuration property set to `columns_diff_exclude_unchanged_toast`. Otherwise, when receiving a change event for a row that contains an unchanged `TOAST` column, no field for that column is contained in the emitted change event's `after` field. This is because `wal2json` plug-in messages do not contain a field for such a column.
+
The requirement for adding this is tracked under the link:https://github.com/eulerto/wal2json/issues/98[`wal2json` issue 98].
See the documentation of `columns_diff_exclude_unchanged_toast` further below for implications of using it.
* The `pgoutput` plug-in does not emit all events for tables without primary keys. It emits only events for `INSERT` operations.
* While all plug-ins will refresh schema metadata from the database upon detection of a schema change during streaming, the `pgoutput` plug-in is somewhat more 'eager' about triggering such refreshes. For example, a change to the default value for a column will trigger a refresh with `pgoutput`, while other plug-ins will not be aware of this change until another change triggers a refresh (eg. addition of a new column.) This is due to the behaviour of `pgoutput`, rather than {prodname} itself.
All up-to-date differences are tracked in a test suite link:https://github.com/debezium/debezium/blob/main/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DecoderDifferences.java[Java class].
@@ -2115,9 +2075,9 @@ If you are using a {link-prefix}:{link-postgresql-connector}#postgresql-output-p
[source,properties]
----
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json' // <1>
shared_preload_libraries = 'decoderbufs' // <1>
----
<1> Instructs the server to load the `decoderbufs` and `wal2json` logical decoding plug-ins at startup. The names of the plug-ins are set in the link:https://github.com/debezium/postgres-decoderbufs/blob/v0.3.0/Makefile[`Protobuf`] and link:https://github.com/eulerto/wal2json/blob/master/Makefile[`wal2json`] make files.
<1> Instructs the server to load the `decoderbufs` logical decoding plug-in at startup. The name of the plug-in is set in the link:https://github.com/debezium/postgres-decoderbufs/blob/v0.3.0/Makefile[`Protobuf`] make file.
. To configure the replication slot regardless of the decoder being used, specify the following in the `postgresql.conf` file:
+
@@ -2293,11 +2253,6 @@ The database typically reclaims disk space in batch blocks. This is expected beh
** Enable periodic heartbeat record generation with the `heartbeat.interval.ms` connector configuration property.
** Regularly emit change events from the database for which {prodname} is capturing changes.
ifdef::community[]
+
In the case of `wal2json` decoder plug-in, it is sufficient to generate empty events. This can be achieved for example by truncating an empty temporary table. For other decoder plug-ins, the recommendation is to create a supplementary table for which {prodname} is not capturing changes.
endif::community[]
+
A separate process would then periodically update the table by either inserting a new row or repeatedly updating the same row.
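As a rough illustration of such a separate process, the following sketch periodically touches a single row in a dedicated heartbeat table. The table name and connection settings are hypothetical, and the table must already exist, e.g. `CREATE TABLE public.debezium_heartbeat (id int PRIMARY KEY, ts timestamptz)`:

[source,java]
----
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatTableUpdater {
    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Touch the heartbeat row once a minute; adjust the interval to your WAL retention needs.
        scheduler.scheduleAtFixedRate(() -> {
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                 Statement stmt = conn.createStatement()) {
                // Repeatedly update the same row; inserting a new row per interval works as well.
                stmt.execute("INSERT INTO public.debezium_heartbeat (id, ts) VALUES (1, now()) " +
                             "ON CONFLICT (id) DO UPDATE SET ts = EXCLUDED.ts");
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }, 0, 60, TimeUnit.SECONDS);
    }
}
----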
@@ -2649,9 +2604,9 @@ The following configuration properties are _required_ unless a default value is
|The name of the PostgreSQL {link-prefix}:{link-postgresql-connector}#postgresql-output-plugin[logical decoding plug-in] installed on the PostgreSQL server.
ifdef::community[]
Supported values are `decoderbufs`, `wal2json`, `+wal2json_rds+`, `+wal2json_streaming+`, `+wal2json_rds_streaming+` and `pgoutput`.
Supported values are `decoderbufs` and `pgoutput`.
If you are using a `wal2json` plug-in and transactions are very large, the JSON batch event that contains all transaction changes might not fit into the hard-coded memory buffer, which has a size of 1 GB. In such cases, switch to a streaming plug-in, by setting the `plugin-name` property to `wal2json_streaming` or `wal2json_rds_streaming`. With a streaming plug-in, PostgreSQL sends the connector a separate message for each change in a transaction.
The `wal2json` plug-in is deprecated and scheduled for removal.
endif::community[]
ifdef::product[]
@@ -3114,10 +3069,6 @@ become outdated if TOASTable columns are dropped from the table.
|No default
|Semicolon separated list of parameters to pass to the configured logical decoding plug-in. For example, `add-tables=public.table,public.table2;include-lsn=true`.
ifdef::community[]
If you are using the `wal2json` plug-in, this property is useful for enabling server-side table filtering. Allowed values depend on the configured plug-in.
endif::community[]
|[[postgresql-property-sanitize-field-names]]<<postgresql-property-sanitize-field-names, `+sanitize.field.names+`>>
|`true` if connector configuration sets the `key.converter` or `value.converter` property to the Avro converter.