DBZ-2455 Adding constant for some MongoDB specific field names
This commit is contained in:
parent
642fda71d9
commit
193541800d
@ -79,13 +79,13 @@ public Struct valueFromDocument(Document document, Document filter, Envelope.Ope
|
||||
break;
|
||||
case UPDATE:
|
||||
final String patchStr = valueGenerator.apply(fieldFilter.apply(document));
|
||||
value.put("patch", patchStr);
|
||||
value.put(MongoDbFieldName.PATCH, patchStr);
|
||||
final String updateFilterStr = valueGenerator.apply(fieldFilter.apply(filter));
|
||||
value.put("filter", updateFilterStr);
|
||||
value.put(MongoDbFieldName.FILTER, updateFilterStr);
|
||||
break;
|
||||
case DELETE:
|
||||
final String deleteFilterStr = valueGenerator.apply(fieldFilter.apply(filter));
|
||||
value.put("filter", deleteFilterStr);
|
||||
value.put(MongoDbFieldName.FILTER, deleteFilterStr);
|
||||
break;
|
||||
}
|
||||
return value;
|
||||
|
@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mongodb;
|
||||
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.data.Envelope.FieldName;
|
||||
|
||||
/**
|
||||
* Field names specific to MongoDB change event {@link Envelope}s.
|
||||
*
|
||||
* @author Gunnar Morling
|
||||
* @see FieldName
|
||||
*/
|
||||
public class MongoDbFieldName {
|
||||
|
||||
public static final String PATCH = "patch";
|
||||
public static final String FILTER = "filter";
|
||||
}
|
@ -83,8 +83,8 @@ public DataCollectionSchema schemaFor(CollectionId collectionId) {
|
||||
final Schema valueSchema = SchemaBuilder.struct()
|
||||
.name(adjuster.adjust(Envelope.schemaName(topicName)))
|
||||
.field(FieldName.AFTER, Json.builder().optional().build())
|
||||
.field("patch", Json.builder().optional().build())
|
||||
.field("filter", Json.builder().optional().build())
|
||||
.field(MongoDbFieldName.PATCH, Json.builder().optional().build())
|
||||
.field(MongoDbFieldName.FILTER, Json.builder().optional().build())
|
||||
.field(FieldName.SOURCE, sourceSchema)
|
||||
.field(FieldName.OPERATION, Schema.OPTIONAL_STRING_SCHEMA)
|
||||
.field(FieldName.TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA)
|
||||
|
@ -41,7 +41,9 @@
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.EnumeratedValue;
|
||||
import io.debezium.config.Field;
|
||||
import io.debezium.connector.mongodb.MongoDbFieldName;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.data.Envelope.FieldName;
|
||||
import io.debezium.data.Envelope.Operation;
|
||||
import io.debezium.pipeline.txmetadata.TransactionMonitor;
|
||||
import io.debezium.schema.FieldNameSelector;
|
||||
@ -497,9 +499,9 @@ public void configure(final Map<String, ?> map) {
|
||||
handleDeletes = DeleteHandling.parse(config.getString(ExtractNewRecordStateConfigDefinition.HANDLE_DELETES));
|
||||
|
||||
final Map<String, String> afterExtractorConfig = new HashMap<>();
|
||||
afterExtractorConfig.put("field", "after");
|
||||
afterExtractorConfig.put("field", FieldName.AFTER);
|
||||
final Map<String, String> patchExtractorConfig = new HashMap<>();
|
||||
patchExtractorConfig.put("field", "patch");
|
||||
patchExtractorConfig.put("field", MongoDbFieldName.PATCH);
|
||||
final Map<String, String> keyExtractorConfig = new HashMap<>();
|
||||
keyExtractorConfig.put("field", "id");
|
||||
|
||||
@ -568,7 +570,7 @@ else if (parts.length == 2) {
|
||||
private static String determineStruct(String simpleFieldName) {
|
||||
if (simpleFieldName.equals(Envelope.FieldName.OPERATION) ||
|
||||
simpleFieldName.equals(Envelope.FieldName.TIMESTAMP) ||
|
||||
simpleFieldName.equals("patch")) {
|
||||
simpleFieldName.equals(MongoDbFieldName.PATCH)) {
|
||||
return null;
|
||||
}
|
||||
else if (simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY) ||
|
||||
|
@ -36,7 +36,7 @@
|
||||
public class FieldBlacklistIT extends AbstractConnectorTest {
|
||||
|
||||
private static final String SERVER_NAME = "serverX";
|
||||
private static final String PATCH = "patch";
|
||||
private static final String PATCH = MongoDbFieldName.PATCH;
|
||||
|
||||
private Configuration config;
|
||||
private MongoDbTaskContext context;
|
||||
|
@ -36,7 +36,7 @@
|
||||
public class FieldExcludeListIT extends AbstractConnectorTest {
|
||||
|
||||
private static final String SERVER_NAME = "serverX";
|
||||
private static final String PATCH = "patch";
|
||||
private static final String PATCH = MongoDbFieldName.PATCH;
|
||||
|
||||
private Configuration config;
|
||||
private MongoDbTaskContext context;
|
||||
|
@ -33,7 +33,7 @@ public class FieldRenamesIT extends AbstractMongoConnectorIT {
|
||||
private static final String DATABASE_NAME = "dbA";
|
||||
private static final String COLLECTION_NAME = "c1";
|
||||
private static final String SERVER_NAME = "serverX";
|
||||
private static final String PATCH = "patch";
|
||||
private static final String PATCH = MongoDbFieldName.PATCH;
|
||||
private static final String ID = "_id";
|
||||
|
||||
@Test
|
||||
|
@ -1310,7 +1310,7 @@ public void shouldGenerateRecordForUpdateEvent() throws Exception {
|
||||
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
|
||||
assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));
|
||||
|
||||
Document patchObj = Document.parse(value.getString("patch"));
|
||||
Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH));
|
||||
patchObj.remove("$v");
|
||||
|
||||
assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
|
||||
@ -1365,7 +1365,7 @@ public void shouldGeneratorRecordForDeleteEvent() throws Exception {
|
||||
|
||||
assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
|
||||
assertThat(value.getString(Envelope.FieldName.AFTER)).isNull();
|
||||
assertThat(value.getString("patch")).isNull();
|
||||
assertThat(value.getString(MongoDbFieldName.PATCH)).isNull();
|
||||
assertThat(value.getString(Envelope.FieldName.OPERATION)).isEqualTo(Operation.DELETE.code());
|
||||
assertThat(value.getInt64(Envelope.FieldName.TIMESTAMP)).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
|
||||
|
||||
@ -1424,7 +1424,7 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Exceptio
|
||||
|
||||
assertThat(value.schema()).isSameAs(record.valueSchema());
|
||||
assertThat(value.getString(Envelope.FieldName.AFTER)).isNull();
|
||||
assertThat(value.getString("patch")).isNull();
|
||||
assertThat(value.getString(MongoDbFieldName.PATCH)).isNull();
|
||||
assertThat(value.getString(Envelope.FieldName.OPERATION)).isEqualTo(Operation.DELETE.code());
|
||||
assertThat(value.getInt64(Envelope.FieldName.TIMESTAMP)).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
|
||||
|
||||
@ -1804,7 +1804,7 @@ public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws E
|
||||
assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
|
||||
assertThat(key.get("id")).isEqualTo(JSONSerializers.getStrict().serialize(objId));
|
||||
|
||||
Document patchObj = Document.parse(value.getString("patch"));
|
||||
Document patchObj = Document.parse(value.getString(MongoDbFieldName.PATCH));
|
||||
patchObj.remove("$v");
|
||||
|
||||
assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.fest.assertions.Assertions;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.connector.mongodb.MongoDbFieldName;
|
||||
import io.debezium.connector.mongodb.TestHelper;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.data.Envelope.Operation;
|
||||
@ -1417,7 +1418,7 @@ public void testAddPatchFieldAfterUpdate() throws Exception {
|
||||
assertNoRecordsToConsume();
|
||||
|
||||
final Map<String, String> props = new HashMap<>();
|
||||
props.put(ADD_FIELDS, "patch");
|
||||
props.put(ADD_FIELDS, MongoDbFieldName.PATCH);
|
||||
transformation.configure(props);
|
||||
|
||||
// Perform transformation
|
||||
|
Loading…
Reference in New Issue
Block a user