From 8022dcc75d6525cb42df79debf75005ba817a0ed Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Wed, 11 Sep 2019 13:48:14 -0700 Subject: [PATCH] DBZ-1405 Add back Kafka Record Interface. --- .../debezium/connector/cassandra/CellData.java | 6 +++++- .../connector/cassandra/KafkaRecord.java | 17 +++++++++++++++++ .../debezium/connector/cassandra/RowData.java | 6 +++++- .../connector/cassandra/SourceInfo.java | 6 +++++- 4 files changed, 32 insertions(+), 3 deletions(-) create mode 100644 debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecord.java diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java index 670e37685..5f72f03e9 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java @@ -19,7 +19,7 @@ * Cell-level data about the source event. Each cell contains the name, value and * type of a column in a Cassandra table. */ -public class CellData { +public class CellData implements KafkaRecord { /** * The type of a column in a Cassandra table */ @@ -62,6 +62,7 @@ public boolean isPrimary() { return columnType == ColumnType.PARTITION || columnType == ColumnType.CLUSTERING; } + @Override public Struct record(Schema schema) { return new Struct(schema) .put(CELL_VALUE_KEY, value) @@ -86,6 +87,7 @@ static Schema cellSchema(ColumnMetadata cm, boolean optional) { } } + @Override public boolean equals(Object o) { if (this == o) { return true; @@ -100,10 +102,12 @@ public boolean equals(Object o) { && columnType == that.columnType; } + @Override public int hashCode() { return Objects.hash(name, value, deletionTs, columnType); } + @Override public String toString() { return "{" + "name=" + name diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecord.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecord.java new file mode 100644 index 000000000..62ceb233a --- /dev/null +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/KafkaRecord.java @@ -0,0 +1,17 @@ +package io.debezium.connector.cassandra; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Schema; + + +/** + * An interface that indicates the record can be converted to a {@link Struct}. + */ +public interface KafkaRecord { + /** + * return an kafka connect Struct based on the schema passed into the method + * @param schema of the Struct + * @return a Struct + */ + Struct record(Schema schema); +} diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RowData.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RowData.java index abfa4ebc8..1d3014b18 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RowData.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RowData.java @@ -24,7 +24,7 @@ * Row-level data about the source event. Contains a map where the key is the table column * name and the value is the {@link CellData}. */ -public class RowData { +public class RowData implements KafkaRecord { private final Map cellMap = new LinkedHashMap<>(); public void addCell(CellData cellData) { @@ -41,6 +41,7 @@ public boolean hasCell(String columnName) { return cellMap.containsKey(columnName); } + @Override public Struct record(Schema schema) { Struct struct = new Struct(schema); for (Field field : schema.fields()) { @@ -83,10 +84,12 @@ List getPrimary() { return this.cellMap.values().stream().filter(CellData::isPrimary).collect(Collectors.toList()); } + @Override public String toString() { return this.cellMap.toString(); } + @Override public boolean equals(Object o) { if (this == o) { return true; @@ -98,6 +101,7 @@ public boolean equals(Object o) { return Objects.equals(cellMap, rowData.cellMap); } + @Override public int hashCode() { return Objects.hash(cellMap); } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java index 157f134ba..9ac46a710 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java @@ -17,7 +17,7 @@ /** * Metadata about the source of the change event */ -public class SourceInfo { +public class SourceInfo implements KafkaRecord { public static final String DEBEZIUM_VERSION_KEY = "version"; public static final String DEBEZIUM_CONNECTOR_KEY = "connector"; public static final String CLUSTER_KEY = "cluster"; @@ -56,6 +56,7 @@ public SourceInfo(String cluster, OffsetPosition offsetPosition, KeyspaceTable k this.snapshot = snapshot; } + @Override public Struct record(Schema schema) { return new Struct(schema) .put(DEBEZIUM_VERSION_KEY, version) @@ -70,6 +71,7 @@ public Struct record(Schema schema) { } + @Override public boolean equals(Object o) { if (this == o) { return true; @@ -87,10 +89,12 @@ public boolean equals(Object o) { && tsMicro == that.tsMicro; } + @Override public int hashCode() { return Objects.hash(cluster, snapshot, offsetPosition, keyspaceTable, tsMicro); } + @Override public String toString() { Map map = new HashMap<>(); map.put(DEBEZIUM_VERSION_KEY, version);