DBZ-4600 Remove deprecated class
This commit is contained in:
parent
688522c385
commit
4396fc3ec3
@ -1,110 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.converters;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.storage.ConverterConfig;
|
||||
import org.apache.kafka.connect.storage.HeaderConverter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Instantiator;
|
||||
|
||||
/**
|
||||
* @deprecated
|
||||
* This class is scheduled to be renamed in Debezium 2.0 to "io.debezium.converters.BinaryDataConverter". <p/>
|
||||
*
|
||||
* A customized value converter to allow avro message to be delivered as it is (byte[]) to kafka, this is used
|
||||
* for outbox pattern where payload is serialized by KafkaAvroSerializer, the consumer need to get the deseralized payload.
|
||||
*
|
||||
* To enable the converter in a connector, the following value need to be specified
|
||||
* "value.converter": "io.debezium.converters.ByteBufferConverter"
|
||||
*
|
||||
* @since 1.9
|
||||
* @author Yang Yang
|
||||
*/
|
||||
@Deprecated
|
||||
public class ByteBufferConverter implements Converter, HeaderConverter {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ByteBufferConverter.class);
|
||||
|
||||
public static final String DELEGATE_CONVERTER_TYPE = "delegate.converter.type";
|
||||
|
||||
private Converter delegateConverter;
|
||||
private static final ConfigDef CONFIG_DEF;
|
||||
|
||||
static {
|
||||
CONFIG_DEF = ConverterConfig.newConfigDef();
|
||||
CONFIG_DEF.define(DELEGATE_CONVERTER_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, "Specifies the delegate converter class");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigDef config() {
|
||||
return CONFIG_DEF;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, boolean isKey) {
|
||||
final String converterTypeName = (String) configs.get(DELEGATE_CONVERTER_TYPE);
|
||||
if (converterTypeName != null) {
|
||||
delegateConverter = Instantiator.getInstance(converterTypeName, () -> getClass().getClassLoader(), null);
|
||||
delegateConverter.configure(Configuration.from(configs).subset(DELEGATE_CONVERTER_TYPE, true).asMap(), isKey);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(String topic, Schema schema, Object value) {
|
||||
if (schema != null && schema.type() != Schema.Type.BYTES) {
|
||||
assertDataException("schema", schema.type());
|
||||
LOGGER.debug("Value is not of Schema.Type.BYTES, delegating to " + delegateConverter.getClass().getName());
|
||||
return delegateConverter.fromConnectData(topic, schema, value);
|
||||
}
|
||||
else if (value != null && !(value instanceof ByteBuffer)) {
|
||||
assertDataException("value", value.getClass().getName());
|
||||
LOGGER.debug("Value is not of type ByteBuffer, delegating to " + delegateConverter.getClass().getName());
|
||||
return delegateConverter.fromConnectData(topic, schema, value);
|
||||
}
|
||||
return value == null ? null : ((ByteBuffer) value).array();
|
||||
}
|
||||
|
||||
private void assertDataException(String name, Object type) {
|
||||
if (delegateConverter == null) {
|
||||
throw new DataException("A " + name + " of type '" + type + "' requires a delegate.converter.type to be configured");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectData(String topic, byte[] value) {
|
||||
return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value == null ? null : ByteBuffer.wrap(value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
|
||||
return fromConnectData(topic, schema, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
|
||||
return toConnectData(topic, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// do nothing
|
||||
}
|
||||
}
|
@ -1,196 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
|
||||
package io.debezium.converters;
|
||||
|
||||
import static junit.framework.TestCase.fail;
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.errors.DataException;
|
||||
import org.apache.kafka.connect.json.JsonConverter;
|
||||
import org.apache.kafka.connect.json.JsonDeserializer;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
public class ByteBufferConverterTest {
|
||||
|
||||
private static final String TOPIC = "topic";
|
||||
private static final byte[] SAMPLE_BYTES = "sample string".getBytes(StandardCharsets.UTF_8);
|
||||
|
||||
private ByteBufferConverter converter = new ByteBufferConverter();
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
converter.configure(Collections.emptyMap(), false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertFromConnectData() {
|
||||
byte[] bytes = converter.fromConnectData(TOPIC, Schema.BYTES_SCHEMA, ByteBuffer.wrap(SAMPLE_BYTES));
|
||||
|
||||
assertThat(bytes).isEqualTo(SAMPLE_BYTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertFromConnectDataForOptionalBytesSchema() {
|
||||
byte[] bytes = converter.fromConnectData(TOPIC, Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap(SAMPLE_BYTES));
|
||||
|
||||
assertThat(bytes).isEqualTo(SAMPLE_BYTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertFromConnectDataWithoutSchema() {
|
||||
byte[] bytes = converter.fromConnectData(TOPIC, null, ByteBuffer.wrap(SAMPLE_BYTES));
|
||||
|
||||
assertThat(bytes).isEqualTo(SAMPLE_BYTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertNullFromConnectData() {
|
||||
byte[] bytes = converter.fromConnectData(TOPIC, null, null);
|
||||
|
||||
assertThat(bytes).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowWhenConvertNonByteSchemaFromConnectData() {
|
||||
try {
|
||||
converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, null);
|
||||
fail("now expected exception thrown");
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isExactlyInstanceOf(DataException.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowWhenConvertRawByteArrayFromConnectData() {
|
||||
try {
|
||||
converter.fromConnectData(TOPIC, Schema.BOOLEAN_SCHEMA, null);
|
||||
fail("now expected exception thrown");
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isExactlyInstanceOf(DataException.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertToConnectData() {
|
||||
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, SAMPLE_BYTES);
|
||||
|
||||
assertThat(schemaAndValue.schema()).isEqualTo(Schema.OPTIONAL_BYTES_SCHEMA);
|
||||
assertThat(schemaAndValue.value()).isEqualTo(ByteBuffer.wrap(SAMPLE_BYTES));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertToConnectDataForNullValue() {
|
||||
SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, null);
|
||||
|
||||
assertThat(schemaAndValue.schema()).isEqualTo(Schema.OPTIONAL_BYTES_SCHEMA);
|
||||
assertThat(schemaAndValue.value()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldThrowWhenNoDelegateConverterConfigured() {
|
||||
try {
|
||||
converter.fromConnectData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, "Hello World");
|
||||
fail("now expected exception thrown");
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isExactlyInstanceOf(DataException.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertUsingDelegateConverter() {
|
||||
// Configure delegate converter
|
||||
converter.configure(Collections.singletonMap(ByteBufferConverter.DELEGATE_CONVERTER_TYPE, JsonConverter.class.getName()), false);
|
||||
|
||||
byte[] data = converter.fromConnectData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, "{\"message\": \"Hello World\"}");
|
||||
|
||||
JsonNode value;
|
||||
try (JsonDeserializer jsonDeserializer = new JsonDeserializer()) {
|
||||
value = jsonDeserializer.deserialize(TOPIC, data);
|
||||
}
|
||||
|
||||
assertThat(value).isNotNull();
|
||||
assertThat(value.get("schema")).isNotNull();
|
||||
assertThat(value.get("schema").get("type").asText()).isEqualTo("string");
|
||||
assertThat(value.get("schema").get("optional").asBoolean()).isTrue();
|
||||
assertThat(value.get("payload")).isNotNull();
|
||||
assertThat(value.get("payload").asText()).isEqualTo("{\"message\": \"Hello World\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertUsingDelegateConverterWithNoSchema() {
|
||||
// Configure delegate converter
|
||||
converter.configure(Collections.singletonMap(ByteBufferConverter.DELEGATE_CONVERTER_TYPE, JsonConverter.class.getName()), false);
|
||||
|
||||
byte[] data = converter.fromConnectData(TOPIC, null, "{\"message\": \"Hello World\"}");
|
||||
|
||||
JsonNode value;
|
||||
try (JsonDeserializer jsonDeserializer = new JsonDeserializer()) {
|
||||
value = jsonDeserializer.deserialize(TOPIC, data);
|
||||
}
|
||||
|
||||
assertThat(value).isNotNull();
|
||||
assertThat(value.get("schema")).isNotNull();
|
||||
assertThat(value.get("schema").asText()).isEqualTo("null");
|
||||
assertThat(value.get("payload")).isNotNull();
|
||||
assertThat(value.get("payload").asText()).isEqualTo("{\"message\": \"Hello World\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertUsingDelegateConverterWithOptions() {
|
||||
// Configure delegate converter
|
||||
final Map<String, String> config = new HashMap<>();
|
||||
config.put(ByteBufferConverter.DELEGATE_CONVERTER_TYPE, JsonConverter.class.getName());
|
||||
config.put(ByteBufferConverter.DELEGATE_CONVERTER_TYPE + ".schemas.enable", Boolean.FALSE.toString());
|
||||
converter.configure(config, false);
|
||||
|
||||
byte[] data = converter.fromConnectData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, "{\"message\": \"Hello World\"}");
|
||||
|
||||
JsonNode value;
|
||||
try (JsonDeserializer jsonDeserializer = new JsonDeserializer()) {
|
||||
value = jsonDeserializer.deserialize(TOPIC, data);
|
||||
}
|
||||
|
||||
assertThat(value.has("schema")).isFalse();
|
||||
assertThat(value.has("payload")).isFalse();
|
||||
assertThat(value.asText()).isEqualTo("{\"message\": \"Hello World\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConvertUsingDelegateConverterWithOptionsAndNoSchema() {
|
||||
// Configure delegate converter
|
||||
final Map<String, String> config = new HashMap<>();
|
||||
config.put(ByteBufferConverter.DELEGATE_CONVERTER_TYPE, JsonConverter.class.getName());
|
||||
config.put(ByteBufferConverter.DELEGATE_CONVERTER_TYPE + ".schemas.enable", Boolean.FALSE.toString());
|
||||
converter.configure(config, false);
|
||||
|
||||
byte[] data = converter.fromConnectData(TOPIC, null, "{\"message\": \"Hello World\"}");
|
||||
|
||||
JsonNode value;
|
||||
try (JsonDeserializer jsonDeserializer = new JsonDeserializer()) {
|
||||
value = jsonDeserializer.deserialize(TOPIC, data);
|
||||
}
|
||||
|
||||
assertThat(value.has("schema")).isFalse();
|
||||
assertThat(value.has("payload")).isFalse();
|
||||
assertThat(value.asText()).isEqualTo("{\"message\": \"Hello World\"}");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user