diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java index 12fc207fb..e0ebfa487 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java @@ -170,7 +170,6 @@ public void shouldSendEventTypeAsHeader() throws Exception { assertThat(value).isInstanceOf(String.class); JsonNode payload = (new ObjectMapper()).readTree((String) value); assertThat(payload.get("email").getTextValue()).isEqualTo("gh@mefi.in"); - } @Test @@ -224,7 +223,7 @@ public void shouldSupportAllFeatures() throws Exception { "UserEmail", "7bdf2e9e", "{\"email\": \"gh@mefi.in\"}", - ", 1, true, TIMESTAMP '2019-03-24 20:52:59'")); + ", 1, true, TIMESTAMP(3) '2019-03-24 20:52:59'")); SourceRecords actualRecords = consumeRecordsByTopic(1); assertThat(actualRecords.topics().size()).isEqualTo(1); @@ -245,7 +244,7 @@ public void shouldSupportAllFeatures() throws Exception { assertConnectSchemasAreEqual(null, eventRouted.valueSchema(), expectedSchema); - assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L); + assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L); assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail"); // Validate headers @@ -271,6 +270,38 @@ public void shouldSupportAllFeatures() throws Exception { assertThat(valueStruct.getBoolean("deleted")).isEqualTo(false); } + @Test + @FixFor("DBZ-1707") + public void shouldConvertMicroSecondsTimestampToMilliSeconds() throws Exception { + startConnectorWithNoSnapshot(); + + outboxEventRouter = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put("table.field.event.timestamp", "createdat"); + outboxEventRouter.configure(config); + + TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;"); + + TestHelper.execute(createEventInsert( + UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"), + "UserUpdated", + "UserEmail", + "7bdf2e9e", + "{\"email\": \"gh@mefi.in\"}", + ", TIMESTAMP '2019-03-24 20:52:59'")); + + SourceRecords actualRecords = consumeRecordsByTopic(1); + assertThat(actualRecords.topics().size()).isEqualTo(1); + + SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0); + SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord); + + // expecting microseconds value emitted for TIMESTAMP column without width to be + // converted to milliseconds, as that's the standard semantics of that property + // in Kafka + assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L); + } + @Test @FixFor("DBZ-1320") public void shouldNotProduceTombstoneEventForNullPayload() throws Exception { @@ -310,7 +341,7 @@ public void shouldNotProduceTombstoneEventForNullPayload() throws Exception { // Validate metadata assertThat(eventRouted.valueSchema()).isNotNull(); - assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L); + assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L); assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail"); // Validate headers @@ -373,7 +404,7 @@ public void shouldProduceTombstoneEventForNullPayload() throws Exception { // Validate metadata assertThat(eventRouted.valueSchema()).isNull(); - assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L); + assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L); assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail"); // Validate headers diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java index a82d3f7df..0d13df767 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java @@ -29,6 +29,9 @@ import io.debezium.annotation.Incubating; import io.debezium.config.Configuration; import io.debezium.data.Envelope; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Timestamp; import io.debezium.transforms.SmtManager; import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField; @@ -107,10 +110,7 @@ public R apply(R r) { } Schema payloadSchema = payloadField.schema(); - Long timestamp = fieldEventTimestamp == null - ? debeziumEventValue.getInt64("ts_ms") - : eventStruct.getInt64(fieldEventTimestamp); - + Long timestamp = getEventTimestampMs(debeziumEventValue, eventStruct); Object eventId = eventStruct.get(fieldEventId); Object payload = eventStruct.get(fieldPayload); Object payloadId = eventStruct.get(fieldPayloadId); @@ -177,6 +177,44 @@ else if (onlyHeadersInOutputMessage) { return regexRouter.apply(newRecord); } + /** + * Returns the Kafka record timestamp for the outgoing record. + * Either obtained from the configured field or the timestamp when Debezium processed the event. + */ + private Long getEventTimestampMs(Struct debeziumEventValue, Struct eventStruct) { + if (fieldEventTimestamp == null) { + return debeziumEventValue.getInt64("ts_ms"); + } + + Field timestampField = eventStruct.schema().field(fieldEventTimestamp); + if (timestampField == null) { + throw new ConnectException(String.format("Unable to find timestamp field %s in event", fieldEventTimestamp)); + } + + Long timestamp = eventStruct.getInt64(fieldEventTimestamp); + if (timestamp == null) { + return debeziumEventValue.getInt64("ts_ms"); + } + + String schemaName = timestampField.schema().name(); + + if (schemaName == null) { + throw new ConnectException(String.format("Unsupported field type %s (without logical schema name) for event timestamp", timestampField.schema().type())); + } + + // not going through Instant here for the sake of performance + switch (schemaName) { + case Timestamp.SCHEMA_NAME: + return timestamp; + case MicroTimestamp.SCHEMA_NAME: + return timestamp / 1_000; + case NanoTimestamp.SCHEMA_NAME: + return timestamp / 1_000_000; + default: + throw new ConnectException(String.format("Unsupported field type %s for event timestamp", schemaName)); + } + } + private Object defineRecordKey(Struct eventStruct, Object fallbackKey) { Object eventKey = null; if (fieldEventKey != null) { diff --git a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java index c3313363a..492bd741f 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java @@ -28,6 +28,7 @@ import io.debezium.data.Envelope; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; +import io.debezium.time.Timestamp; /** * Unit tests for {@link EventRouter} @@ -344,7 +345,7 @@ public void canSetTimestampByUserDefinedConfiguration() { Long expectedTimestamp = 14222264625338L; Map extraFields = new HashMap<>(); - extraFields.put("event_timestamp", Schema.INT64_SCHEMA); + extraFields.put("event_timestamp", Timestamp.schema()); Map extraValues = new HashMap<>(); extraValues.put("event_timestamp", expectedTimestamp);