DBZ-5384 org.postgresql.util.PSQLException: Bad value for type timestamp/date/time: CURRENT_TIMESTAMP

This commit is contained in:
harveyyue 2022-07-12 22:53:26 +08:00 committed by Jiri Pechanec
parent e8405bc376
commit 7af6037078
2 changed files with 54 additions and 62 deletions

View File

@ -44,6 +44,8 @@ public class PostgresDefaultValueConverter implements DefaultValueConverter {
private static final Pattern LITERAL_DEFAULT_PATTERN = Pattern.compile("'(.*?)'"); private static final Pattern LITERAL_DEFAULT_PATTERN = Pattern.compile("'(.*?)'");
private static final Pattern FUNCTION_DEFAULT_PATTERN = Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)"); private static final Pattern FUNCTION_DEFAULT_PATTERN = Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)");
private static final Set<String> CURRENT_DATE_TIMES = Collect.unmodifiableSet("current_timestamp",
"current_time", "current_date", "localtime", "localtimestamp");
private static final Set<String> TRIM_DATA_TYPES = Collect.unmodifiableSet("bit", "varbit", "bool", "numeric", private static final Set<String> TRIM_DATA_TYPES = Collect.unmodifiableSet("bit", "varbit", "bool", "numeric",
"float4", "float8", "int2", "int4", "serial", "int8", "bigserial", "smallserial", "uuid", "date", "time", "float4", "float8", "int2", "int4", "serial", "int8", "bigserial", "smallserial", "uuid", "date", "time",
"timestamp", "timestamptz", "interval"); "timestamp", "timestamptz", "interval");
@ -201,7 +203,7 @@ private static String extractDefault(String defaultValue) {
// If the default value is generated by a function, map a placeholder value for the schema // If the default value is generated by a function, map a placeholder value for the schema
private static String extractDefault(String defaultValue, String generatedValuePlaceholder) { private static String extractDefault(String defaultValue, String generatedValuePlaceholder) {
final Matcher functionMatcher = FUNCTION_DEFAULT_PATTERN.matcher(defaultValue); final Matcher functionMatcher = FUNCTION_DEFAULT_PATTERN.matcher(defaultValue);
if (functionMatcher.find()) { if (functionMatcher.find() || CURRENT_DATE_TIMES.contains(defaultValue.toLowerCase())) {
return generatedValuePlaceholder; return generatedValuePlaceholder;
} }

View File

@ -46,21 +46,10 @@ public void after() {
} }
@Test @Test
@FixFor("DBZ-4736") @FixFor({ "DBZ-4736", "DBZ-5384" })
public void shouldSetTheNullValueInSnapshot() throws Exception { public void shouldSetTheNullValueInSnapshot() throws Exception {
final String dml = "INSERT INTO s1.a (pk) VALUES (1);"; createTableAndInsertData();
final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
"CREATE SCHEMA s1; " +
"CREATE TABLE s1.a (pk SERIAL, dint integer DEFAULT NULL::integer, "
+ "dvc1 varchar(64) DEFAULT NULL::character varying, "
+ "dvc2 varchar(64) DEFAULT 'NULL', "
+ "dvc3 varchar(64) DEFAULT 'MYVALUE', "
+ "dvc4 varchar(64) DEFAULT 'NULL'::character varying, "
+ "dvc5 varchar(64) DEFAULT 'NULL::character varying', "
+ "dvc6 varchar(64) DEFAULT NULL, PRIMARY KEY(pk));";
TestHelper.execute(ddl);
TestHelper.execute(dml);
Configuration.Builder configBuilder = TestHelper.defaultConfig() Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1"); .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1");
@ -72,39 +61,12 @@ public void shouldSetTheNullValueInSnapshot() throws Exception {
Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(1); Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(1);
final SourceRecord sourceRecord = records.allRecordsInOrder().get(0); final SourceRecord sourceRecord = records.allRecordsInOrder().get(0);
final Schema valueSchema = sourceRecord.valueSchema(); assertDefaultValueChangeRecord(sourceRecord);
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc3").schema().defaultValue()).isEqualTo("MYVALUE");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc4").schema().defaultValue()).isEqualTo("NULL");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc5").schema().defaultValue()).isEqualTo("NULL::character varying");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc6").schema().defaultValue()).isNull();
} }
@Test @Test
@FixFor("DBZ-4736") @FixFor({ "DBZ-4736", "DBZ-5384" })
public void shouldSetTheNullValueInStreaming() throws Exception { public void shouldSetTheNullValueInStreaming() throws Exception {
final String dml = "INSERT INTO s1.a (pk) VALUES (1);";
final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
"CREATE SCHEMA s1; " +
"CREATE TABLE s1.a (pk SERIAL, dint integer DEFAULT NULL::integer, "
+ "dvc1 varchar(64) DEFAULT NULL::character varying, "
+ "dvc2 varchar(64) DEFAULT 'NULL', "
+ "dvc3 varchar(64) DEFAULT 'MYVALUE', "
+ "dvc4 varchar(64) DEFAULT 'NULL'::character varying, "
+ "dvc5 varchar(64) DEFAULT 'NULL::character varying', "
+ "dvc6 varchar(64) DEFAULT NULL, PRIMARY KEY(pk));";
Configuration.Builder configBuilder = TestHelper.defaultConfig() Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1"); .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1");
@ -113,30 +75,13 @@ public void shouldSetTheNullValueInStreaming() throws Exception {
waitForSnapshotToBeCompleted("postgres", TestHelper.TEST_SERVER); waitForSnapshotToBeCompleted("postgres", TestHelper.TEST_SERVER);
TestHelper.execute(ddl); createTableAndInsertData();
TestHelper.execute(dml);
final SourceRecords records = consumeRecordsByTopic(1); final SourceRecords records = consumeRecordsByTopic(1);
Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(1); Assertions.assertThat(records.recordsForTopic("test_server.s1.a")).hasSize(1);
final SourceRecord sourceRecord = records.allRecordsInOrder().get(0); final SourceRecord sourceRecord = records.allRecordsInOrder().get(0);
final Schema valueSchema = sourceRecord.valueSchema(); assertDefaultValueChangeRecord(sourceRecord);
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc3").schema().defaultValue()).isEqualTo("MYVALUE");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc4").schema().defaultValue()).isEqualTo("NULL");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc5").schema().defaultValue()).isEqualTo("NULL::character varying");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc6").schema().defaultValue()).isNull();
} }
@Test @Test
@ -176,4 +121,49 @@ public void testShouldHandleDefaultValueFunctionsWithSchemaPrefixes() throws Exc
final Object defaultValue = after.schema().field("id").schema().defaultValue(); final Object defaultValue = after.schema().field("id").schema().defaultValue();
assertThat(defaultValue).isEqualTo("00000000-0000-0000-0000-000000000000"); assertThat(defaultValue).isEqualTo("00000000-0000-0000-0000-000000000000");
} }
private void createTableAndInsertData() {
final String dml = "INSERT INTO s1.a (pk) VALUES (1);";
final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
"CREATE SCHEMA s1; " +
"CREATE TABLE s1.a (pk SERIAL, dint integer DEFAULT NULL::integer, "
+ "dvc1 varchar(64) DEFAULT NULL::character varying, "
+ "dvc2 varchar(64) DEFAULT 'NULL', "
+ "dvc3 varchar(64) DEFAULT 'MYVALUE', "
+ "dvc4 varchar(64) DEFAULT 'NULL'::character varying, "
+ "dvc5 varchar(64) DEFAULT 'NULL::character varying', "
+ "dvc6 varchar(64) DEFAULT NULL, "
+ "dt1 timestamp DEFAULT CURRENT_TIMESTAMP, "
+ "dt2 date DEFAULT CURRENT_DATE, "
+ "dt3 time DEFAULT CURRENT_TIME, "
+ "PRIMARY KEY(pk));";
TestHelper.execute(ddl);
TestHelper.execute(dml);
}
private void assertDefaultValueChangeRecord(SourceRecord sourceRecord) {
final Schema valueSchema = sourceRecord.valueSchema();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dint")).isNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc1")).isNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc2")).isEqualTo("NULL");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc3")).isEqualTo("MYVALUE");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc4")).isEqualTo("NULL");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc5")).isEqualTo("NULL::character varying");
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("dvc6")).isNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt1")).isNotNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt32("dt2")).isNotNull();
Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getInt64("dt3")).isNotNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dint").schema().defaultValue()).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dvc1").schema().defaultValue()).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dvc2").schema().defaultValue()).isEqualTo("NULL");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc3").schema().defaultValue()).isEqualTo("MYVALUE");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc4").schema().defaultValue()).isEqualTo("NULL");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc5").schema().defaultValue()).isEqualTo("NULL::character varying");
Assertions.assertThat(valueSchema.field("after").schema().field("dvc6").schema().defaultValue()).isNull();
Assertions.assertThat(valueSchema.field("after").schema().field("dt1").schema().defaultValue()).isEqualTo(0L);
Assertions.assertThat(valueSchema.field("after").schema().field("dt2").schema().defaultValue()).isEqualTo(0);
Assertions.assertThat(valueSchema.field("after").schema().field("dt3").schema().defaultValue()).isEqualTo(0L);
}
} }