diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresDefaultValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresDefaultValueConverter.java index 7cbbaf807..bdb391f4c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresDefaultValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresDefaultValueConverter.java @@ -44,6 +44,8 @@ public class PostgresDefaultValueConverter implements DefaultValueConverter { private static final Pattern LITERAL_DEFAULT_PATTERN = Pattern.compile("'(.*?)'"); private static final Pattern FUNCTION_DEFAULT_PATTERN = Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)"); + private static final Set CURRENT_DATE_TIMES = Collect.unmodifiableSet("current_timestamp", + "current_time", "current_date", "localtime", "localtimestamp"); private static final Set TRIM_DATA_TYPES = Collect.unmodifiableSet("bit", "varbit", "bool", "numeric", "float4", "float8", "int2", "int4", "serial", "int8", "bigserial", "smallserial", "uuid", "date", "time", "timestamp", "timestamptz", "interval"); @@ -201,7 +203,7 @@ private static String extractDefault(String defaultValue) { // If the default value is generated by a function, map a placeholder value for the schema private static String extractDefault(String defaultValue, String generatedValuePlaceholder) { final Matcher functionMatcher = FUNCTION_DEFAULT_PATTERN.matcher(defaultValue); - if (functionMatcher.find()) { + if (functionMatcher.find() || CURRENT_DATE_TIMES.contains(defaultValue.toLowerCase())) { return generatedValuePlaceholder; } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java index c2860696b..8e5cb8b48 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresDefaultValueConverterIT.java @@ -46,21 +46,10 @@ public void after() { } @Test - @FixFor("DBZ-4736") + @FixFor({ "DBZ-4736", "DBZ-5384" }) public void shouldSetTheNullValueInSnapshot() throws Exception { - final String dml = "INSERT INTO s1.a (pk) VALUES (1);"; - final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;" + - "CREATE SCHEMA s1; " + - "CREATE TABLE s1.a (pk SERIAL, dint integer DEFAULT NULL::integer, " - + "dvc1 varchar(64) DEFAULT NULL::character varying, " - + "dvc2 varchar(64) DEFAULT 'NULL', " - + "dvc3 varchar(64) DEFAULT 'MYVALUE', " - + "dvc4 varchar(64) DEFAULT 'NULL'::character varying, " - + "dvc5 varchar(64) DEFAULT 'NULL::character varying', " - + "dvc6 varchar(64) DEFAULT NULL, PRIMARY KEY(pk));"; + createTableAndInsertData(); - TestHelper.execute(ddl); - TestHelper.execute(dml); Configuration.Builder configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1"); @@ -72,39 +61,12 @@ public void shouldSetTheNullValueInSnapshot() throws Exception { Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(1); final SourceRecord sourceRecord = records.allRecordsInOrder().get(0); - final Schema valueSchema = sourceRecord.valueSchema(); - - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull(); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull(); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull(); - - Assertions.assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull(); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull(); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc3").schema().defaultValue()).isEqualTo("MYVALUE"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc4").schema().defaultValue()).isEqualTo("NULL"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc5").schema().defaultValue()).isEqualTo("NULL::character varying"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc6").schema().defaultValue()).isNull(); + assertDefaultValueChangeRecord(sourceRecord); } @Test - @FixFor("DBZ-4736") + @FixFor({ "DBZ-4736", "DBZ-5384" }) public void shouldSetTheNullValueInStreaming() throws Exception { - final String dml = "INSERT INTO s1.a (pk) VALUES (1);"; - final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;" + - "CREATE SCHEMA s1; " + - "CREATE TABLE s1.a (pk SERIAL, dint integer DEFAULT NULL::integer, " - + "dvc1 varchar(64) DEFAULT NULL::character varying, " - + "dvc2 varchar(64) DEFAULT 'NULL', " - + "dvc3 varchar(64) DEFAULT 'MYVALUE', " - + "dvc4 varchar(64) DEFAULT 'NULL'::character varying, " - + "dvc5 varchar(64) DEFAULT 'NULL::character varying', " - + "dvc6 varchar(64) DEFAULT NULL, PRIMARY KEY(pk));"; - Configuration.Builder configBuilder = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1"); @@ -113,30 +75,13 @@ public void shouldSetTheNullValueInStreaming() throws Exception { waitForSnapshotToBeCompleted("postgres", TestHelper.TEST_SERVER); - TestHelper.execute(ddl); - TestHelper.execute(dml); + createTableAndInsertData(); final SourceRecords records = consumeRecordsByTopic(1); Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(1); final SourceRecord sourceRecord = records.allRecordsInOrder().get(0); - final Schema valueSchema = sourceRecord.valueSchema(); - - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull(); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull(); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying"); - Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull(); - - Assertions.assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull(); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull(); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc3").schema().defaultValue()).isEqualTo("MYVALUE"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc4").schema().defaultValue()).isEqualTo("NULL"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc5").schema().defaultValue()).isEqualTo("NULL::character varying"); - Assertions.assertThat(valueSchema.field("after").schema().field("dvc6").schema().defaultValue()).isNull(); + assertDefaultValueChangeRecord(sourceRecord); } @Test @@ -176,4 +121,49 @@ public void testShouldHandleDefaultValueFunctionsWithSchemaPrefixes() throws Exc final Object defaultValue = after.schema().field("id").schema().defaultValue(); assertThat(defaultValue).isEqualTo("00000000-0000-0000-0000-000000000000"); } + + private void createTableAndInsertData() { + final String dml = "INSERT INTO s1.a (pk) VALUES (1);"; + final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;" + + "CREATE SCHEMA s1; " + + "CREATE TABLE s1.a (pk SERIAL, dint integer DEFAULT NULL::integer, " + + "dvc1 varchar(64) DEFAULT NULL::character varying, " + + "dvc2 varchar(64) DEFAULT 'NULL', " + + "dvc3 varchar(64) DEFAULT 'MYVALUE', " + + "dvc4 varchar(64) DEFAULT 'NULL'::character varying, " + + "dvc5 varchar(64) DEFAULT 'NULL::character varying', " + + "dvc6 varchar(64) DEFAULT NULL, " + + "dt1 timestamp DEFAULT CURRENT_TIMESTAMP, " + + "dt2 date DEFAULT CURRENT_DATE, " + + "dt3 time DEFAULT CURRENT_TIME, " + + "PRIMARY KEY(pk));"; + TestHelper.execute(ddl); + TestHelper.execute(dml); + } + + private void assertDefaultValueChangeRecord(SourceRecord sourceRecord) { + final Schema valueSchema = sourceRecord.valueSchema(); + + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull(); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull(); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL"); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE"); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL"); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying"); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull(); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt1")).isNotNull(); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dt2")).isNotNull(); + Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt3")).isNotNull(); + + Assertions.assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull(); + Assertions.assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull(); + Assertions.assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL"); + Assertions.assertThat(valueSchema.field("after").schema().field("dvc3").schema().defaultValue()).isEqualTo("MYVALUE"); + Assertions.assertThat(valueSchema.field("after").schema().field("dvc4").schema().defaultValue()).isEqualTo("NULL"); + Assertions.assertThat(valueSchema.field("after").schema().field("dvc5").schema().defaultValue()).isEqualTo("NULL::character varying"); + Assertions.assertThat(valueSchema.field("after").schema().field("dvc6").schema().defaultValue()).isNull(); + Assertions.assertThat(valueSchema.field("after").schema().field("dt1").schema().defaultValue()).isEqualTo(0L); + Assertions.assertThat(valueSchema.field("after").schema().field("dt2").schema().defaultValue()).isEqualTo(0); + Assertions.assertThat(valueSchema.field("after").schema().field("dt3").schema().defaultValue()).isEqualTo(0L); + } }