DBZ-1292 Using formatted timestamp for CE time field

This commit is contained in:
GraySmog 2019-12-16 04:22:52 +08:00 committed by Gunnar Morling
parent 87ffe0c44e
commit 8b7c67cbdd
2 changed files with 9 additions and 5 deletions

View File

@ -371,8 +371,7 @@ private SchemaAndValue convertToCloudEventsFormat(CloudEventsMaker maker, Schema
ceValueBuilder.withValue(CloudEventsMaker.FieldName.TIME, maker.ceTime())
.withValue(CloudEventsMaker.FieldName.EXTRAINFO, maker.ceExtrainfo())
.withValue(CloudEventsMaker.FieldName.DATA, serializedData)
.build();
.withValue(CloudEventsMaker.FieldName.DATA, serializedData);
return new SchemaAndValue(ceSchema, ceValueBuilder.build());
}

View File

@ -5,7 +5,9 @@
*/
package io.debezium.cloudevents;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TimeZone;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@ -151,12 +153,15 @@ public String ceDataschema() {
}
/**
* Get the time of CloudEvents envelope.
* Get the timestamp of CloudEvents envelope using the format defined in RFC 3339.
*
* @return the time of CloudEvents envelope
* @return the timestamp of CloudEvents envelope
*/
public String ceTime() {
return recordParser.getMetadata(AbstractSourceInfo.TIMESTAMP_KEY).toString();
long time = (long) recordParser.getMetadata(AbstractSourceInfo.TIMESTAMP_KEY);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
return formatter.format(time);
}
/**