DBZ-3364 add additional metadata to fields for API spec generator for the Debezium UI

part of https://issues.redhat.com/browse/DBZ-3364
rkerner 2021-07-27 13:43:00 +02:00 committed by Gunnar Morling
parent 973fa248ed
commit 52333596de
28 changed files with 571 additions and 60 deletions
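
For context, the point of the new per-field metadata (group, position within the group, required flag, allowed values) is to let the API spec generator for the Debezium UI emit an ordered, grouped form description straight from a connector's Field.Set. The following sketch is illustrative only and not part of this commit (the SpecSketch class and its output format are assumptions); it relies solely on the accessors introduced here:

// Hypothetical sketch: ordering connector options by the new group metadata,
// roughly the way a UI/API spec generator might consume it.
import java.util.Arrays;
import java.util.Comparator;

import io.debezium.config.Field;

public class SpecSketch {

    public static void printSpec(Field.Set fields) {
        Arrays.stream(fields.asArray())
                // sort by group first, then by the explicit position within the group
                .sorted(Comparator
                        .comparing((Field f) -> f.group().getGroup())
                        .thenComparingInt(f -> f.group().getPositionInGroup()))
                .forEach(f -> System.out.printf("%-28s %-45s required=%s%n",
                        f.group().getGroup(), f.name(), f.isRequired()));
    }

    public static void main(String[] args) {
        // e.g. printSpec(MongoDbConnectorConfig.ALL_FIELDS) would list the CONNECTION
        // fields (mongodb.name, mongodb.hosts, ...) before the ADVANCED ones.
    }
}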

View File

@ -123,6 +123,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field HOSTS = Field.create("mongodb.hosts")
.withDisplayName("Hosts")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(MongoDbConnectorConfig::validateHosts)
@ -132,9 +133,10 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field LOGICAL_NAME = Field.create("mongodb.name")
.withDisplayName("Namespace")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
.required()
.withDescription("Unique name that identifies the MongoDB replica set or cluster and all recorded offsets, and"
+ "that is used as a prefix for all schemas and topics. "
+ "Each distinct MongoDB installation should have a separate namespace and monitored by "
@ -143,6 +145,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field USER = Field.create("mongodb.user")
.withDisplayName("User")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 3))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("Database user for connecting to MongoDB, if necessary.");
@ -150,6 +153,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field PASSWORD = Field.create("mongodb.password")
.withDisplayName("Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 4))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("Password to be used when connecting to MongoDB, if necessary.");
@ -157,6 +161,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field AUTH_SOURCE = Field.create("mongodb.authsource")
.withDisplayName("Credentials Database")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 3))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(ReplicaSetDiscovery.ADMIN_DATABASE_NAME)
@ -176,6 +181,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field MONGODB_POLL_INTERVAL_MS = Field.create("mongodb.poll.interval.ms")
.withDisplayName("Replica membership poll interval (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 5))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(30_000L)
@ -185,6 +191,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field SSL_ENABLED = Field.create("mongodb.ssl.enabled")
.withDisplayName("Enable SSL connection to MongoDB")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 0))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(false)
@ -194,6 +201,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field SSL_ALLOW_INVALID_HOSTNAMES = Field.create("mongodb.ssl.invalid.hostname.allowed")
.withDisplayName("Allow invalid hostnames for SSL connection")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 1))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(false)
@ -214,6 +222,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field CONNECT_BACKOFF_INITIAL_DELAY_MS = Field.create("connect.backoff.initial.delay.ms")
.withDisplayName("Initial delay before reconnection (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 1))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(TimeUnit.SECONDS.toMillis(1))
@ -224,6 +233,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field CONNECT_BACKOFF_MAX_DELAY_MS = Field.create("connect.backoff.max.delay.ms")
.withDisplayName("Maximum delay before reconnection (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 2))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(TimeUnit.SECONDS.toMillis(120))
@ -234,6 +244,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field MAX_FAILED_CONNECTIONS = Field.create("connect.max.attempts")
.withDisplayName("Connection attempt limit")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 4))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDefault(16)
@ -247,6 +258,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field AUTO_DISCOVER_MEMBERS = Field.create("mongodb.members.auto.discover")
.withDisplayName("Auto-discovery")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 2))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(true)
@ -263,6 +275,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field DATABASE_INCLUDE_LIST = Field.create("database.include.list")
.withDisplayName("Include Databases")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 0))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(Field::isListOfRegex)
@ -289,6 +302,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field DATABASE_EXCLUDE_LIST = Field.create("database.exclude.list")
.withDisplayName("Exclude Databases")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 1))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateDatabaseExcludeList)
@ -316,6 +330,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field COLLECTION_INCLUDE_LIST = Field.create("collection.include.list")
.withDisplayName("Include Collections")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 2))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(Field::isListOfRegex)
@ -341,6 +356,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
* Must not be used with {@link #COLLECTION_INCLUDE_LIST}.
*/
public static final Field COLLECTION_EXCLUDE_LIST = Field.create("collection.exclude.list")
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 3))
.withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateCollectionExcludeList)
.withInvisibleRecommender()
.withDescription("A comma-separated list of regular expressions that match the collection names for which changes are to be excluded");
@ -364,6 +380,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field FIELD_EXCLUDE_LIST = Field.create("field.exclude.list")
.withDisplayName("Exclude Fields")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 5))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(MongoDbConnectorConfig::validateFieldExcludeList)
@ -392,6 +409,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field FIELD_RENAMES = Field.create("field.renames")
.withDisplayName("Rename Fields")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 0))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(MongoDbConnectorConfig::validateFieldRenamesList)
@ -400,6 +418,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 0))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The criteria for running a snapshot upon startup of the connector. "
@ -410,6 +429,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field CONNECT_TIMEOUT_MS = Field.create("mongodb.connect.timeout.ms")
.withDisplayName("Connect Timeout MS")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(10_000)
@ -418,6 +438,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field SERVER_SELECTION_TIMEOUT_MS = Field.create("mongodb.server.selection.timeout.ms")
.withDisplayName("Server selection timeout MS")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 5))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(30_000)
@ -426,6 +447,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field SOCKET_TIMEOUT_MS = Field.create("mongodb.socket.timeout.ms")
.withDisplayName("Socket timeout MS")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 6))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(0)
@ -439,6 +461,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_FILTER_QUERY_BY_COLLECTION = Field.create("snapshot.collection.filter.overrides")
.withDisplayName("Snapshot mode")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 1))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("This property contains a comma-separated list of <dbName>.<collectionName>, for which "

View File

@ -0,0 +1,41 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.source.SourceConnector;
import io.debezium.config.Field;
import io.debezium.metadata.AbstractConnectorMetadata;
import io.debezium.metadata.ConnectorDescriptor;
public class MongoDbConnectorMetadata extends AbstractConnectorMetadata {
@Override
public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("mongodb", "Debezium MongoDB Connector", getConnector().version());
}
@Override
public Field.Set getAllConnectorFields() {
return MongoDbConnectorConfig.ALL_FIELDS;
}
@Override
public SourceConnector getConnector() {
return new MongoDbConnector();
}
@Override
public List<String> deprecatedFieldNames() {
return Arrays.asList(
MongoDbConnectorConfig.POLL_INTERVAL_SEC.name(),
MongoDbConnectorConfig.MAX_COPY_THREADS.name());
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import org.junit.Before;
import io.debezium.config.AbstractFieldTest;
public class FieldTest extends AbstractFieldTest {
@Before
public void before() {
setAllConnectorFields(MongoDbConnectorConfig.ALL_FIELDS);
}
}
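
The AbstractFieldTest base class used by these new per-connector FieldTest classes is part of this commit but not shown in this view. Purely as an illustration of the intent (this is not the actual implementation), such a base test could iterate the registered fields and assert on the new metadata, for example that every field reports a group entry:

// Illustrative sketch only: a shared test shape for checking field metadata.
import static org.junit.Assert.assertNotNull;

import org.junit.Test;

import io.debezium.config.Field;

public abstract class AbstractFieldTestSketch {

    private Field.Set allConnectorFields;

    // concrete connector tests call this from an @Before method
    public void setAllConnectorFields(Field.Set allConnectorFields) {
        this.allConnectorFields = allConnectorFields;
    }

    @Test
    public void everyFieldShouldCarryGroupMetadata() {
        for (Field field : allConnectorFields.asArray()) {
            assertNotNull("field '" + field.name() + "' has no group entry", field.group());
        }
    }
}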

View File

@ -594,6 +594,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field ON_CONNECT_STATEMENTS = Field.create("database.initial.statements")
.withDisplayName("Initial statements")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 4))
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withDescription(
@ -607,10 +608,12 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SERVER_ID = Field.create("database.server.id")
.withDisplayName("Cluster ID")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDefault(MySqlConnectorConfig::randomServerId)
.withValidation(Field::isRequired, Field::isPositiveLong)
.required()
.withValidation(Field::isPositiveLong)
.withDescription("A numeric ID of this database client, which must be unique across all "
+ "currently-running database processes in the cluster. This connector joins the "
+ "MySQL database cluster as another server (with this unique ID) so it can read "
@ -619,6 +622,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SERVER_ID_OFFSET = Field.create("database.server.id.offset")
.withDisplayName("Cluster ID offset")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDefault(10000L)
@ -630,6 +634,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SSL_MODE = Field.create("database.ssl.mode")
.withDisplayName("SSL mode")
.withEnum(SecureConnectionMode.class, SecureConnectionMode.DISABLED)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 0))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("Whether to use an encrypted connection to MySQL. Options include"
@ -644,6 +649,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SSL_KEYSTORE = Field.create("database.ssl.keystore")
.withDisplayName("SSL Keystore")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 1))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("Location of the Java keystore file containing an application process's own certificate and private key.");
@ -651,6 +657,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SSL_KEYSTORE_PASSWORD = Field.create("database.ssl.keystore.password")
.withDisplayName("SSL Keystore Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 2))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription(
@ -660,6 +667,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SSL_TRUSTSTORE = Field.create("database.ssl.truststore")
.withDisplayName("SSL Truststore")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 3))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("Location of the Java truststore file containing the collection of CA certificates trusted by this application process (trust store).");
@ -667,6 +675,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SSL_TRUSTSTORE_PASSWORD = Field.create("database.ssl.truststore.password")
.withDisplayName("SSL Truststore Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 4))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription(
@ -678,6 +687,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field JDBC_DRIVER = Field.create("database.jdbc.driver")
.withDisplayName("Jdbc Driver Class Name")
.withType(Type.CLASS)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 41))
.withWidth(Width.MEDIUM)
.withDefault(com.mysql.cj.jdbc.Driver.class.getName())
.withImportance(Importance.LOW)
@ -693,6 +703,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field GTID_SOURCE_INCLUDES = Field.create("gtid.source.includes")
.withDisplayName("Include GTID sources")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 24))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDependents(TABLE_INCLUDE_LIST_NAME, TABLE_WHITELIST_NAME)
@ -707,6 +718,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field GTID_SOURCE_EXCLUDES = Field.create("gtid.source.excludes")
.withDisplayName("Exclude GTID sources")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 25))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(MySqlConnectorConfig::validateGtidSetExcludes)
@ -725,6 +737,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field GTID_SOURCE_FILTER_DML_EVENTS = Field.create("gtid.source.filter.dml.events")
.withDisplayName("Filter DML events")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(true)
@ -738,6 +751,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
*
* Defaults to latest.
*/
@Deprecated
public static final Field GTID_NEW_CHANNEL_POSITION = Field.create("gtid.new.channel.position")
.withDisplayName("GTID start position")
.withEnum(GtidNewChannelPosition.class, GtidNewChannelPosition.EARLIEST)
@ -750,6 +764,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field CONNECTION_TIMEOUT_MS = Field.create("connect.timeout.ms")
.withDisplayName("Connection Timeout (ms)")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 1))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription(
@ -760,6 +775,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field KEEP_ALIVE = Field.create("connect.keep.alive")
.withDisplayName("Keep connection alive (true/false)")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 2))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Whether a separate thread should be used to ensure the connection is kept alive.")
@ -769,6 +785,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field KEEP_ALIVE_INTERVAL_MS = Field.create("connect.keep.alive.interval.ms")
.withDisplayName("Keep alive interval (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 3))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Interval for connection checking if keep alive thread is used, given in milliseconds Defaults to 1 minute (60,000 ms).")
@ -777,7 +794,8 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field ROW_COUNT_FOR_STREAMING_RESULT_SETS = Field.create("min.row.count.to.stream.results")
.withDisplayName("Stream result set of size")
.withType(Type.LONG)
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 2))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("The number of rows a table must contain to stream results rather than pull "
@ -789,6 +807,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field BUFFER_SIZE_FOR_BINLOG_READER = Field.create("binlog.buffer.size")
.withDisplayName("Binlog reader buffer size")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 3))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The size of a look-ahead buffer used by the binlog reader to decide whether "
@ -816,6 +835,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field INCLUDE_SQL_QUERY = Field.create("include.query")
.withDisplayName("Include original SQL query with in change events")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 0))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Whether the connector should include the original SQL query that generated the change event. "
@ -827,6 +847,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 0))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The criteria for running a snapshot upon startup of the connector. "
@ -841,6 +862,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode")
.withDisplayName("Snapshot locking mode")
.withEnum(SnapshotLockingMode.class, SnapshotLockingMode.MINIMAL)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 1))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Controls how long the connector holds onto the global read lock while it is performing a snapshot. The default is 'minimal', "
@ -856,6 +878,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field SNAPSHOT_NEW_TABLES = Field.create("snapshot.new.tables")
.withDisplayName("Snapshot newly added tables")
.withEnum(SnapshotNewTables.class, SnapshotNewTables.OFF)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 4))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, "
@ -868,6 +891,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field TIME_PRECISION_MODE = RelationalDatabaseConnectorConfig.TIME_PRECISION_MODE
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 26))
.withValidation(MySqlConnectorConfig::validateTimePrecisionMode)
.withDescription("Time, date and timestamps can be represented with different kinds of precisions, including:"
+ "'adaptive_time_microseconds': the precision of date and timestamp values is based the database column's precision; but time fields always use microseconds precision;"
@ -877,6 +901,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field BIGINT_UNSIGNED_HANDLING_MODE = Field.create("bigint.unsigned.handling.mode")
.withDisplayName("BIGINT UNSIGNED Handling")
.withEnum(BigIntUnsignedHandlingMode.class, BigIntUnsignedHandlingMode.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 27))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Specify how BIGINT UNSIGNED columns should be represented in change events, including:"
@ -886,6 +911,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE = Field.create("event.deserialization.failure.handling.mode")
.withDisplayName("Event deserialization failure handling")
.withEnum(EventProcessingFailureHandlingMode.class, EventProcessingFailureHandlingMode.FAIL)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 21))
.withValidation(MySqlConnectorConfig::validateEventDeserializationFailureHandlingModeNotSet)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -897,6 +923,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field INCONSISTENT_SCHEMA_HANDLING_MODE = Field.create("inconsistent.schema.handling.mode")
.withDisplayName("Inconsistent schema failure handling")
.withEnum(EventProcessingFailureHandlingMode.class, EventProcessingFailureHandlingMode.FAIL)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 2))
.withValidation(MySqlConnectorConfig::validateInconsistentSchemaHandlingModeNotIgnore)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -909,6 +936,7 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
public static final Field ENABLE_TIME_ADJUSTER = Field.create("enable.time.adjuster")
.withDisplayName("Enable Time Adjuster")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 22))
.withDefault(true)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
@ -960,6 +988,8 @@ public static GtidNewChannelPosition parse(String value, String defaultValue) {
BIGINT_UNSIGNED_HANDLING_MODE,
TIME_PRECISION_MODE,
ENABLE_TIME_ADJUSTER,
BINARY_HANDLING_MODE,
ROW_COUNT_FOR_STREAMING_RESULT_SETS,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
.events(
INCLUDE_SQL_QUERY,

View File

@ -0,0 +1,37 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.source.SourceConnector;
import io.debezium.config.Field;
import io.debezium.metadata.AbstractConnectorMetadata;
import io.debezium.metadata.ConnectorDescriptor;
public class MySqlConnectorMetadata extends AbstractConnectorMetadata {
@Override
public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("mysql", "Debezium MySQL Connector", getConnector().version());
}
@Override
public SourceConnector getConnector() {
return new MySqlConnector();
}
@Override
public Field.Set getAllConnectorFields() {
return MySqlConnectorConfig.ALL_FIELDS;
}
public List<String> deprecatedFieldNames() {
return Collections.singletonList(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.name());
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import org.junit.Before;
import io.debezium.config.AbstractFieldTest;
public class FieldTest extends AbstractFieldTest {
@Before
public void before() {
setAllConnectorFields(MySqlConnectorConfig.ALL_FIELDS);
}
}

View File

@ -637,6 +637,7 @@ public static SchemaRefreshMode parse(String value) {
public static final Field PLUGIN_NAME = Field.create("plugin.name")
.withDisplayName("Plugin")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 0))
.withEnum(LogicalDecoder.class, LogicalDecoder.DECODERBUFS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
@ -653,6 +654,7 @@ public static SchemaRefreshMode parse(String value) {
public static final Field SLOT_NAME = Field.create("slot.name")
.withDisplayName("Slot")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 1))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDefault(ReplicationConnection.Builder.DEFAULT_SLOT_NAME)
@ -663,6 +665,7 @@ public static SchemaRefreshMode parse(String value) {
public static final Field DROP_SLOT_ON_STOP = Field.create("slot.drop.on.stop")
.withDisplayName("Drop slot on stop")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 3))
.withDefault(false)
.withImportance(Importance.MEDIUM)
.withDescription(
@ -672,6 +675,7 @@ public static SchemaRefreshMode parse(String value) {
public static final Field PUBLICATION_NAME = Field.create("publication.name")
.withDisplayName("Publication")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 8))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDefault(ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME)
@ -741,6 +745,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field PUBLICATION_AUTOCREATE_MODE = Field.create("publication.autocreate.mode")
.withDisplayName("Publication Auto Create Mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 9))
.withEnum(AutoCreateMode.class, AutoCreateMode.ALL_TABLES)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
@ -760,6 +765,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field STREAM_PARAMS = Field.create("slot.stream.params")
.withDisplayName("Optional parameters to pass to the logical decoder when the stream is started.")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 2))
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withDescription(
@ -768,6 +774,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field MAX_RETRIES = Field.create("slot.max.retries")
.withDisplayName("Retry count")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 4))
.withImportance(Importance.LOW)
.withDefault(DEFAULT_MAX_RETRIES)
.withValidation(Field::isInteger)
@ -776,6 +783,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field RETRY_DELAY_MS = Field.create("slot.retry.delay.ms")
.withDisplayName("Retry delay")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 5))
.withImportance(Importance.LOW)
.withDefault(Duration.ofSeconds(10).toMillis())
.withValidation(Field::isInteger)
@ -785,6 +793,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field ON_CONNECT_STATEMENTS = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.ON_CONNECT_STATEMENTS)
.withDisplayName("Initial statements")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 1))
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withDescription("A semicolon separated list of SQL statements to be executed when a JDBC connection to the database is established. "
@ -794,6 +803,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SSL_MODE = Field.create(DATABASE_CONFIG_PREFIX + "sslmode")
.withDisplayName("SSL mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 0))
.withEnum(SecureConnectionMode.class, SecureConnectionMode.DISABLED)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
@ -807,6 +817,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SSL_CLIENT_CERT = Field.create(DATABASE_CONFIG_PREFIX + "sslcert")
.withDisplayName("SSL Client Certificate")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 1))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("File containing the SSL Certificate for the client. See the Postgres SSL docs for further information");
@ -814,6 +825,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SSL_CLIENT_KEY = Field.create(DATABASE_CONFIG_PREFIX + "sslkey")
.withDisplayName("SSL Client Key")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 4))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("File containing the SSL private key for the client. See the Postgres SSL docs for further information");
@ -821,6 +833,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SSL_CLIENT_KEY_PASSWORD = Field.create(DATABASE_CONFIG_PREFIX + "sslpassword")
.withDisplayName("SSL Client Key Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 2))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("Password to access the client private key from the file specified by 'database.sslkey'. See the Postgres SSL docs for further information");
@ -828,6 +841,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SSL_ROOT_CERT = Field.create(DATABASE_CONFIG_PREFIX + "sslrootcert")
.withDisplayName("SSL Root Certificate")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 3))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("File containing the root certificate(s) against which the server is validated. See the Postgres JDBC SSL docs for further information");
@ -835,6 +849,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SSL_SOCKET_FACTORY = Field.create(DATABASE_CONFIG_PREFIX + "sslfactory")
.withDisplayName("SSL Root Certificate")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 5))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription(
@ -842,6 +857,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 0))
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -864,6 +880,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_MODE_CLASS = Field.create("snapshot.custom.class")
.withDisplayName("Snapshot Mode Custom Class")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 9))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
@ -879,6 +896,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field TRUNCATE_HANDLING_MODE = Field.create("truncate.handling.mode")
.withDisplayName("Truncate handling mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
.withEnum(TruncateHandlingMode.class, TruncateHandlingMode.SKIP)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
@ -889,6 +907,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field HSTORE_HANDLING_MODE = Field.create("hstore.handling.mode")
.withDisplayName("HStore Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 22))
.withEnum(HStoreHandlingMode.class, HStoreHandlingMode.JSON)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
@ -898,6 +917,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field INTERVAL_HANDLING_MODE = Field.create("interval.handling.mode")
.withDisplayName("Interval Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 21))
.withEnum(IntervalHandlingMode.class, IntervalHandlingMode.NUMERIC)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
@ -908,6 +928,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field STATUS_UPDATE_INTERVAL_MS = Field.create("status.update.interval.ms")
.withDisplayName("Status update interval (ms)")
.withType(Type.INT) // Postgres doesn't accept long for this value
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 6))
.withDefault(10_000)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -917,6 +938,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field TCP_KEEPALIVE = Field.create(DATABASE_CONFIG_PREFIX + "tcpKeepAlive")
.withDisplayName("TCP keep-alive probe")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0))
.withDefault(true)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -926,6 +948,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field INCLUDE_UNKNOWN_DATATYPES = Field.create("include.unknown.datatypes")
.withDisplayName("Include unknown datatypes")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 1))
.withDefault(false)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -935,6 +958,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SCHEMA_REFRESH_MODE = Field.create("schema.refresh.mode")
.withDisplayName("Schema refresh mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 0))
.withEnum(SchemaRefreshMode.class, SchemaRefreshMode.COLUMNS_DIFF)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -950,6 +974,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field XMIN_FETCH_INTERVAL = Field.create("xmin.fetch.interval.ms")
.withDisplayName("Xmin fetch interval (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_REPLICATION, 7))
.withWidth(Width.SHORT)
.withDefault(0L)
.withImportance(Importance.MEDIUM)
@ -963,6 +988,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field TOASTED_VALUE_PLACEHOLDER = Field.create("toasted.value.placeholder")
.withDisplayName("Toasted value placeholder")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 2))
.withWidth(Width.MEDIUM)
.withDefault("__debezium_unavailable_value")
.withImportance(Importance.MEDIUM)
@ -1132,6 +1158,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
BINARY_HANDLING_MODE,
INTERVAL_HANDLING_MODE,
SCHEMA_REFRESH_MODE,
TRUNCATE_HANDLING_MODE,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
.excluding(INCLUDE_SCHEMA_CHANGES)
.create();

View File

@ -0,0 +1,31 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.config.Field;
import io.debezium.metadata.AbstractConnectorMetadata;
import io.debezium.metadata.ConnectorDescriptor;
public class PostgresConnectorMetadata extends AbstractConnectorMetadata {
@Override
public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("postgres", "Debezium PostgreSQL Connector", getConnector().version());
}
@Override
public Connector getConnector() {
return new PostgresConnector();
}
@Override
public Field.Set getAllConnectorFields() {
return PostgresConnectorConfig.ALL_FIELDS;
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import org.junit.Before;
import io.debezium.config.AbstractFieldTest;
public class FieldTest extends AbstractFieldTest {
@Before
public void before() {
setAllConnectorFields(PostgresConnectorConfig.ALL_FIELDS);
}
}

View File

@ -249,6 +249,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms")
.withDisplayName("Retriable restart wait (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 18))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDefault(DEFAULT_RETRIABLE_RESTART_WAIT)
@ -259,6 +260,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field TOMBSTONES_ON_DELETE = Field.create("tombstones.on.delete")
.withDisplayName("Change the behaviour of Debezium with regards to delete operations")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 1))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(true)
@ -271,6 +273,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field MAX_QUEUE_SIZE = Field.create("max.queue.size")
.withDisplayName("Change event buffer size")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 15))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to "
@ -281,6 +284,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field MAX_BATCH_SIZE = Field.create("max.batch.size")
.withDisplayName("Change event batch size")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 14))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Maximum size of each batch of source records. Defaults to " + DEFAULT_MAX_BATCH_SIZE + ".")
@ -290,6 +294,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field POLL_INTERVAL_MS = Field.create("poll.interval.ms")
.withDisplayName("Poll interval (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 17))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Time to wait for new change events to appear after receiving no events, given in milliseconds. Defaults to 500 ms.")
@ -299,6 +304,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field MAX_QUEUE_SIZE_IN_BYTES = Field.create("max.queue.size.in.bytes")
.withDisplayName("Change event buffer size in bytes")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 16))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to "
@ -309,6 +315,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_DELAY_MS = Field.create("snapshot.delay.ms")
.withDisplayName("Snapshot Delay (milliseconds)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 5))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("A delay period before a snapshot will begin, given in milliseconds. Defaults to 0 ms.")
@ -318,6 +325,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_FETCH_SIZE = Field.create("snapshot.fetch.size")
.withDisplayName("Snapshot fetch size")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 3))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The maximum number of records that should be loaded into memory while performing a snapshot")
@ -335,6 +343,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_MODE_TABLES = Field.create("snapshot.include.collection.list")
.withDisplayName("Snapshot mode include data collection")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 2))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(Field::isListOfRegex)
@ -343,6 +352,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SOURCE_STRUCT_MAKER_VERSION = Field.create("source.struct.version")
.withDisplayName("Source struct maker version")
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 19))
.withEnum(Version.class, Version.V2)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
@ -352,6 +362,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SANITIZE_FIELD_NAMES = Field.create("sanitize.field.names")
.withDisplayName("Sanitize field names to adhere to Avro naming conventions")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 18))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Whether field names will be sanitized to Avro naming conventions")
@ -360,6 +371,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field PROVIDE_TRANSACTION_METADATA = Field.create("provide.transaction.metadata")
.withDisplayName("Store transaction metadata information in a dedicated topic.")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 17))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Enables transaction metadata extraction together with event counting")
@ -367,6 +379,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field EVENT_PROCESSING_FAILURE_HANDLING_MODE = Field.create("event.processing.failure.handling.mode")
.withDisplayName("Event deserialization failure handling")
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 12))
.withEnum(EventProcessingFailureHandlingMode.class, EventProcessingFailureHandlingMode.FAIL)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -378,6 +391,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field CUSTOM_CONVERTERS = Field.create("converters")
.withDisplayName("List of prefixes defining custom values converters.")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 10))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Optional list of custom converters that would be used instead of default ones. "
@ -386,6 +400,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SKIPPED_OPERATIONS = Field.create("skipped.operations")
.withDisplayName("skipped Operations")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 11))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withValidation(CommonConnectorConfig::validateSkippedOperation)
@ -394,6 +409,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field BINARY_HANDLING_MODE = Field.create("binary.handling.mode")
.withDisplayName("Binary Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 3))
.withEnum(BinaryHandlingMode.class, BinaryHandlingMode.BYTES)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
@ -405,6 +421,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field QUERY_FETCH_SIZE = Field.create("query.fetch.size")
.withDisplayName("Query fetch size")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 13))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The maximum number of records that should be loaded into memory while streaming. A value of `0` uses the default JDBC fetch size.")
@ -414,6 +431,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_MAX_THREADS = Field.create("snapshot.max.threads")
.withDisplayName("Snapshot maximum threads")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 7))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(1)
@ -422,6 +440,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final Field SIGNAL_DATA_COLLECTION = Field.create("signal.data.collection")
.withDisplayName("Signaling data collection")
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 20))
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)

View File

@ -101,7 +101,7 @@ private void addToList(List<Field> list, List<Field> fields) {
private void addToConfigDef(ConfigDef configDef, String group, List<Field> fields) {
if (!fields.isEmpty()) {
Field.group(configDef, group, fields.toArray(new Field[fields.size()]));
Field.group(configDef, group, fields.toArray(new Field[0]));
}
}
}

View File

@ -832,7 +832,7 @@ public static Configuration from(Map<String, ?> properties) {
* @return the configuration; never null
*/
public static <T> Configuration from(Map<String, T> properties, Function<T, String> conversion) {
Map<String, Object> props = new HashMap<>();
Map<String, T> props = new HashMap<>();
if (properties != null) {
props.putAll(properties);
}

View File

@ -108,7 +108,7 @@ public Iterator<Field> iterator() {
* @return the array of fields; never null
*/
public Field[] asArray() {
return fieldsByName.values().toArray(new Field[fieldsByName.size()]);
return fieldsByName.values().toArray(new Field[0]);
}
/**
@ -168,13 +168,17 @@ public Set with(Iterable<Field> fields) {
});
return new Set(all);
}
public java.util.Set<String> allFieldNames() {
return this.fieldsByName.keySet();
}
}
/**
* A functional interface that accepts validation results.
*/
@FunctionalInterface
public static interface ValidationOutput {
public interface ValidationOutput {
/**
* Accept a problem with the given value for the field.
* @param field the field with the value; may not be null
@ -188,7 +192,7 @@ public static interface ValidationOutput {
* A functional interface that can be used to validate field values.
*/
@FunctionalInterface
public static interface Validator {
public interface Validator {
/**
* Validate the supplied value for the field, and report any problems to the designated consumer.
@ -221,7 +225,7 @@ default Validator and(Validator other) {
* In case that there are {@link Field#dependents() dependencies} between fields, the valid values and visibility
* for a field may change given the values of other fields.
*/
public static interface Recommender {
public interface Recommender {
/**
* Return a set of recommended (and valid) values for the field given the current configuration values.
* @param field the field for which the recommended values are to be found; may not be null
@ -239,6 +243,46 @@ public static interface Recommender {
public boolean visible(Field field, Configuration config);
}
public enum Group {
CONNECTION,
CONNECTION_ADVANCED_SSL,
CONNECTION_ADVANCED,
CONNECTION_ADVANCED_REPLICATION,
CONNECTION_ADVANCED_PUBLICATION,
FILTERS,
CONNECTOR_SNAPSHOT,
CONNECTOR,
ADVANCED_HEARTBEAT,
CONNECTOR_ADVANCED,
ADVANCED
};
public static class GroupEntry {
private final Group group;
private final int positionInGroup;
GroupEntry(Group group, int positionInGroup) {
this.group = group;
this.positionInGroup = positionInGroup;
}
public Group getGroup() {
return group;
}
public int getPositionInGroup() {
return positionInGroup;
}
}
public static GroupEntry createGroupEntry(Group group) {
return new GroupEntry(group, 9999);
}
public static GroupEntry createGroupEntry(Group group, int positionInGroup) {
return new GroupEntry(group, positionInGroup);
}
/**
* Create an immutable {@link Field} instance with the given property name.
* @param name the name of the field; may not be null
@ -417,6 +461,9 @@ public static ConfigDef group(ConfigDef configDef, String groupName, Field... fi
private final Importance importance;
private final List<String> dependents;
private final Recommender recommender;
private final java.util.Set<?> allowedValues;
private final GroupEntry group;
private final boolean isRequired;
protected Field(String name, String displayName, Type type, Width width, String description, Importance importance,
Supplier<Object> defaultValueGenerator, Validator validator) {
@ -426,6 +473,13 @@ protected Field(String name, String displayName, Type type, Width width, String
protected Field(String name, String displayName, Type type, Width width, String description, Importance importance,
List<String> dependents, Supplier<Object> defaultValueGenerator, Validator validator,
Recommender recommender) {
this(name, displayName, type, width, description, importance, dependents, defaultValueGenerator, validator,
recommender, false, Field.createGroupEntry(Group.ADVANCED), Collections.emptySet());
}
protected Field(String name, String displayName, Type type, Width width, String description, Importance importance,
List<String> dependents, Supplier<Object> defaultValueGenerator, Validator validator,
Recommender recommender, boolean isRequired, GroupEntry group, java.util.Set<?> allowedValues) {
Objects.requireNonNull(name, "The field name is required");
this.name = name;
this.displayName = displayName;
@ -437,6 +491,9 @@ protected Field(String name, String displayName, Type type, Width width, String
this.importance = importance != null ? importance : Importance.MEDIUM;
this.dependents = dependents != null ? dependents : Collections.emptyList();
this.recommender = recommender;
this.isRequired = isRequired;
this.group = group;
this.allowedValues = allowedValues;
assert this.name != null;
}
@ -529,6 +586,30 @@ public Recommender recommender() {
return recommender;
}
/**
* Get if the field is required/mandatory.
* @return if the field is required; true or false
*/
public boolean isRequired() {
return isRequired;
}
/**
* Get the group of this field.
* @return the group
*/
public GroupEntry group() {
return group;
}
/**
* Get the allowed values for this field.
* @return the java.util.Set of allowed values; may be null if there's no set of specific values
*/
public java.util.Set<?> allowedValues() {
return allowedValues;
}
/**
* Validate the supplied value for this field, and report any problems to the designated consumer.
* @param config the field values keyed by their name; may not be null
@ -555,7 +636,7 @@ public boolean validate(Configuration config, ValidationOutput problems) {
*/
protected void validate(Configuration config, Function<String, Field> fieldSupplier, Map<String, ConfigValue> results) {
// First, merge any new recommended values ...
ConfigValue value = results.computeIfAbsent(this.name(), n -> new ConfigValue(n));
ConfigValue value = results.computeIfAbsent(this.name(), ConfigValue::new);
// Apply the validator ...
validate(config, (f, v, problem) -> {
@ -597,7 +678,7 @@ protected void validate(Configuration config, Function<String, Field> fieldSuppl
*/
public Field withDescription(String description) {
return new Field(name(), displayName, type(), width, description, importance(), dependents,
defaultValueGenerator, validator, recommender);
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -608,7 +689,7 @@ public Field withDescription(String description) {
*/
public Field withDisplayName(String displayName) {
return new Field(name(), displayName, type(), width, description(), importance(), dependents,
defaultValueGenerator, validator, recommender);
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -618,7 +699,7 @@ public Field withDisplayName(String displayName) {
*/
public Field withWidth(Width width) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
defaultValueGenerator, validator, recommender);
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -628,7 +709,7 @@ public Field withWidth(Width width) {
*/
public Field withType(Type type) {
return new Field(name(), displayName(), type, width(), description(), importance(), dependents,
defaultValueGenerator, validator, recommender);
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -654,7 +735,8 @@ public <T extends Enum<T>> Field withEnum(Class<T> enumType) {
*/
public <T extends Enum<T>> Field withEnum(Class<T> enumType, T defaultOption) {
EnumRecommender<T> recommendator = new EnumRecommender<>(enumType);
Field result = withType(Type.STRING).withRecommender(recommendator).withValidation(recommendator);
Field result = withType(Type.STRING).withRecommender(recommendator).withValidation(recommendator)
.withAllowedValues(getEnumLiterals(enumType));
// Not all enums support EnumeratedValue yet
if (defaultOption != null) {
if (defaultOption instanceof EnumeratedValue) {
@ -667,6 +749,22 @@ public <T extends Enum<T>> Field withEnum(Class<T> enumType, T defaultOption) {
return result;
}
public Field required() {
return new Field(name(), displayName(), type(), width(), description(), importance, dependents,
defaultValueGenerator, validator, recommender, true, group, allowedValues)
.withValidation(Field::isRequired);
}
public Field withGroup(GroupEntry group) {
return new Field(name(), displayName(), type(), width(), description(), importance, dependents,
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
public Field withAllowedValues(java.util.Set<?> allowedValues) {
return new Field(name(), displayName(), type(), width(), description(), importance, dependents,
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
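// Illustration only: a connector option could now chain the new metadata builders in one definition.
// The field name and values below are hypothetical; they mirror the pattern used by the connector configs in this commit.
public static final Field EXAMPLE_HOSTNAME = Field.create("example.hostname")
        .withDisplayName("Hostname")
        .withType(Type.STRING)
        .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 2))
        .withWidth(Width.MEDIUM)
        .withImportance(Importance.HIGH)
        .required()
        .withDescription("Resolvable hostname or IP address of the database server.");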
/**
* Create and return a new Field instance that is a copy of this field but with the given importance.
* @param importance the new importance for the field
@ -674,7 +772,7 @@ public <T extends Enum<T>> Field withEnum(Class<T> enumType, T defaultOption) {
*/
public Field withImportance(Importance importance) {
return new Field(name(), displayName(), type(), width(), description(), importance, dependents,
defaultValueGenerator, validator, recommender);
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -684,7 +782,7 @@ public Field withImportance(Importance importance) {
*/
public Field withDependents(String... dependents) {
return new Field(name(), displayName(), type(), width, description(), importance(),
Arrays.asList(dependents), defaultValueGenerator, validator, recommender);
Arrays.asList(dependents), defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -694,7 +792,7 @@ public Field withDependents(String... dependents) {
*/
public Field withDefault(String defaultValue) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
() -> defaultValue, validator, recommender);
() -> defaultValue, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -705,7 +803,7 @@ public Field withDefault(String defaultValue) {
*/
public Field withDefault(boolean defaultValue) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
() -> Boolean.valueOf(defaultValue), validator, recommender);
() -> Boolean.valueOf(defaultValue), validator, recommender, isRequired, group, allowedValues);
}
/**
@ -715,7 +813,7 @@ public Field withDefault(boolean defaultValue) {
*/
public Field withDefault(int defaultValue) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
() -> defaultValue, validator, recommender);
() -> defaultValue, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -726,7 +824,7 @@ public Field withDefault(int defaultValue) {
*/
public Field withDefault(long defaultValue) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
() -> defaultValue, validator, recommender);
() -> defaultValue, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -738,7 +836,7 @@ public Field withDefault(long defaultValue) {
*/
public Field withDefault(BooleanSupplier defaultValueGenerator) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
defaultValueGenerator::getAsBoolean, validator, recommender);
defaultValueGenerator::getAsBoolean, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -750,7 +848,7 @@ public Field withDefault(BooleanSupplier defaultValueGenerator) {
*/
public Field withDefault(IntSupplier defaultValueGenerator) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
defaultValueGenerator::getAsInt, validator, recommender);
defaultValueGenerator::getAsInt, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -762,7 +860,7 @@ public Field withDefault(IntSupplier defaultValueGenerator) {
*/
public Field withDefault(LongSupplier defaultValueGenerator) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
defaultValueGenerator::getAsLong, validator, recommender);
defaultValueGenerator::getAsLong, validator, recommender, isRequired, group, allowedValues);
}
/**
@ -773,7 +871,7 @@ public Field withDefault(LongSupplier defaultValueGenerator) {
*/
public Field withRecommender(Recommender recommender) {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
defaultValueGenerator, validator, recommender);
defaultValueGenerator, validator, recommender, isRequired, group, allowedValues);
}
public Field withInvisibleRecommender() {
@ -787,7 +885,7 @@ public Field withInvisibleRecommender() {
*/
public Field withNoValidation() {
return new Field(name(), displayName(), type(), width, description(), importance(), dependents,
defaultValueGenerator, null, recommender);
defaultValueGenerator, null, recommender, isRequired, group, allowedValues);
}
/**
@ -805,7 +903,7 @@ public Field withValidation(Validator... validators) {
}
}
return new Field(name(), displayName(), type(), width(), description(), importance(), dependents,
defaultValueGenerator, actualValidator, recommender);
defaultValueGenerator, actualValidator, recommender, isRequired, group, allowedValues);
}
@Override
@ -945,6 +1043,21 @@ public boolean visible(Field field, Configuration config) {
}
}
private static <T extends Enum<T>> java.util.Set<String> getEnumLiterals(Class<T> enumType) {
if (Arrays.asList(enumType.getInterfaces()).contains(EnumeratedValue.class)) {
return Arrays.stream(enumType.getEnumConstants())
.map(x -> ((EnumeratedValue) x).getValue())
.map(String::toLowerCase)
.collect(Collectors.toSet());
}
else {
return Arrays.stream(enumType.getEnumConstants())
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.toSet());
}
}
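// Sketch for illustration: assuming a plain enum that does not implement EnumeratedValue,
// getEnumLiterals(Mode.class) would yield the lower-cased constant names {"first", "second"};
// enums implementing EnumeratedValue contribute their getValue() literals instead.
private enum Mode {
    FIRST,
    SECOND
}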
public static class EnumRecommender<T extends Enum<T>> implements Recommender, Validator {
private final List<Object> validValues;
@ -953,18 +1066,7 @@ public static class EnumRecommender<T extends Enum<T>> implements Recommender, V
public EnumRecommender(Class<T> enumType) {
// Not all enums support EnumeratedValue yet
if (Arrays.asList(enumType.getInterfaces()).contains(EnumeratedValue.class)) {
this.literals = Arrays.stream(enumType.getEnumConstants())
.map(x -> ((EnumeratedValue) x).getValue())
.map(String::toLowerCase)
.collect(Collectors.toSet());
}
else {
this.literals = Arrays.stream(enumType.getEnumConstants())
.map(Enum::name)
.map(String::toLowerCase)
.collect(Collectors.toSet());
}
this.literals = getEnumLiterals(enumType);
this.validValues = Collections.unmodifiableList(new ArrayList<>(this.literals));
this.literalsStr = Strings.join(", ", validValues);
}

View File

@ -29,6 +29,7 @@ public class DatabaseHeartbeatImpl extends HeartbeatImpl {
public static final Field HEARTBEAT_ACTION_QUERY = Field.create(HEARTBEAT_ACTION_QUERY_PROPERTY_NAME)
.withDisplayName("An optional query to execute with every heartbeat")
.withType(ConfigDef.Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED_HEARTBEAT, 2))
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("The query executed with every heartbeat.");

View File

@ -41,6 +41,7 @@ interface OffsetProducer {
Field HEARTBEAT_INTERVAL = Field.create(HEARTBEAT_INTERVAL_PROPERTY_NAME)
.withDisplayName("Connector heartbeat interval (milli-seconds)")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED_HEARTBEAT, 0))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("Length of an interval in milli-seconds in in which the connector periodically sends heartbeat messages "
@ -53,6 +54,7 @@ interface OffsetProducer {
Field HEARTBEAT_TOPICS_PREFIX = Field.create("heartbeat.topics.prefix")
.withDisplayName("A prefix used for naming of heartbeat topics")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED_HEARTBEAT, 1))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("The prefix that is used to name heartbeat topics."

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.metadata;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.config.Field;
public abstract class AbstractConnectorMetadata {
public abstract ConnectorDescriptor getConnectorDescriptor();
public abstract Connector getConnector();
public abstract Field.Set getAllConnectorFields();
public List<String> deprecatedFieldNames() {
return Collections.emptyList();
}
}
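// A minimal sketch of a concrete subclass, assuming a hypothetical "Example" connector;
// ExampleConnector and ExampleConnectorConfig are illustrative names, not part of this commit.
public class ExampleConnectorMetadata extends AbstractConnectorMetadata {

    @Override
    public ConnectorDescriptor getConnectorDescriptor() {
        // id, display name and version are placeholder values
        return new ConnectorDescriptor("example", "Debezium Example Connector", "1.7.0-SNAPSHOT");
    }

    @Override
    public Connector getConnector() {
        return new ExampleConnector();
    }

    @Override
    public Field.Set getAllConnectorFields() {
        return ExampleConnectorConfig.ALL_FIELDS;
    }
}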

View File

@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.metadata;
public class ConnectorDescriptor {
public final String id;
public final String name;
public final String version;
public ConnectorDescriptor(String id, String name, String version) {
this.id = id;
this.name = name;
this.version = version;
}
}

View File

@ -148,14 +148,16 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field HOSTNAME = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.HOSTNAME)
.withDisplayName("Hostname")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 2))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
.required()
.withDescription("Resolvable hostname or IP address of the database server.");
public static final Field PORT = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.PORT)
.withDisplayName("Port")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 3))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withValidation(Field::isInteger)
@ -164,14 +166,16 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field USER = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.USER)
.withDisplayName("User")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 4))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
.required()
.withDescription("Name of the database user to be used when connecting to the database.");
public static final Field PASSWORD = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.PASSWORD)
.withDisplayName("Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 5))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("Password of the database user to be used when connecting to the database.");
@ -179,17 +183,20 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field DATABASE_NAME = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.DATABASE)
.withDisplayName("Database")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 6))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired)
.required()
.withDescription("The name of the database from which the connector should capture changes");
public static final Field SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "server.name")
.withDisplayName("Namespace")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0))
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withValidation(Field::isRequired, RelationalDatabaseConnectorConfig::validateServerName)
.required()
.withValidation(RelationalDatabaseConnectorConfig::validateServerName)
.withDescription("Unique name that identifies the database server and all "
+ "recorded offsets, and that is used as a prefix for all schemas and topics. "
+ "Each distinct installation should have a separate namespace and be monitored by "
@ -204,6 +211,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field TABLE_INCLUDE_LIST = Field.create(TABLE_INCLUDE_LIST_NAME)
.withDisplayName("Include Tables")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 2))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(Field::isListOfRegex)
@ -230,6 +238,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field TABLE_EXCLUDE_LIST = Field.create(TABLE_EXCLUDE_LIST_NAME)
.withDisplayName("Exclude Tables")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 3))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(Field::isListOfRegex, RelationalDatabaseConnectorConfig::validateTableExcludeList)
@ -253,6 +262,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field TABLE_IGNORE_BUILTIN = Field.create("table.ignore.builtin")
.withDisplayName("Ignore system databases")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 6))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(true)
@ -268,6 +278,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field COLUMN_EXCLUDE_LIST = Field.create("column.exclude.list")
.withDisplayName("Exclude Columns")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 5))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(Field::isListOfRegex, RelationalDatabaseConnectorConfig::validateColumnExcludeList)
@ -295,6 +306,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field COLUMN_INCLUDE_LIST = Field.create("column.include.list")
.withDisplayName("Include Columns")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 4))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(Field::isListOfRegex)
@ -316,6 +328,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field MSG_KEY_COLUMNS = Field.create("message.key.columns")
.withDisplayName("Columns PK mapping")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 16))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(RelationalDatabaseConnectorConfig::validateMessageKeyColumnsField)
@ -328,6 +341,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field DECIMAL_HANDLING_MODE = Field.create("decimal.handling.mode")
.withDisplayName("Decimal Handling")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2))
.withEnum(DecimalHandlingMode.class, DecimalHandlingMode.PRECISE)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -339,6 +353,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE = Field.create("snapshot.select.statement.overrides")
.withDisplayName("List of tables where the default select statement used during snapshotting should be overridden.")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 8))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription(" This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the"
@ -357,6 +372,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field SCHEMA_INCLUDE_LIST = Field.create(SCHEMA_INCLUDE_LIST_NAME)
.withDisplayName("Include Schemas")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 0))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(Field::isListOfRegex)
@ -384,6 +400,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field SCHEMA_EXCLUDE_LIST = Field.create(SCHEMA_EXCLUDE_LIST_NAME)
.withDisplayName("Exclude Schemas")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 1))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(Field::isListOfRegex, RelationalDatabaseConnectorConfig::validateSchemaExcludeList)
@ -410,6 +427,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field DATABASE_INCLUDE_LIST = Field.create(DATABASE_INCLUDE_LIST_NAME)
.withDisplayName("Include Databases")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 0))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDependents(TABLE_INCLUDE_LIST_NAME, TABLE_WHITELIST_NAME)
@ -433,6 +451,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field DATABASE_EXCLUDE_LIST = Field.create(DATABASE_EXCLUDE_LIST_NAME)
.withDisplayName("Exclude Databases")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.FILTERS, 1))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withValidation(Field::isListOfRegex, RelationalDatabaseConnectorConfig::validateDatabaseExcludeList)
@ -451,6 +470,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field TIME_PRECISION_MODE = Field.create("time.precision.mode")
.withDisplayName("Time Precision")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 4))
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
@ -462,8 +482,9 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_LOCK_TIMEOUT_MS = Field.create("snapshot.lock.timeout.ms")
.withDisplayName("Snapshot lock timeout (ms)")
.withWidth(Width.LONG)
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 6))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDefault(DEFAULT_SNAPSHOT_LOCK_TIMEOUT_MILLIS)
.withDescription("The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this " +
@ -474,6 +495,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field INCLUDE_SCHEMA_CHANGES = Field.create("include.schema.changes")
.withDisplayName("Include database schema changes")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 0))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Whether the connector should publish changes in the database schema to a Kafka topic with "
@ -485,6 +507,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field MASK_COLUMN_WITH_HASH = Field.create("column.mask.hash.([^.]+).with.salt.(.+)")
.withDisplayName("Mask Columns Using Hash and Salt")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 13))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that should "
@ -492,12 +515,15 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field MASK_COLUMN = Field.create("column.mask.with.(d+).chars")
.withDisplayName("Mask Columns With n Asterisks")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 12))
.withValidation(Field::isInteger)
.withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that should "
+ "be masked with configured amount of asterisk ('*') characters.");
public static final Field TRUNCATE_COLUMN = Field.create("column.truncate.to.(d+).chars")
.withDisplayName("Truncate Columns To n Characters")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 11))
.withValidation(Field::isInteger)
.withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that should "
+ "be truncated to the configured amount of characters.");
@ -505,6 +531,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field PROPAGATE_COLUMN_SOURCE_TYPE = Field.create("column.propagate.source.type")
.withDisplayName("Propagate Source Types by Columns")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 15))
.withValidation(Field::isListOfRegex)
.withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that "
+ " adds the columns original type and original length as parameters to the corresponding field schemas in the emitted change records.");
@ -512,6 +539,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field PROPAGATE_DATATYPE_SOURCE_TYPE = Field.create("datatype.propagate.source.type")
.withDisplayName("Propagate Source Types by Data Type")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 14))
.withValidation(Field::isListOfRegex)
.withDescription("A comma-separated list of regular expressions matching the database-specific data type names that "
+ "adds the data type's original type and original length as parameters to the corresponding field schemas in the emitted change records.");
@ -519,6 +547,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_FULL_COLUMN_SCAN_FORCE = Field.createInternal("snapshot.scan.all.columns.force")
.withDisplayName("Snapshot force scan all columns of all tables")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 999))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Restore pre 1.5 behaviour and scan all tables to discover columns."

View File

@ -38,7 +38,7 @@ public final class FileDatabaseHistory extends AbstractDatabaseHistory {
public static final Field FILE_PATH = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "file.filename")
.withDescription("The path to the file that will be used to record the database history")
.withValidation(Field::isRequired);
.required();
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(FILE_PATH);

View File

@ -97,6 +97,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.topic")
.withDisplayName("Database history topic name")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 32))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("The name of the topic for the database schema history")
@ -105,6 +106,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
public static final Field BOOTSTRAP_SERVERS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.bootstrap.servers")
.withDisplayName("Kafka broker addresses")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 31))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withDescription("A list of host/port pairs that the connector will use for establishing the initial "
@ -117,6 +119,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
+ "kafka.recovery.poll.interval.ms")
.withDisplayName("Poll interval during database history recovery (ms)")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 1))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The number of milliseconds to wait while polling for persisted data during recovery.")
@ -126,6 +129,7 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
public static final Field RECOVERY_POLL_ATTEMPTS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "kafka.recovery.attempts")
.withDisplayName("Max attempts to recovery database history")
.withType(Type.INT)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 0))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The number of attempts in a row that no data are returned from Kafka before recover completes. "

View File

@ -59,14 +59,16 @@ public class ByLogicalTableRouter<R extends ConnectRecord<R>> implements Transfo
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(Field::isRequired, Field::isRegex)
.required()
.withValidation(Field::isRegex)
.withDescription("The regex used for extracting the name of the logical table from the original topic name.");
private static final Field TOPIC_REPLACEMENT = Field.create("topic.replacement")
.withDisplayName("Topic replacement")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(Field::isRequired)
.required()
.withDescription("The replacement string used in conjunction with " + TOPIC_REGEX.name() +
". This will be used to create the new topic name.");
private static final Field KEY_ENFORCE_UNIQUENESS = Field.create("key.enforce.uniqueness")

View File

@ -0,0 +1,55 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.config;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;
/**
* @author Randall Hauch
*
*/
public abstract class AbstractFieldTest {
private Field.Set allConnectorFields;
public void setAllConnectorFields(Field.Set allConnectorFields) {
this.allConnectorFields = allConnectorFields;
}
@Test
public void shouldNotAllowDuplicateGroupAssignment() {
Map<Field.Group, Map<Integer, String>> groups = new HashMap<>();
List<String> errors = new ArrayList<>();
allConnectorFields.forEach(field -> {
Field.GroupEntry currentGroupEntry = field.group();
if (!groups.containsKey(currentGroupEntry.getGroup())) {
groups.put(currentGroupEntry.getGroup(), new HashMap<>());
}
if (groups.get(currentGroupEntry.getGroup()).containsKey(currentGroupEntry.getPositionInGroup())) {
if (currentGroupEntry.getPositionInGroup() < 9999) {
errors.add("\"" + field.name() + "\" uses an occupied position in group \"" + currentGroupEntry.getGroup() + "\". Position no. "
+ currentGroupEntry.getPositionInGroup() + " is already taken by field: \""
+ groups.get(currentGroupEntry.getGroup()).get(currentGroupEntry.getPositionInGroup()) + "\"");
}
}
else {
groups.get(currentGroupEntry.getGroup()).put(currentGroupEntry.getPositionInGroup(), field.name());
}
});
if (!errors.isEmpty()) {
fail("Duplicate field group assingnments found: " + String.join("\n", errors));
}
}
}
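// A minimal sketch of how a connector module could wire its fields into this test;
// ExampleConnectorConfig is an illustrative placeholder, not part of this commit.
public class ExampleConnectorFieldTest extends AbstractFieldTest {

    @org.junit.Before
    public void before() {
        setAllConnectorFields(ExampleConnectorConfig.ALL_FIELDS);
    }
}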

View File

@ -40,7 +40,6 @@ log4j.additivity.io.debezium=false
# Kafka is pretty verbose at INFO level, so for brevity use ERROR everywhere except INFO at kafka.server.KafkaServer
log4j.logger.org.apache.kafka=ERROR, kafka
log4j.additivity.org.apache.kafka=false
log4j.logger.org.apache.kafka.common.utils=WARN, kafka
log4j.additivity.org.apache.kafka=false
log4j.logger.kafka=ERROR, kafka

View File

@ -85,14 +85,14 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
*/
public static final Field ENGINE_NAME = Field.create("name")
.withDescription("Unique name for this connector instance.")
.withValidation(Field::isRequired);
.required();
/**
* A required field for an embedded connector that specifies the name of the normal Debezium connector's Java class.
*/
public static final Field CONNECTOR_CLASS = Field.create("connector.class")
.withDescription("The Java class for the connector")
.withValidation(Field::isRequired);
.required();
/**
* An optional field that specifies the name of the class that implements the {@link OffsetBackingStore} interface,

View File

@ -27,7 +27,7 @@ public class ContentBasedRouter<R extends ConnectRecord<R>> extends ScriptingTra
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(Field::isRequired)
.required()
.withDescription("An expression determining the new name of the topic the record should use. When null the record is delivered to the original topic.");
@Override

View File

@ -25,7 +25,7 @@ public class Filter<R extends ConnectRecord<R>> extends ScriptingTransformation<
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(Field::isRequired)
.required()
.withDescription("An expression determining whether the record should be filtered out. When evaluated to true the record is removed.");
@Override

View File

@ -108,7 +108,7 @@ public static NullHandling parse(String value, String defaultValue) {
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(Field::isRequired)
.required()
.withDescription("An expression language used to evaluate the expression. Must begin with 'jsr223.', e.g. 'jsr223.groovy' or 'jsr223.graal.js'.");
public static final Field NULL_HANDLING = Field.create("null.handling.mode")

pom.xml
View File

@ -1,13 +1,14 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.jboss</groupId>
<artifactId>jboss-parent</artifactId>
<version>36</version>
</parent>
<parent>
<groupId>org.jboss</groupId>
<artifactId>jboss-parent</artifactId>
<version>36</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>io.debezium</groupId>
<artifactId>debezium-build-parent</artifactId>
<version>1.7.0-SNAPSHOT</version>
@ -15,16 +16,19 @@
<description>Debezium is an open source change data capture platform</description>
<packaging>pom</packaging>
<url>https://debezium.io</url>
<scm>
<connection>scm:git:git@github.com:debezium/debezium.git</connection>
<developerConnection>scm:git:git@github.com:debezium/debezium.git</developerConnection>
<url>https://github.com/debezium/debezium</url>
<tag>HEAD</tag>
</scm>
<issueManagement>
<system>jira</system>
<url>http://issues.jboss.org/browse/DBZ</url>
</issueManagement>
<licenses>
<license>
<name>Apache Software License 2.0</name>