DBZ-1495 Avoid NPE by throwing more meaningful error messages
This commit is contained in:
parent
9be1ec3d6b
commit
b20e689b2c
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
import org.apache.kafka.connect.connector.ConnectRecord;
|
import org.apache.kafka.connect.connector.ConnectRecord;
|
||||||
|
import org.apache.kafka.connect.data.Field;
|
||||||
import org.apache.kafka.connect.data.Schema;
|
import org.apache.kafka.connect.data.Schema;
|
||||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
import org.apache.kafka.connect.data.Struct;
|
import org.apache.kafka.connect.data.Struct;
|
||||||
@ -27,7 +28,6 @@
|
|||||||
|
|
||||||
import io.debezium.annotation.Incubating;
|
import io.debezium.annotation.Incubating;
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
import io.debezium.config.Field;
|
|
||||||
import io.debezium.data.Envelope;
|
import io.debezium.data.Envelope;
|
||||||
import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField;
|
import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField;
|
||||||
|
|
||||||
@ -101,7 +101,12 @@ public R apply(R r) {
|
|||||||
final R afterRecord = afterExtractor.apply(r);
|
final R afterRecord = afterExtractor.apply(r);
|
||||||
Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event");
|
Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event");
|
||||||
Schema eventValueSchema = afterRecord.valueSchema();
|
Schema eventValueSchema = afterRecord.valueSchema();
|
||||||
Schema payloadSchema = eventValueSchema.field(fieldPayload).schema();
|
|
||||||
|
final Field payloadField = eventValueSchema.field(fieldPayload);
|
||||||
|
if (payloadField == null) {
|
||||||
|
throw new ConnectException(String.format("Unable to find payload field %s in event", fieldPayload));
|
||||||
|
}
|
||||||
|
Schema payloadSchema = payloadField.schema();
|
||||||
|
|
||||||
Long timestamp = fieldEventTimestamp == null
|
Long timestamp = fieldEventTimestamp == null
|
||||||
? debeziumEventValue.getInt64("ts_ms")
|
? debeziumEventValue.getInt64("ts_ms")
|
||||||
@ -111,8 +116,13 @@ public R apply(R r) {
|
|||||||
Object payload = eventStruct.get(fieldPayload);
|
Object payload = eventStruct.get(fieldPayload);
|
||||||
Object payloadId = eventStruct.get(fieldPayloadId);
|
Object payloadId = eventStruct.get(fieldPayloadId);
|
||||||
|
|
||||||
|
final Field eventIdField = eventValueSchema.field(fieldEventId);
|
||||||
|
if (eventIdField == null) {
|
||||||
|
throw new ConnectException(String.format("Unable to find event-id field %s in event", fieldEventId));
|
||||||
|
}
|
||||||
|
|
||||||
Headers headers = r.headers();
|
Headers headers = r.headers();
|
||||||
headers.add("id", eventId, eventValueSchema.field(fieldEventId).schema());
|
headers.add("id", eventId, eventIdField.schema());
|
||||||
|
|
||||||
final Schema structValueSchema = onlyHeadersInOutputMessage ? null :
|
final Schema structValueSchema = onlyHeadersInOutputMessage ? null :
|
||||||
(fieldSchemaVersion == null)
|
(fieldSchemaVersion == null)
|
||||||
@ -207,7 +217,7 @@ public void close() {
|
|||||||
@Override
|
@Override
|
||||||
public void configure(Map<String, ?> configMap) {
|
public void configure(Map<String, ?> configMap) {
|
||||||
final Configuration config = Configuration.from(configMap);
|
final Configuration config = Configuration.from(configMap);
|
||||||
Field.Set allFields = Field.setOf(EventRouterConfigDefinition.CONFIG_FIELDS);
|
io.debezium.config.Field.Set allFields = io.debezium.config.Field.setOf(EventRouterConfigDefinition.CONFIG_FIELDS);
|
||||||
if (!config.validateAndRecord(allFields, LOGGER::error)) {
|
if (!config.validateAndRecord(allFields, LOGGER::error)) {
|
||||||
throw new ConnectException("Unable to validate config.");
|
throw new ConnectException("Unable to validate config.");
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user