DBZ-8103 Apply suggestions from code review
This commit is contained in:
parent
a2de933a1d
commit
431e09f53b
@ -32,15 +32,15 @@ public static PostgresSchemaFactory get() {
|
|||||||
/*
|
/*
|
||||||
* Postgres LogicalDecodingMessageMonitor schema
|
* Postgres LogicalDecodingMessageMonitor schema
|
||||||
*/
|
*/
|
||||||
|
public static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue";
|
||||||
|
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_VERSION = 1;
|
||||||
|
|
||||||
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageKey";
|
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageKey";
|
||||||
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_VERSION = 1;
|
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_VERSION = 1;
|
||||||
|
|
||||||
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_NAME = "io.debezium.connector.postgresql.Message";
|
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_NAME = "io.debezium.connector.postgresql.Message";
|
||||||
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_VERSION = 1;
|
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_VERSION = 1;
|
||||||
|
|
||||||
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue";
|
|
||||||
private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_VERSION = 1;
|
|
||||||
|
|
||||||
public Schema logicalDecodingMessageMonitorKeySchema(SchemaNameAdjuster adjuster) {
|
public Schema logicalDecodingMessageMonitorKeySchema(SchemaNameAdjuster adjuster) {
|
||||||
return SchemaBuilder.struct()
|
return SchemaBuilder.struct()
|
||||||
.name(adjuster.adjust(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME))
|
.name(adjuster.adjust(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME))
|
||||||
|
@ -7,6 +7,7 @@
|
|||||||
|
|
||||||
import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY;
|
import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY;
|
||||||
import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY;
|
import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY;
|
||||||
|
import static io.debezium.connector.postgresql.PostgresSchemaFactory.POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME;
|
||||||
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
|
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -19,15 +20,16 @@
|
|||||||
import org.apache.kafka.connect.data.Schema;
|
import org.apache.kafka.connect.data.Schema;
|
||||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
import org.apache.kafka.connect.data.Struct;
|
import org.apache.kafka.connect.data.Struct;
|
||||||
import org.apache.kafka.connect.errors.DataException;
|
|
||||||
import org.apache.kafka.connect.transforms.ReplaceField;
|
import org.apache.kafka.connect.transforms.ReplaceField;
|
||||||
import org.apache.kafka.connect.transforms.Transformation;
|
import org.apache.kafka.connect.transforms.Transformation;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import io.debezium.DebeziumException;
|
||||||
import io.debezium.Module;
|
import io.debezium.Module;
|
||||||
import io.debezium.config.CommonConnectorConfig;
|
import io.debezium.config.CommonConnectorConfig;
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
@ -49,8 +51,6 @@ public class DecodeLogicalDecodingMessageContent<R extends ConnectRecord<R>> imp
|
|||||||
|
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContent.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContent.class);
|
||||||
|
|
||||||
private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue";
|
|
||||||
|
|
||||||
private BoundedConcurrentHashMap<Schema, Schema> logicalDecodingMessageContentSchemaCache;
|
private BoundedConcurrentHashMap<Schema, Schema> logicalDecodingMessageContentSchemaCache;
|
||||||
private ObjectMapper objectMapper;
|
private ObjectMapper objectMapper;
|
||||||
private JsonSchemaData jsonSchemaData;
|
private JsonSchemaData jsonSchemaData;
|
||||||
@ -76,7 +76,7 @@ public void configure(final Map<String, ?> configs) {
|
|||||||
@Override
|
@Override
|
||||||
public R apply(final R record) {
|
public R apply(final R record) {
|
||||||
// ignore all messages that are not logical decoding messages
|
// ignore all messages that are not logical decoding messages
|
||||||
if (!Objects.equals(record.valueSchema().name(), POSTGRES_LOGICAL_DECODING_MESSAGE_SCHEMA_NAME)) {
|
if (!Objects.equals(record.valueSchema().name(), POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME)) {
|
||||||
LOGGER.debug("Ignore not a logical decoding message. Message key: \"{}\"", record.key());
|
LOGGER.debug("Ignore not a logical decoding message. Message key: \"{}\"", record.key());
|
||||||
return record;
|
return record;
|
||||||
}
|
}
|
||||||
@ -105,7 +105,7 @@ private Struct getLogicalDecodingMessageContent(Struct valueStruct) {
|
|||||||
"Retrieve content of a logical decoding message");
|
"Retrieve content of a logical decoding message");
|
||||||
|
|
||||||
if (logicalDecodingMessageStruct.schema().field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY).schema().type() != Schema.Type.BYTES) {
|
if (logicalDecodingMessageStruct.schema().field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY).schema().type() != Schema.Type.BYTES) {
|
||||||
throw new DataException("The content of a logical decoding message is non-binary");
|
throw new DebeziumException("The content of a logical decoding message is non-binary");
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] logicalDecodingMessageContentBytes = logicalDecodingMessageStruct.getBytes(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY);
|
byte[] logicalDecodingMessageContentBytes = logicalDecodingMessageStruct.getBytes(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY);
|
||||||
@ -114,7 +114,6 @@ private Struct getLogicalDecodingMessageContent(Struct valueStruct) {
|
|||||||
|
|
||||||
private Struct convertLogicalDecodingMessageContentBytesToStruct(byte[] logicalDecodingMessageContent) {
|
private Struct convertLogicalDecodingMessageContentBytesToStruct(byte[] logicalDecodingMessageContent) {
|
||||||
final String logicalDecodingMessageContentString = new String(logicalDecodingMessageContent);
|
final String logicalDecodingMessageContentString = new String(logicalDecodingMessageContent);
|
||||||
try {
|
|
||||||
// parse and get Jackson JsonNode
|
// parse and get Jackson JsonNode
|
||||||
final JsonNode logicalDecodingMessageContentJson = parseLogicalDecodingMessageContentJsonString(logicalDecodingMessageContentString);
|
final JsonNode logicalDecodingMessageContentJson = parseLogicalDecodingMessageContentJsonString(logicalDecodingMessageContentString);
|
||||||
// get schema of a logical decoding message content
|
// get schema of a logical decoding message content
|
||||||
@ -122,17 +121,18 @@ private Struct convertLogicalDecodingMessageContentBytesToStruct(byte[] logicalD
|
|||||||
// get Struct of a logical decoding message content
|
// get Struct of a logical decoding message content
|
||||||
return (Struct) jsonSchemaData.toConnectData(logicalDecodingMessageContentJson, logicalDecodingMessageContentSchema);
|
return (Struct) jsonSchemaData.toConnectData(logicalDecodingMessageContentJson, logicalDecodingMessageContentSchema);
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
|
||||||
LOGGER.warn("Conversion of logical decoding message content failed", e);
|
|
||||||
throw new DataException("Conversion of logical decoding message content failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private JsonNode parseLogicalDecodingMessageContentJsonString(String logicalDecodingMessageContentJsonString) throws Exception {
|
private JsonNode parseLogicalDecodingMessageContentJsonString(String logicalDecodingMessageContentJsonString) {
|
||||||
if (logicalDecodingMessageContentJsonString.startsWith("{") || logicalDecodingMessageContentJsonString.startsWith("[")) {
|
if (logicalDecodingMessageContentJsonString.startsWith("{") || logicalDecodingMessageContentJsonString.startsWith("[")) {
|
||||||
|
try {
|
||||||
return objectMapper.readTree(logicalDecodingMessageContentJsonString);
|
return objectMapper.readTree(logicalDecodingMessageContentJsonString);
|
||||||
}
|
}
|
||||||
throw new Exception("Unable to parse logical decoding message content JSON string starting with '" + logicalDecodingMessageContentJsonString.charAt(0) + "'");
|
catch (JsonProcessingException e) {
|
||||||
|
throw new DebeziumException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new DebeziumException(
|
||||||
|
"Unable to parse logical decoding message content JSON string '" + logicalDecodingMessageContentJsonString + "'");
|
||||||
}
|
}
|
||||||
|
|
||||||
private R removeLogicalDecodingMessageContentField(R record) {
|
private R removeLogicalDecodingMessageContentField(R record) {
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
|
|
||||||
import org.apache.kafka.connect.data.Field;
|
import org.apache.kafka.connect.data.Field;
|
||||||
import org.apache.kafka.connect.data.Struct;
|
import org.apache.kafka.connect.data.Struct;
|
||||||
import org.apache.kafka.connect.errors.DataException;
|
|
||||||
import org.apache.kafka.connect.source.SourceRecord;
|
import org.apache.kafka.connect.source.SourceRecord;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -27,6 +26,7 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.TestRule;
|
import org.junit.rules.TestRule;
|
||||||
|
|
||||||
|
import io.debezium.DebeziumException;
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
import io.debezium.connector.postgresql.PostgresConnector;
|
import io.debezium.connector.postgresql.PostgresConnector;
|
||||||
import io.debezium.connector.postgresql.SourceInfo;
|
import io.debezium.connector.postgresql.SourceInfo;
|
||||||
@ -87,9 +87,9 @@ public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Ex
|
|||||||
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("message"));
|
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("message"));
|
||||||
assertThat(recordsForTopic).hasSize(1);
|
assertThat(recordsForTopic).hasSize(1);
|
||||||
|
|
||||||
Exception exception = assertThrows(DataException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0)));
|
Exception exception = assertThrows(DebeziumException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0)));
|
||||||
|
|
||||||
assertThat(exception.getMessage()).isEqualTo("Conversion of logical decoding message content failed");
|
assertThat(exception.getMessage()).isEqualTo("Unable to parse logical decoding message content JSON string ''");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
import org.apache.kafka.connect.data.Struct;
|
import org.apache.kafka.connect.data.Struct;
|
||||||
import org.apache.kafka.connect.json.JsonConverter;
|
import org.apache.kafka.connect.json.JsonConverter;
|
||||||
|
|
||||||
|
import io.debezium.DebeziumException;
|
||||||
import io.debezium.converters.CloudEventsConverterConfig.MetadataSource;
|
import io.debezium.converters.CloudEventsConverterConfig.MetadataSource;
|
||||||
import io.debezium.converters.CloudEventsConverterConfig.MetadataSourceValue;
|
import io.debezium.converters.CloudEventsConverterConfig.MetadataSourceValue;
|
||||||
import io.debezium.converters.spi.CloudEventsMaker;
|
import io.debezium.converters.spi.CloudEventsMaker;
|
||||||
@ -98,7 +99,7 @@ private SchemaAndValue getHeaderSchemaAndValue(Headers headers, String headerNam
|
|||||||
return SchemaAndValue.NULL;
|
return SchemaAndValue.NULL;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
throw new RuntimeException("Header `" + headerName + "` was not provided");
|
throw new DebeziumException("Header `" + headerName + "` was not provided");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return jsonHeaderConverter.toConnectData(null, header.value());
|
return jsonHeaderConverter.toConnectData(null, header.value());
|
||||||
|
Loading…
Reference in New Issue
Block a user