DBZ-1968 Removing SMTs deprecated in 0.10
This commit is contained in:
parent
7bec4f3c64
commit
905629973e
@ -1,26 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mongodb.transforms;
|
||||
|
||||
import org.apache.kafka.connect.connector.ConnectRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link ExtractNewDocumentState} instead. This class will be removed in a future release.
|
||||
*/
|
||||
@Deprecated
|
||||
public class UnwrapFromMongoDbEnvelope<R extends ConnectRecord<R>> extends ExtractNewDocumentState<R> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(UnwrapFromMongoDbEnvelope.class);
|
||||
|
||||
public UnwrapFromMongoDbEnvelope() {
|
||||
LOGGER.warn(
|
||||
"{} has been deprecated and is scheduled for removal. Use {} instead.",
|
||||
getClass().getSimpleName(),
|
||||
ExtractNewDocumentState.class.getName());
|
||||
}
|
||||
}
|
@ -12,7 +12,6 @@
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
@ -1196,78 +1195,6 @@ public void shouldFlattenWithDelimiterTransformRecordForUpdateEvent() throws Int
|
||||
assertThat(value.schema().fields()).hasSize(4);
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-677")
|
||||
public void canUseDeprecatedSmt() throws InterruptedException {
|
||||
waitForStreamingRunning();
|
||||
|
||||
transformation = new UnwrapFromMongoDbEnvelope<SourceRecord>();
|
||||
transformation.configure(Collections.singletonMap("array.encoding", "array"));
|
||||
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put(OPERATION_HEADER, "true");
|
||||
props.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord,db,rs");
|
||||
transformation.configure(props);
|
||||
|
||||
ObjectId objId = new ObjectId();
|
||||
Document obj = new Document()
|
||||
.append("_id", objId)
|
||||
.append("name", "Sally")
|
||||
.append("phone", 123L)
|
||||
.append("active", true)
|
||||
.append("scores", Arrays.asList(1.2, 3.4, 5.6));
|
||||
|
||||
// insert
|
||||
primary().execute("insert", client -> {
|
||||
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
|
||||
});
|
||||
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
|
||||
assertNoRecordsToConsume();
|
||||
|
||||
final SourceRecord record = records.allRecordsInOrder().get(0);
|
||||
final Struct source = ((Struct) record.value()).getStruct(Envelope.FieldName.SOURCE);
|
||||
|
||||
// Perform transformation
|
||||
final SourceRecord transformed = transformation.apply(record);
|
||||
|
||||
// then assert operation header is insert
|
||||
Iterator<Header> operationHeader = transformed.headers().allWithName(ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY);
|
||||
assertThat((operationHeader).hasNext()).isTrue();
|
||||
assertThat(operationHeader.next().value().toString()).isEqualTo(Envelope.Operation.CREATE.code());
|
||||
|
||||
// acquire key and value Structs
|
||||
Struct key = (Struct) transformed.key();
|
||||
Struct value = (Struct) transformed.value();
|
||||
|
||||
// then assert key and its schema
|
||||
assertThat(key.schema()).isSameAs(transformed.keySchema());
|
||||
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
|
||||
assertThat(key.get("id")).isEqualTo(objId.toString());
|
||||
|
||||
// and then assert value and its schema
|
||||
assertThat(value.schema().name()).isEqualTo(SERVER_NAME + "." + DB_NAME + "." + getCollectionName());
|
||||
assertThat(value.schema()).isSameAs(transformed.valueSchema());
|
||||
assertThat(value.get("name")).isEqualTo("Sally");
|
||||
assertThat(value.get("id")).isEqualTo(objId.toString());
|
||||
assertThat(value.get("phone")).isEqualTo(123L);
|
||||
assertThat(value.get("active")).isEqualTo(true);
|
||||
assertThat(value.get("scores")).isEqualTo(Arrays.asList(1.2, 3.4, 5.6));
|
||||
assertThat(value.get("__h")).isEqualTo(source.get("h"));
|
||||
assertThat(value.get("__ts_ms")).isEqualTo(source.get("ts_ms"));
|
||||
assertThat(value.get("__ord")).isEqualTo(source.get("ord"));
|
||||
assertThat(value.get("__db")).isEqualTo(source.get("db"));
|
||||
assertThat(value.get("__rs")).isEqualTo(source.get("rs"));
|
||||
|
||||
assertThat(value.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
|
||||
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
|
||||
assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT64_SCHEMA);
|
||||
assertThat(value.schema().field("active").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
|
||||
assertThat(value.schema().field("scores").schema()).isEqualTo(SchemaBuilder.array(SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA).optional().build());
|
||||
assertThat(value.schema().fields()).hasSize(10);
|
||||
}
|
||||
|
||||
private static void waitForStreamingRunning() throws InterruptedException {
|
||||
waitForStreamingRunning("mongodb", SERVER_NAME);
|
||||
}
|
||||
|
@ -1,26 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.transforms;
|
||||
|
||||
import org.apache.kafka.connect.connector.ConnectRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link ExtractNewRecordState} instead. This class will be removed in a future release.
|
||||
*/
|
||||
@Deprecated
|
||||
public class UnwrapFromEnvelope<R extends ConnectRecord<R>> extends ExtractNewRecordState<R> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(UnwrapFromEnvelope.class);
|
||||
|
||||
public UnwrapFromEnvelope() {
|
||||
LOGGER.warn(
|
||||
"{} has been deprecated and is scheduled for removal. Use {} instead.",
|
||||
getClass().getSimpleName(),
|
||||
ExtractNewRecordState.class.getName());
|
||||
}
|
||||
}
|
@ -306,19 +306,6 @@ public void testUnwrapPropagatesRecordHeaders() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-677")
|
||||
public void canUseDeprecatedSmt() {
|
||||
try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) {
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
transform.configure(props);
|
||||
|
||||
final SourceRecord createRecord = createCreateRecord();
|
||||
final SourceRecord unwrapped = transform.apply(createRecord);
|
||||
assertThat(((Struct) unwrapped.value()).getInt8("id")).isEqualTo((byte) 1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddSourceField() {
|
||||
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
|
||||
|
Loading…
Reference in New Issue
Block a user