DBZ-7693 Assign binlog field groups

This commit is contained in:
Chris Cranford 2024-04-01 11:53:48 -04:00 committed by Jiri Pechanec
parent 72d252d1ac
commit 317bd68f10

View File

@ -357,6 +357,7 @@ public interface SnapshotLockingStrategy {
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 4))
.withDescription("A semicolon separated list of SQL statements to be executed when a JDBC connection "
+ "(not binlog reading connection) to the database is established. Note that the connector may "
+ "establish JDBC connections at its own discretion, so this should typically be used for "
@ -370,6 +371,7 @@ public interface SnapshotLockingStrategy {
.withImportance(ConfigDef.Importance.HIGH)
.required()
.withValidation(Field::isPositiveLong)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 1))
.withDescription("A numeric ID of this database client, which must be unique across all "
+ "currently-running database processes in the cluster. This connector joins the "
+ "database cluster as another server (with this unique ID) so it can read the binlog.");
@ -380,6 +382,7 @@ public interface SnapshotLockingStrategy {
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDefault(10000L)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0))
.withDescription("Only relevant if parallel snapshotting is configured. During parallel snapshotting, "
+ "multiple (4) connections open to the database client, and they each need their own unique "
+ "connection ID. This offset is used to generate those IDs from the base configured cluster ID.");
@ -389,6 +392,7 @@ public interface SnapshotLockingStrategy {
.withEnum(SecureConnectionMode.class, SecureConnectionMode.PREFERRED)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 0))
.withDescription("Whether to use an encrypted connection to the database. Options include: "
+ "'disabled' to use an unencrypted connection; "
+ "'preferred' (the default) to establish a secure (encrypted) connection if the server supports "
@ -404,6 +408,7 @@ public interface SnapshotLockingStrategy {
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 1))
.withDescription("The location of the key store file. "
+ "This is optional and can be used for two-way authentication between the client and the database.");
@ -412,6 +417,7 @@ public interface SnapshotLockingStrategy {
.withType(ConfigDef.Type.PASSWORD)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 2))
.withDescription("The password for the key store file. "
+ "This is optional and only needed if 'database.ssl.keystore' is configured.");
@ -420,6 +426,7 @@ public interface SnapshotLockingStrategy {
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 3))
.withDescription("The location of the trust store file for the server certificate verification.");
public static final Field SSL_TRUSTSTORE_PASSWORD = Field.create("database.ssl.truststore.password")
@ -427,6 +434,7 @@ public interface SnapshotLockingStrategy {
.withType(ConfigDef.Type.PASSWORD)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 4))
.withDescription("The password for the trust store file. "
+ "Used to check the integrity of the truststore, and unlock the truststore.");
@ -437,6 +445,7 @@ public interface SnapshotLockingStrategy {
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(30000)
.withValidation(Field::isPositiveInteger)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 1))
.withDescription("Maximum time to wait after trying to connect to the database before timing out, "
+ "given in milliseconds. Defaults to 30 seconds (30,000 ms).");
@ -447,6 +456,7 @@ public interface SnapshotLockingStrategy {
.withImportance(ConfigDef.Importance.LOW)
.withDefault(true)
.withValidation(Field::isBoolean)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 2))
.withDescription("Whether a separate thread should be used to ensure the connection is kept alive.");
public static final Field KEEP_ALIVE_INTERVAL_MS = Field.create("connect.keep.alive.interval.ms")
@ -456,6 +466,7 @@ public interface SnapshotLockingStrategy {
.withImportance(ConfigDef.Importance.LOW)
.withDefault(Duration.ofMinutes(1).toMillis())
.withValidation(Field::isPositiveInteger)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 3))
.withDescription("Interval for connection checking if keep alive thread is used, given in milliseconds. "
+ "Defaults to 1 minute (60,000 ms).");
@ -466,6 +477,7 @@ public interface SnapshotLockingStrategy {
.withImportance(ConfigDef.Importance.LOW)
.withDefault(1_000)
.withValidation(Field::isNonNegativeLong)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 2))
.withDescription("The number of rows a table must contain to stream results rather than pull "
+ "all into memory during snapshots. Defaults to 1,000. Use 0 to stream all results "
+ "and completely avoid checking the size of each table.");
@ -477,6 +489,7 @@ public interface SnapshotLockingStrategy {
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(DEFAULT_BINLOG_BUFFER_SIZE)
.withValidation(Field::isNonNegativeInteger)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 3))
.withDescription("The size of a look-ahead buffer used by the binlog reader to decide whether the "
+ "transaction in progress is going to be committed or rolled back. Use 0 to disable look-ahead "
+ "buffering. Defaults to " + DEFAULT_BINLOG_BUFFER_SIZE + " (i.e. buffering is disabled).");
@ -496,6 +509,7 @@ public interface SnapshotLockingStrategy {
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(false)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 0))
.withDescription("Whether the connector should include the original SQL query that generated the change "
+ "event. Note: This option requires the database to be configured using the server option "
+ "binlog_rows_query_log_events (MySQL) or binlog_annotate_row_events (MariaDB) set to ON."
@ -508,6 +522,7 @@ public interface SnapshotLockingStrategy {
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 0))
.withDescription("The criteria for running a snapshot upon startup of the connector. "
+ "Select one of the following snapshot options: "
+ "'when_needed': On startup, the connector runs a snapshot if one is needed; "
@ -522,6 +537,7 @@ public interface SnapshotLockingStrategy {
.withDisplayName("The time precision mode to be used")
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.withValidation(BinlogConnectorConfig::validateTimePrecisionMode)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 26))
.withDescription("Time, date and timestamps can be represented with different kinds of precisions, including: "
+ "'adaptive_time_microseconds': the precision of date and timestamp values is based on the database column's precision; but time fields always use microseconds precision; "
+ "'connect': always represents time, date and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
@ -532,6 +548,7 @@ public interface SnapshotLockingStrategy {
.withEnum(BigIntUnsignedHandlingMode.class, BigIntUnsignedHandlingMode.LONG)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 27))
.withDescription("Specify how BIGINT UNSIGNED columns should be represented in change events, including: "
+ "'precise' uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; "
+ "'long' (the default) represents values using Java's 'long', which may not offer the precision but will be far easier to use in consumers.");
@ -546,6 +563,7 @@ public interface SnapshotLockingStrategy {
.withValidation(BinlogConnectorConfig::validateEventDeserializationFailureHandlingModeNotSet)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 21))
.withDescription("Specify how failures during deserialization of binlog events (i.e. when encountering a corrupted event) should be handled, including: "
+ "'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; "
+ "'warn' the problematic event and its binlog position will be logged and the event will be skipped; "
@ -556,6 +574,7 @@ public interface SnapshotLockingStrategy {
.withEnum(EventProcessingFailureHandlingMode.class, EventProcessingFailureHandlingMode.FAIL)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 2))
.withDescription("Specify how binlog events that belong to a table missing from internal schema representation "
+ "(i.e. internal representation is not consistent with database) should be handled, including: "
+ "'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; "
@ -568,6 +587,7 @@ public interface SnapshotLockingStrategy {
.withDefault(true)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 22))
.withDescription("The database allows the user to insert year value as either 2-digit or 4-digit. "
+ "In case of two digits the value is automatically mapped into 1970 - 2069. "
+ "false - delegates the implicit conversion to the database; "
@ -593,7 +613,8 @@ public interface SnapshotLockingStrategy {
.withType(ConfigDef.Type.LIST)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.HIGH)
.withDependents(TABLE_INCLUDE_LIST_NAME);
.withDependents(TABLE_INCLUDE_LIST_NAME)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 24));
/**
* Specifies the GTID ranges to be excluded.<p></p>
@ -608,7 +629,8 @@ public interface SnapshotLockingStrategy {
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.MEDIUM)
.withValidation(BinlogConnectorConfig::validateGtidSetExcludes)
.withInvisibleRecommender();
.withInvisibleRecommender()
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 25));
/**
* When set to {@code true}, the connector will produce DML events for transactions that were written by
@ -625,6 +647,7 @@ public interface SnapshotLockingStrategy {
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDefault(true)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
.withDescription("When set to true, only produce DML events for transactions that were written on the "
+ "server with matching GTIDs defined by the `gtid.source.includes` or `gtid.source.excludes`, "
+ "if they were specified.");