diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java index e070c48df..f37d06216 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java @@ -32,15 +32,15 @@ public static PostgresSchemaFactory get() { /* * Postgres LogicalDecodingMessageMonitor schema */ + public static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue"; + private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_VERSION = 1; + private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageKey"; private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_VERSION = 1; private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_NAME = "io.debezium.connector.postgresql.Message"; private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_BLOCK_SCHEMA_VERSION = 1; - private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue"; - private static final int POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_VERSION = 1; - public Schema logicalDecodingMessageMonitorKeySchema(SchemaNameAdjuster adjuster) { return SchemaBuilder.struct() .name(adjuster.adjust(POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_KEY_SCHEMA_NAME)) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContent.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContent.java index 029715193..527740212 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContent.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContent.java @@ -7,6 +7,7 @@ import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY; import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY; +import static io.debezium.connector.postgresql.PostgresSchemaFactory.POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME; import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; import java.util.Map; @@ -19,15 +20,16 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.ReplaceField; import org.apache.kafka.connect.transforms.Transformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.debezium.DebeziumException; import io.debezium.Module; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; @@ -49,8 +51,6 @@ public class DecodeLogicalDecodingMessageContent> imp private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContent.class); - private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue"; - private BoundedConcurrentHashMap logicalDecodingMessageContentSchemaCache; private ObjectMapper objectMapper; private JsonSchemaData jsonSchemaData; @@ -76,7 +76,7 @@ public void configure(final Map configs) { @Override public R apply(final R record) { // ignore all messages that are not logical decoding messages - if (!Objects.equals(record.valueSchema().name(), POSTGRES_LOGICAL_DECODING_MESSAGE_SCHEMA_NAME)) { + if (!Objects.equals(record.valueSchema().name(), POSTGRES_LOGICAL_DECODING_MESSAGE_MONITOR_VALUE_SCHEMA_NAME)) { LOGGER.debug("Ignore not a logical decoding message. Message key: \"{}\"", record.key()); return record; } @@ -105,7 +105,7 @@ private Struct getLogicalDecodingMessageContent(Struct valueStruct) { "Retrieve content of a logical decoding message"); if (logicalDecodingMessageStruct.schema().field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY).schema().type() != Schema.Type.BYTES) { - throw new DataException("The content of a logical decoding message is non-binary"); + throw new DebeziumException("The content of a logical decoding message is non-binary"); } byte[] logicalDecodingMessageContentBytes = logicalDecodingMessageStruct.getBytes(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY); @@ -114,25 +114,25 @@ private Struct getLogicalDecodingMessageContent(Struct valueStruct) { private Struct convertLogicalDecodingMessageContentBytesToStruct(byte[] logicalDecodingMessageContent) { final String logicalDecodingMessageContentString = new String(logicalDecodingMessageContent); - try { - // parse and get Jackson JsonNode - final JsonNode logicalDecodingMessageContentJson = parseLogicalDecodingMessageContentJsonString(logicalDecodingMessageContentString); - // get schema of a logical decoding message content - Schema logicalDecodingMessageContentSchema = jsonSchemaData.toConnectSchema(null, logicalDecodingMessageContentJson); - // get Struct of a logical decoding message content - return (Struct) jsonSchemaData.toConnectData(logicalDecodingMessageContentJson, logicalDecodingMessageContentSchema); - } - catch (Exception e) { - LOGGER.warn("Conversion of logical decoding message content failed", e); - throw new DataException("Conversion of logical decoding message content failed"); - } + // parse and get Jackson JsonNode + final JsonNode logicalDecodingMessageContentJson = parseLogicalDecodingMessageContentJsonString(logicalDecodingMessageContentString); + // get schema of a logical decoding message content + Schema logicalDecodingMessageContentSchema = jsonSchemaData.toConnectSchema(null, logicalDecodingMessageContentJson); + // get Struct of a logical decoding message content + return (Struct) jsonSchemaData.toConnectData(logicalDecodingMessageContentJson, logicalDecodingMessageContentSchema); } - private JsonNode parseLogicalDecodingMessageContentJsonString(String logicalDecodingMessageContentJsonString) throws Exception { + private JsonNode parseLogicalDecodingMessageContentJsonString(String logicalDecodingMessageContentJsonString) { if (logicalDecodingMessageContentJsonString.startsWith("{") || logicalDecodingMessageContentJsonString.startsWith("[")) { - return objectMapper.readTree(logicalDecodingMessageContentJsonString); + try { + return objectMapper.readTree(logicalDecodingMessageContentJsonString); + } + catch (JsonProcessingException e) { + throw new DebeziumException(e); + } } - throw new Exception("Unable to parse logical decoding message content JSON string starting with '" + logicalDecodingMessageContentJsonString.charAt(0) + "'"); + throw new DebeziumException( + "Unable to parse logical decoding message content JSON string '" + logicalDecodingMessageContentJsonString + "'"); } private R removeLogicalDecodingMessageContentField(R record) { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java index 996a9fb27..897ede1bf 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; @@ -27,6 +26,7 @@ import org.junit.Test; import org.junit.rules.TestRule; +import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.connector.postgresql.PostgresConnector; import io.debezium.connector.postgresql.SourceInfo; @@ -87,9 +87,9 @@ public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Ex List recordsForTopic = records.recordsForTopic(topicName("message")); assertThat(recordsForTopic).hasSize(1); - Exception exception = assertThrows(DataException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0))); + Exception exception = assertThrows(DebeziumException.class, () -> decodeLogicalDecodingMessageContent.apply(recordsForTopic.get(0))); - assertThat(exception.getMessage()).isEqualTo("Conversion of logical decoding message content failed"); + assertThat(exception.getMessage()).isEqualTo("Unable to parse logical decoding message content JSON string ''"); } @Test diff --git a/debezium-core/src/main/java/io/debezium/converters/recordandmetadata/RecordAndMetadataHeaderImpl.java b/debezium-core/src/main/java/io/debezium/converters/recordandmetadata/RecordAndMetadataHeaderImpl.java index ef68ea0d6..da71ca7f7 100644 --- a/debezium-core/src/main/java/io/debezium/converters/recordandmetadata/RecordAndMetadataHeaderImpl.java +++ b/debezium-core/src/main/java/io/debezium/converters/recordandmetadata/RecordAndMetadataHeaderImpl.java @@ -15,6 +15,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.json.JsonConverter; +import io.debezium.DebeziumException; import io.debezium.converters.CloudEventsConverterConfig.MetadataSource; import io.debezium.converters.CloudEventsConverterConfig.MetadataSourceValue; import io.debezium.converters.spi.CloudEventsMaker; @@ -98,7 +99,7 @@ private SchemaAndValue getHeaderSchemaAndValue(Headers headers, String headerNam return SchemaAndValue.NULL; } else { - throw new RuntimeException("Header `" + headerName + "` was not provided"); + throw new DebeziumException("Header `" + headerName + "` was not provided"); } } return jsonHeaderConverter.toConnectData(null, header.value());