DBZ-1405 Add back Kafka Record Interface.

This commit is contained in:
Bingqin Zhou 2019-09-11 13:48:14 -07:00 committed by Gunnar Morling
parent 16e6e62ce1
commit 8022dcc75d
4 changed files with 32 additions and 3 deletions

View File

@ -19,7 +19,7 @@
* Cell-level data about the source event. Each cell contains the name, value and
* type of a column in a Cassandra table.
*/
public class CellData {
public class CellData implements KafkaRecord {
/**
* The type of a column in a Cassandra table
*/
@ -62,6 +62,7 @@ public boolean isPrimary() {
return columnType == ColumnType.PARTITION || columnType == ColumnType.CLUSTERING;
}
@Override
public Struct record(Schema schema) {
return new Struct(schema)
.put(CELL_VALUE_KEY, value)
@ -86,6 +87,7 @@ static Schema cellSchema(ColumnMetadata cm, boolean optional) {
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
@ -100,10 +102,12 @@ public boolean equals(Object o) {
&& columnType == that.columnType;
}
@Override
public int hashCode() {
return Objects.hash(name, value, deletionTs, columnType);
}
@Override
public String toString() {
return "{"
+ "name=" + name

View File

@ -0,0 +1,17 @@
package io.debezium.connector.cassandra;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Schema;
/**
* An interface that indicates the record can be converted to a {@link Struct}.
*/
public interface KafkaRecord {
/**
* return an kafka connect Struct based on the schema passed into the method
* @param schema of the Struct
* @return a Struct
*/
Struct record(Schema schema);
}

View File

@ -24,7 +24,7 @@
* Row-level data about the source event. Contains a map where the key is the table column
* name and the value is the {@link CellData}.
*/
public class RowData {
public class RowData implements KafkaRecord {
private final Map<String, CellData> cellMap = new LinkedHashMap<>();
public void addCell(CellData cellData) {
@ -41,6 +41,7 @@ public boolean hasCell(String columnName) {
return cellMap.containsKey(columnName);
}
@Override
public Struct record(Schema schema) {
Struct struct = new Struct(schema);
for (Field field : schema.fields()) {
@ -83,10 +84,12 @@ List<CellData> getPrimary() {
return this.cellMap.values().stream().filter(CellData::isPrimary).collect(Collectors.toList());
}
@Override
public String toString() {
return this.cellMap.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
@ -98,6 +101,7 @@ public boolean equals(Object o) {
return Objects.equals(cellMap, rowData.cellMap);
}
@Override
public int hashCode() {
return Objects.hash(cellMap);
}

View File

@ -17,7 +17,7 @@
/**
* Metadata about the source of the change event
*/
public class SourceInfo {
public class SourceInfo implements KafkaRecord {
public static final String DEBEZIUM_VERSION_KEY = "version";
public static final String DEBEZIUM_CONNECTOR_KEY = "connector";
public static final String CLUSTER_KEY = "cluster";
@ -56,6 +56,7 @@ public SourceInfo(String cluster, OffsetPosition offsetPosition, KeyspaceTable k
this.snapshot = snapshot;
}
@Override
public Struct record(Schema schema) {
return new Struct(schema)
.put(DEBEZIUM_VERSION_KEY, version)
@ -70,6 +71,7 @@ public Struct record(Schema schema) {
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
@ -87,10 +89,12 @@ public boolean equals(Object o) {
&& tsMicro == that.tsMicro;
}
@Override
public int hashCode() {
return Objects.hash(cluster, snapshot, offsetPosition, keyspaceTable, tsMicro);
}
@Override
public String toString() {
Map<String, Object> map = new HashMap<>();
map.put(DEBEZIUM_VERSION_KEY, version);