DBZ-5967 Provide additional checks
This commit is contained in:
parent
c2d7d82d30
commit
26bcb67cdc
@ -80,7 +80,7 @@ public void configure(Map<String, ?> props) {
|
||||
smtManager.validate(config, Field.setOf(PARTITION_TABLE_FIELD_NAME_MAPPINGS_FIELD, FIELD_TABLE_PARTITION_NUM_MAPPINGS_FIELD));
|
||||
|
||||
fieldNameByTable = ComputePartitionConfigDefinition.parseMappings(config.getStrings(PARTITION_TABLE_FIELD_NAME_MAPPINGS_FIELD, LIST_SEPARATOR));
|
||||
numberOfPartitionsByTable = ComputePartitionConfigDefinition.parseIntMappings(config.getStrings(FIELD_TABLE_PARTITION_NUM_MAPPINGS_FIELD, LIST_SEPARATOR));
|
||||
numberOfPartitionsByTable = ComputePartitionConfigDefinition.parseParititionMappings(config.getStrings(FIELD_TABLE_PARTITION_NUM_MAPPINGS_FIELD, LIST_SEPARATOR));
|
||||
|
||||
checkConfigurationConsistency();
|
||||
|
||||
@ -90,7 +90,7 @@ public void configure(Map<String, ?> props) {
|
||||
private void checkConfigurationConsistency() {
|
||||
|
||||
if (numberOfPartitionsByTable.size() != fieldNameByTable.size()) {
|
||||
throw new ConnectException(String.format("Unable to validate config. %s and %s has different number of table defined",
|
||||
throw new ComputePartitionException(String.format("Unable to validate config. %s and %s has different number of table defined",
|
||||
FIELD_TABLE_PARTITION_NUM_MAPPINGS_CONF, FIELD_TABLE_FIELD_NAME_MAPPINGS_CONF));
|
||||
}
|
||||
|
||||
@ -98,8 +98,9 @@ private void checkConfigurationConsistency() {
|
||||
intersection.retainAll(fieldNameByTable.keySet());
|
||||
|
||||
if (intersection.size() != numberOfPartitionsByTable.size()) {
|
||||
throw new ConnectException(String.format("Unable to validate config. %s and %s has different tables defined", FIELD_TABLE_PARTITION_NUM_MAPPINGS_CONF,
|
||||
FIELD_TABLE_FIELD_NAME_MAPPINGS_CONF));
|
||||
throw new ComputePartitionException(
|
||||
String.format("Unable to validate config. %s and %s has different tables defined", FIELD_TABLE_PARTITION_NUM_MAPPINGS_CONF,
|
||||
FIELD_TABLE_FIELD_NAME_MAPPINGS_CONF));
|
||||
}
|
||||
|
||||
if (numberOfPartitionsByTable.containsValue(0)) {
|
||||
|
@ -6,6 +6,8 @@
|
||||
|
||||
package io.debezium.transforms.partitions;
|
||||
|
||||
import static io.debezium.transforms.partitions.ComputePartitionConfigDefinition.FIELD_TABLE_PARTITION_NUM_MAPPINGS_CONF;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -73,7 +75,7 @@ public static int isValidMapping(Configuration config, Field field, Field.Valida
|
||||
return 0;
|
||||
}
|
||||
|
||||
static Map<String, Integer> parseIntMappings(List<String> mappings) {
|
||||
static Map<String, Integer> parseParititionMappings(List<String> mappings) {
|
||||
|
||||
final Map<String, Integer> m = new HashMap<>();
|
||||
for (String mapping : mappings) {
|
||||
@ -82,7 +84,12 @@ static Map<String, Integer> parseIntMappings(List<String> mappings) {
|
||||
throw new ComputePartitionException("Invalid mapping: " + mapping);
|
||||
}
|
||||
try {
|
||||
int value = Integer.parseInt(parts[1]);
|
||||
final int value = Integer.parseInt(parts[1]);
|
||||
if (value <= 0) {
|
||||
throw new ComputePartitionException(
|
||||
String.format("Unable to validate config. %s: partition number for '%s' must be positive",
|
||||
FIELD_TABLE_PARTITION_NUM_MAPPINGS_CONF, parts[0]));
|
||||
}
|
||||
m.put(parts[0], value);
|
||||
}
|
||||
catch (NumberFormatException e) {
|
||||
|
@ -9,6 +9,9 @@
|
||||
import io.debezium.DebeziumException;
|
||||
|
||||
public class ComputePartitionException extends DebeziumException {
|
||||
|
||||
private static final long serialVersionUID = -1317267303694502915L;
|
||||
|
||||
public ComputePartitionException() {
|
||||
super();
|
||||
}
|
||||
|
@ -18,7 +18,6 @@
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -120,7 +119,7 @@ public void rowWithDifferentConfiguredFieldValueWillHaveDifferentPartition() {
|
||||
public void notConsistentConfigurationSizeWillThrowConnectionException() {
|
||||
|
||||
assertThatThrownBy(() -> configureTransformation("inventory.orders,inventory.products", "inventory.orders:purchaser,inventory.products:product", "purchaser:2"))
|
||||
.isInstanceOf(ConnectException.class)
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings and partition.data-collections.field.mappings has different number of table defined");
|
||||
}
|
||||
@ -130,7 +129,7 @@ public void notConsistentConfigurationWillThrowConnectionException() {
|
||||
|
||||
assertThatThrownBy(
|
||||
() -> configureTransformation("inventory.orders,inventory.products", "inventory.orders:purchaser,inventory.products:product", "prod:2,purchaser:2"))
|
||||
.isInstanceOf(ConnectException.class)
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings and partition.data-collections.field.mappings has different tables defined");
|
||||
}
|
||||
@ -149,13 +148,22 @@ public void negativeHashCodeValueWillBeCorrectlyManaged() {
|
||||
|
||||
@Test
|
||||
public void zeroAsPartitionNumberWillThrowConnectionException() {
|
||||
|
||||
assertThatThrownBy(
|
||||
() -> configureTransformation("inventory.orders,inventory.products", "inventory.orders:purchaser,inventory.products:products",
|
||||
"inventory.products:0,inventory.orders:2"))
|
||||
.isInstanceOf(ConnectException.class)
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings: partition number cannot be 0");
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings: partition number for 'inventory.products' must be positive");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void negativeAsPartitionNumberWillThrowConnectionException() {
|
||||
assertThatThrownBy(
|
||||
() -> configureTransformation("inventory.orders,inventory.products", "inventory.orders:purchaser,inventory.products:products",
|
||||
"inventory.products:-3,inventory.orders:2"))
|
||||
.isInstanceOf(ComputePartitionException.class)
|
||||
.hasMessageContaining(
|
||||
"Unable to validate config. partition.data-collections.partition.num.mappings: partition number for 'inventory.products' must be positive");
|
||||
}
|
||||
|
||||
private SourceRecord buildSourceRecord(String connector, String db, String schema, String tableName, Struct row, Envelope.Operation operation) {
|
||||
|
Loading…
Reference in New Issue
Block a user