DBZ-1707 Ensuring correct Kafka record timestamp for outbox events;

Depending on the width of the configured TIMESTAMP column, a microsecond or nanosecond timestamp could have been returned before, whereas this should always be milliseconds.
This commit is contained in:
Gunnar Morling 2020-01-14 10:50:38 +01:00
parent 1d41d53636
commit 07e89de91d
3 changed files with 80 additions and 10 deletions

View File

@ -170,7 +170,6 @@ public void shouldSendEventTypeAsHeader() throws Exception {
assertThat(value).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) value);
assertThat(payload.get("email").getTextValue()).isEqualTo("gh@mefi.in");
}
@Test
@ -224,7 +223,7 @@ public void shouldSupportAllFeatures() throws Exception {
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
", 1, true, TIMESTAMP '2019-03-24 20:52:59'"));
", 1, true, TIMESTAMP(3) '2019-03-24 20:52:59'"));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
@ -245,7 +244,7 @@ public void shouldSupportAllFeatures() throws Exception {
assertConnectSchemasAreEqual(null, eventRouted.valueSchema(), expectedSchema);
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
@ -271,6 +270,38 @@ public void shouldSupportAllFeatures() throws Exception {
assertThat(valueStruct.getBoolean("deleted")).isEqualTo(false);
}
@Test
@FixFor("DBZ-1707")
public void shouldConvertMicroSecondsTimestampToMilliSeconds() throws Exception {
startConnectorWithNoSnapshot();
outboxEventRouter = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put("table.field.event.timestamp", "createdat");
outboxEventRouter.configure(config);
TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;");
TestHelper.execute(createEventInsert(
UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
"UserUpdated",
"UserEmail",
"7bdf2e9e",
"{\"email\": \"gh@mefi.in\"}",
", TIMESTAMP '2019-03-24 20:52:59'"));
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord eventRouted = outboxEventRouter.apply(newEventRecord);
// expecting microseconds value emitted for TIMESTAMP column without width to be
// converted to milliseconds, as that's the standard semantics of that property
// in Kafka
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
}
@Test
@FixFor("DBZ-1320")
public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
@ -310,7 +341,7 @@ public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
// Validate metadata
assertThat(eventRouted.valueSchema()).isNotNull();
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers
@ -373,7 +404,7 @@ public void shouldProduceTombstoneEventForNullPayload() throws Exception {
// Validate metadata
assertThat(eventRouted.valueSchema()).isNull();
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000000L);
assertThat(eventRouted.timestamp()).isEqualTo(1553460779000L);
assertThat(eventRouted.topic()).isEqualTo("outbox.event.UserEmail");
// Validate headers

View File

@ -29,6 +29,9 @@
import io.debezium.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.transforms.SmtManager;
import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField;
@ -107,10 +110,7 @@ public R apply(R r) {
}
Schema payloadSchema = payloadField.schema();
Long timestamp = fieldEventTimestamp == null
? debeziumEventValue.getInt64("ts_ms")
: eventStruct.getInt64(fieldEventTimestamp);
Long timestamp = getEventTimestampMs(debeziumEventValue, eventStruct);
Object eventId = eventStruct.get(fieldEventId);
Object payload = eventStruct.get(fieldPayload);
Object payloadId = eventStruct.get(fieldPayloadId);
@ -177,6 +177,44 @@ else if (onlyHeadersInOutputMessage) {
return regexRouter.apply(newRecord);
}
/**
* Returns the Kafka record timestamp for the outgoing record.
* Either obtained from the configured field or the timestamp when Debezium processed the event.
*/
private Long getEventTimestampMs(Struct debeziumEventValue, Struct eventStruct) {
if (fieldEventTimestamp == null) {
return debeziumEventValue.getInt64("ts_ms");
}
Field timestampField = eventStruct.schema().field(fieldEventTimestamp);
if (timestampField == null) {
throw new ConnectException(String.format("Unable to find timestamp field %s in event", fieldEventTimestamp));
}
Long timestamp = eventStruct.getInt64(fieldEventTimestamp);
if (timestamp == null) {
return debeziumEventValue.getInt64("ts_ms");
}
String schemaName = timestampField.schema().name();
if (schemaName == null) {
throw new ConnectException(String.format("Unsupported field type %s (without logical schema name) for event timestamp", timestampField.schema().type()));
}
// not going through Instant here for the sake of performance
switch (schemaName) {
case Timestamp.SCHEMA_NAME:
return timestamp;
case MicroTimestamp.SCHEMA_NAME:
return timestamp / 1_000;
case NanoTimestamp.SCHEMA_NAME:
return timestamp / 1_000_000;
default:
throw new ConnectException(String.format("Unsupported field type %s for event timestamp", schemaName));
}
}
private Object defineRecordKey(Struct eventStruct, Object fallbackKey) {
Object eventKey = null;
if (fieldEventKey != null) {

View File

@ -28,6 +28,7 @@
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.time.Timestamp;
/**
* Unit tests for {@link EventRouter}
@ -344,7 +345,7 @@ public void canSetTimestampByUserDefinedConfiguration() {
Long expectedTimestamp = 14222264625338L;
Map<String, Schema> extraFields = new HashMap<>();
extraFields.put("event_timestamp", Schema.INT64_SCHEMA);
extraFields.put("event_timestamp", Timestamp.schema());
Map<String, Object> extraValues = new HashMap<>();
extraValues.put("event_timestamp", expectedTimestamp);