tet123/documentation/modules/ROOT/pages/connectors/jdbc.adoc
2024-02-20 09:44:31 -05:00

// Category: debezium-using
// Type: assembly
[id="debezium-connector-for-jdbc"]
= {prodname} connector for JDBC
:context: JDBC
:mbean-name: {context}
ifdef::community[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
[[jdbc-overview]]
== Overview
endif::community[]
The {prodname} JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver.
This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.
// Type: assembly
// ModuleID: debezium-jdbc-connector-how-the-debezium-connector-works
// Title: How the {prodname} JDBC connector works
[[how-the-jdbc-connector-works]]
== How the JDBC connector works
The {prodname} JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime.
The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database.
The connector supports idempotent write operations by using upsert semantics and basic schema evolution.
The {prodname} JDBC connector provides the following features:
* xref:jdbc-consume-complex-debezium-events[]
* xref:jdbc-at-least-once-delivery[]
* xref:jdbc-multiple-tasks[]
* xref:jdbc-data-and-type-mappings[]
* xref:jdbc-primary-key-handling[]
* xref:jdbc-delete-mode[]
* xref:jdbc-idempotent-writes[]
* xref:jdbc-schema-evolution[]
* xref:jdbc-quoting-case-sensitivity[]
// Type: concept
// Title: Description of how the {prodname} JDBC connector consumes complex change events
// ModuleID: debezium-jdbc-connector-description-of-how-the-connector-consumes-complex-change-events
[[jdbc-consume-complex-debezium-events]]
=== Consuming complex {prodname} change events
By default, {prodname} source connectors produce complex, hierarchical change events.
When {prodname} connectors are used with other JDBC sink connector implementations, you might need to apply the `ExtractNewRecordState` single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation.
If you run the {prodname} JDBC sink connector, it's not necessary to deploy the SMT, because the {prodname} sink connector can consume native {prodname} change events directly, without the use of a transformation.
When the JDBC sink connector consumes a complex change event from a {prodname} source connector, it extracts the values from the `after` section of the original `insert` or `update` event.
When a delete event is consumed by the sink connector, no part of the event's payload is consulted.
[IMPORTANT]
====
The {prodname} JDBC sink connector is not designed to read from schema change topics.
If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the xref:jdbc-property-connection-topics[`topics`] or xref:jdbc-property-connection-topics-regex[`topics.regex`] properties so that the connector does not consume from schema change topics.
====
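
As a minimal sketch of the preceding recommendation, the following sink configuration fragment subscribes only to data change topics by using a regular expression.
The topic prefix `inventory` and the `inventory.<schema>.<table>` naming pattern are assumptions made for illustration.
Adjust the expression so that it matches the data topics in your environment, and verify that it does not match your schema change topic.

```json
{
  "topics.regex": "inventory\\.[^.]+\\.[^.]+" // <1>
}
```
<1> Hypothetical expression. It matches names such as `inventory.public.orders`, but not a topic that is named only `inventory`.
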
// Type:concept
// Title: Description of {prodname} JDBC connector at-least-once delivery
// ModuleID: debezium-jdbc-connector-description-of-at-least-once-delivery
[[jdbc-at-least-once-delivery]]
=== At-least-once delivery
The {prodname} JDBC sink connector guarantees that events that it consumes from Kafka topics are processed at least once.
// Type: concept
// Title: Description of {prodname} JDBC use of multiple tasks
// ModuleID: debezium-jdbc-connector-description-of-the-connector-use-of-multiple-tasks
[[jdbc-multiple-tasks]]
=== Multiple tasks
You can run the {prodname} JDBC sink connector across multiple Kafka Connect tasks.
To run the connector across multiple tasks, set the `tasks.max` configuration property to the number of tasks that you want the connector to use.
The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task.
Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.
// Type: concept
// Title: Description of {prodname} JDBC connector data and column type mappings
// ModuleID: debezium-jdbc-connector-description-of-data-and-column-type-mappings
[[jdbc-data-and-type-mappings]]
=== Data and column type mappings
To enable the {prodname} JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event.
The connector supports a wide range of column type mappings across different database dialects.
To correctly convert the destination column type from the `type` metadata in an event field, the connector applies the data type mappings that are defined for the source database.
You can enhance the way that the connector resolves data types for a column by setting the `column.propagate.source.type` or `datatype.propagate.source.type` options in the source connector configuration.
When you enable these options, {prodname} includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.
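
For illustration only, the following fragment sketches how one of these options might appear in the configuration of a {prodname} source connector.
The value `.*` is an assumption that is intended to match every column.
Check the documentation for your source connector to confirm the expected format of the expression.

```json
{
  "column.propagate.source.type": ".*" // <1>
}
```
<1> Assumed catch-all expression. Narrow it to specific columns if you do not want the extra metadata on every field.
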
For the {prodname} JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a `Struct`.
In addition, the payload of the source message must be a `Struct` that has either a flattened structure with no nested `struct` types, or a nested `struct` layout that conforms to {prodname}'s complex, hierarchical structure.
If the structure of the events in the Kafka topic does not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.
// Type: concept
// Title: Description of how the {prodname} JDBC connector handles primary keys in source events
// ModuleID: debezium-jdbc-connector-description-of-how-the-connector-handles-primary-keys-in-source-events
[[jdbc-primary-key-handling]]
=== Primary key handling
By default, the {prodname} JDBC sink connector does not transform any of the fields in the source event into the primary key for the event.
Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics.
To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:
|===
|Mode|Description
|`none`
|No primary key fields are specified when creating the table.
|`kafka`
a|The primary key consists of the following three columns:
* `__connect_topic`
* `__connect_partition`
* `__connect_offset`
The values for these columns are sourced from the coordinates of the Kafka event.
|`record_key`
|The primary key is composed of the Kafka event's key. +
+
If the primary key is a primitive type, specify the name of the column to be used by setting the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property.
If the primary key is a struct type, the fields in the struct are mapped as columns of the primary key.
You can use the `primary.key.fields` property to restrict the primary key to a subset of columns.
|`record_value`
|The primary key is composed of the Kafka event's value. +
+
Because the value of a Kafka event is always a `Struct`, by default, all of the fields in the value become columns of the primary key.
To use a subset of fields in the primary key, set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.
|`record_header`
|The primary key is composed of the Kafka event's headers. +
+
A Kafka event's headers can contain multiple headers, each of which can be a `Struct` or a primitive data type.
The connector builds a `Struct` from these headers, and all of the fields in this `Struct` become columns of the primary key.
To use a subset of fields in the primary key, set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to specify a comma-separated list of fields in the headers from which you want to derive the primary key columns.
|===
[IMPORTANT]
====
Some database dialects might throw an exception if you set the `primary.key.mode` to `kafka` and set `schema.evolution` to `basic`.
This exception occurs when a dialect maps a `STRING` data type to a variable-length string data type such as `TEXT` or `CLOB`, and the dialect does not allow primary key columns to have unbounded lengths.
To avoid this problem, apply the following settings in your environment:
* Do not set `schema.evolution` to `basic`.
* Create the database table and primary key mappings in advance.
====
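
The following fragment is a minimal sketch of one of the modes in the preceding table.
It derives the primary key from a subset of the fields in the event value.
The field names `order_id` and `region` are hypothetical.

```json
{
  "primary.key.mode": "record_value",
  "primary.key.fields": "order_id,region" // <1>
}
```
<1> Hypothetical field names. Replace them with fields that exist in the value of your events.
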
// Type: procedure
// Title: Configuring the {prodname} JDBC connector to delete rows when consuming `DELETE` or _tombstone_ events
// ModuleID: debezium-jdbc-connector-configuring-the-connector-to-delete-rows
[[jdbc-delete-mode]]
=== Delete mode
The {prodname} JDBC sink connector can delete rows in the destination database when a `DELETE` or _tombstone_ event is consumed.
By default, the JDBC sink connector does not enable delete mode.
If you want the connector to remove rows, you must explicitly set `delete.enabled=true` in the connector configuration.
To use this mode you must also set xref:jdbc-property-primary-key-mode[`primary.key.mode`] to a value other than `none`.
The preceding configuration is necessary, because deletes are executed based on the primary key mapping, so if a destination table has no primary key mapping, the connector is unable to delete rows.
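
As a sketch, a sink configuration fragment that enables deletes might look like the following example.
It uses the `record_key` mode, which is the mode that the description of the `delete.enabled` property calls for.

```json
{
  "delete.enabled": "true",
  "primary.key.mode": "record_key"
}
```
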
// Type: procedure
// Title: Enabling the connector to perform idempotent writes
// ModuleID: debezium-jdbc-connector-enabling-idempotent-writes
[[jdbc-idempotent-writes]]
=== Idempotent writes
The {prodname} JDBC sink connector can perform idempotent writes, enabling it to replay the same records repeatedly and not change the final database state.
To enable the connector to perform idempotent writes, you must explicitly set the `insert.mode` for the connector to `upsert`.
An `upsert` operation is applied as either an `update` or an `insert`, depending on whether the specified primary key already exists.
If the primary key value already exists, the operation updates values in the row.
If the specified primary key value doesn't exist, an `insert` adds a new row.
Each database dialect handles idempotent writes differently, because there is no SQL standard for _upsert_ operations.
The following table shows the `upsert` DML syntax for the database dialects that {prodname} supports:
|===
|Dialect |Upsert Syntax
|Db2
|`MERGE ...`
|MySQL
|`INSERT ... ON DUPLICATE KEY UPDATE ...`
|Oracle
|`MERGE ...`
|PostgreSQL
|`INSERT ... ON CONFLICT ... DO UPDATE SET ...`
|SQL Server
|`MERGE ...`
|===
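
The following fragment sketches a configuration that requests upsert behavior.
Because an upsert is resolved against the primary key, the fragment also selects a primary key mode.
The choice of `record_key` here is an assumption; use the mode that fits your events.

```json
{
  "insert.mode": "upsert",
  "primary.key.mode": "record_key"
}
```
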
// Type: reference
// Title: Schema evolution modes for the {prodname} JDBC connector
// ModuleID: debezium-jdbc-connector-schema-evolution-modes
[[jdbc-schema-evolution]]
=== Schema evolution
You can use the following schema evolution modes with the {prodname} JDBC sink connector:
|===
|Mode |Description
|`none`
|The connector does not perform any DDL schema evolution.
|`basic`
|The connector automatically detects fields that are in the event payload but that do not exist in the destination table.
The connector alters the destination table to add the new fields.
|===
When `schema.evolution` is set to `basic`, the connector automatically creates or alters the destination database table according to the structure of the incoming event.
When an event is received from a topic for the first time, and the destination table does not yet exist, the {prodname} JDBC sink connector uses the event's key or the schema structure of the record to resolve the column structure of the table.
If schema evolution is enabled, the connector prepares and executes a `CREATE TABLE` SQL statement before it applies the DML event to the destination table.
When the {prodname} JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event's key or its schema structure to identify which columns are new, and must be added to the database table.
If schema evolution is enabled, the connector prepares and executes an `ALTER TABLE` SQL statement before it applies the DML event to the destination table.
Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.
The schema of each field determines whether a column is `NULL` or `NOT NULL`.
The schema also defines the default values for each column.
If the connector attempts to create a table with a nullability setting or a default value that you do not want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event.
To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modifies the column state defined in the source database.
A field's data type is resolved based on a predefined set of mappings.
For more information, see xref:jdbc-field-types[].
[IMPORTANT]
====
When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema.
If you want a field to be removed from the destination table, use one of the following options:
* Remove the field manually.
* Drop the column.
* Assign a default value to the field.
* Define the field as nullable.
====
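
To illustrate the first requirement, the following sketch shows a field schema that is marked as optional.
It uses the same simplified notation as the other field schema examples in this document and is not an exact wire format.
The `STRING` type is chosen only for illustration.

```json
{
  "schema": {
    "type": "STRING",
    "optional": true
  }
}
```
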
// Type: procedure
// Title: Specifying options to define the letter case of destination table and column names
// ModuleID: debezium-jdbc-connector-specifying-options-to-define-letter-case-of-destination-names
[[jdbc-quoting-case-sensitivity]]
=== Quoting and case sensitivity
The {prodname} JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database.
By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table.
The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings.
As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.
For example, if the destination database dialect is Oracle and the event's topic is `orders`, the destination table will be created as `ORDERS` because Oracle defaults to upper-case names when the name is not quoted.
Similarly, if the destination database dialect is PostgreSQL and the event's topic is `ORDERS`, the destination table will be created as `orders` because PostgreSQL defaults to lower-case names when the name is not quoted.
To explicitly preserve the case of the table and field names that are present in a Kafka event, in the connector configuration, set the value of the `quote.identifiers` property to `true`.
When this option is set, and an incoming event is for a topic called `orders`, and the destination database dialect is Oracle, the connector creates a table with the name `orders`, because the constructed SQL defines the name of the table as `"orders"`.
Enabling quoting results in the same behavior when the connector creates column names.
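
For example, a sink configuration fragment along the following lines turns on identifier quoting for the `orders` topic that is discussed above.
Treat it as a sketch and not as a complete connector configuration.

```json
{
  "topics": "orders",
  "quote.identifiers": "true"
}
```
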
// Type: reference
// ModuleID: debezium-jdbc-connector-how-the-connector-maps-data-types
// Title: How the {prodname} JDBC connector maps data types
[[jdbc-field-types]]
== Data type mappings
The {prodname} JDBC sink connector resolves a column's data type by using a logical or primitive type-mapping system.
Primitive types include values such as integers, floating points, Booleans, strings, and bytes.
Typically, these types are represented with a specific Kafka Connect `Schema` type code only.
Logical data types are more often complex types, including values such as `Struct`-based types that have a fixed set of field names and schema, or values that are represented with a specific encoding, such as number of days since epoch.
The following examples show representative structures of primitive and logical data types:
.Primitive field schema
[source,json]
----
{
  "schema": {
    "type": "INT64"
  }
}
----
.Logical field schema
[source,json]
----
{
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
}
----
Kafka Connect is not the only source for these complex, logical types.
In fact, {prodname} source connectors generate change events that have fields with similar logical types to represent a variety of different data types, including but not limited to, timestamps, dates, and even JSON data.
The {prodname} JDBC sink connector uses these primitive and logical types to resolve a column's type to a JDBC SQL code, which represents a column's type.
These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column's type to a logical data type for the dialect in use.
The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between {prodname} and JDBC SQL types.
The actual final column type varies for each database type.
. xref:jdbc-kafka-connect-primitive-mappings[]
. xref:jdbc-kafka-connect-logical-mappings[]
. xref:jdbc-debezium-logical-mappings[]
. xref:jdbc-debezium-logical-mappings-dialect-specific[]
[[jdbc-kafka-connect-primitive-mappings]]
.Mappings between Kafka Connect Primitives and Column Data Types
|===
|Primitive Type |JDBC SQL Type
|INT8
|Types.TINYINT
|INT16
|Types.SMALLINT
|INT32
|Types.INTEGER
|INT64
|Types.BIGINT
|FLOAT32
|Types.FLOAT
|FLOAT64
|Types.DOUBLE
|BOOLEAN
|Types.BOOLEAN
|STRING
|Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR
|BYTES
|Types.VARBINARY
|===
[[jdbc-kafka-connect-logical-mappings]]
.Mappings between Kafka Connect Logical Types and Column Data Types
|===
|Logical Type |JDBC SQL Type
|org.apache.kafka.connect.data.Decimal
|Types.DECIMAL
|org.apache.kafka.connect.data.Date
|Types.DATE
|org.apache.kafka.connect.data.Time
|Types.TIMESTAMP
|org.apache.kafka.connect.data.Timestamp
|Types.TIMESTAMP
|===
[[jdbc-debezium-logical-mappings]]
.Mappings between {prodname} Logical Types and Column Data Types
|===
|Logical Type |JDBC SQL Type
|io.debezium.time.Date
|Types.DATE
|io.debezium.time.Time
|Types.TIMESTAMP
|io.debezium.time.MicroTime
|Types.TIMESTAMP
|io.debezium.time.NanoTime
|Types.TIMESTAMP
|io.debezium.time.ZonedTime
|Types.TIME_WITH_TIMEZONE
|io.debezium.time.Timestamp
|Types.TIMESTAMP
|io.debezium.time.MicroTimestamp
|Types.TIMESTAMP
|io.debezium.time.NanoTimestamp
|Types.TIMESTAMP
|io.debezium.time.ZonedTimestamp
|Types.TIMESTAMP_WITH_TIMEZONE
|io.debezium.data.VariableScaleDecimal
|Types.DOUBLE
|===
[IMPORTANT]
====
If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without timezones.
====
[[jdbc-debezium-logical-mappings-dialect-specific]]
.Mappings between {prodname} dialect-specific Logical Types and Column Data Types
|===
|Logical Type |MySQL SQL Type |PostgreSQL SQL Type |SQL Server SQL Type
|io.debezium.data.Bits
|`bit(n)`
|`bit(n)` or `bit varying`
|`varbinary(n)`
|io.debezium.data.Enum
|`enum`
|Types.VARCHAR
|n/a
|io.debezium.data.Json
|`json`
|`json`
|n/a
|io.debezium.data.EnumSet
|`set`
|n/a
|n/a
|io.debezium.time.Year
|`year(n)`
|n/a
|n/a
|io.debezium.time.MicroDuration
|n/a
|`interval`
|n/a
|io.debezium.data.Ltree
|n/a
|`ltree`
|n/a
|io.debezium.data.Uuid
|n/a
|`uuid`
|n/a
|io.debezium.data.Xml
|n/a
|`xml`
|`xml`
|===
In addition to the primitive and logical mappings above, if the source of the change events is a {prodname} source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation.
To enforce propagation, one of the following properties must be set in the source connector configuration:
* `column.propagate.source.type`
* `datatype.propagate.source.type`
The {prodname} JDBC sink connector applies the values with the higher precedence.
For example, let's say the following field schema is included in a change event:
.{prodname} change event field schema with column or data type propagation enabled
[source,json]
----
{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}
----
In the preceding example, if no schema parameters are set, the {prodname} JDBC sink connector maps this field to a column type of `Types.TINYINT`.
`Types.TINYINT` can have different logical database types, depending on the database dialect.
For MySQL, the column type in the example converts to a `TINYINT` column type with no specified length.
If column or data type propagation is enabled for the source connector, the {prodname} JDBC sink connector uses the mapping information to refine the data type mapping process and create a column with the type `TINYINT(1)`.
[NOTE]
====
Typically, the effect of using column or data type propagation is much greater when the same type of database is used for both the source and sink database.
ifdef::community[]
We are continually looking at ways to improve this mapping across heterogeneous databases and the current type system allows us to continue to refine these mappings based on feedback.
If you find a mapping could be improved, please let us know.
endif::community[]
====
// Type: assembly
// ModuleID: debezium-jdbc-connector-deployment
// Title: Deployment of {prodname} JDBC connectors
[[jdbc-deployment]]
== Deployment
To deploy a {prodname} JDBC connector, you install the {prodname} JDBC connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
.Prerequisites
* link:https://zookeeper.apache.org/[Apache ZooKeeper], link:http://kafka.apache.org/[Apache Kafka], and link:{link-kafka-docs}.html#connect[Kafka Connect] are installed.
* A destination database is installed and configured to accept JDBC connections.
.Procedure
. Download the {prodname} https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/{debezium-version}/debezium-connector-jdbc-{debezium-version}-plugin.tar.gz[JDBC connector plug-in archive].
. Extract the files into your Kafka Connect environment.
. Optionally download the JDBC driver from Maven Central and extract the downloaded driver file to the directory that contains the JDBC sink connector JAR file.
+
[NOTE]
====
Drivers for Oracle and Db2 are not included with the JDBC sink connector.
You must download the drivers and install them manually.
====
. Add the driver JAR files to the path where the JDBC sink connector has been installed.
. Make sure that the path where you install the JDBC sink connector is part of the {link-kafka-docs}/#connectconfigs[Kafka Connect `plugin.path`].
. Restart the Kafka Connect process to pick up the new JAR files.
// ModuleID: debezium-jdbc-connector-configuration
// Type: reference
[[jdbc-connector-configuration]]
=== {prodname} JDBC connector configuration
Typically, you register a {prodname} JDBC connector by submitting a JSON request that specifies the configuration properties for the connector.
The following example shows a JSON request for registering an instance of the {prodname} JDBC sink connector that consumes events from a topic called `orders` with the most common configuration settings:
.Example: {prodname} JDBC connector configuration
[source,json,indent=0,subs="+quotes"]
----
{
    "name": "jdbc-connector",  // <1>
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  // <2>
        "tasks.max": "1",  // <3>
        "connection.url": "jdbc:postgresql://localhost/db",  // <4>
        "connection.username": "pguser",  // <5>
        "connection.password": "pgpassword",  // <6>
        "insert.mode": "upsert",  // <7>
        "delete.enabled": "true",  // <8>
        "primary.key.mode": "record_key",  // <9>
        "schema.evolution": "basic",  // <10>
        "database.time_zone": "UTC",  // <11>
        "topics": "orders" // <12>
    }
}
----
.Descriptions of JDBC connector configuration settings
[cols="1,7",options="header",subs="+attributes"]
|===
|Item |Description
|1
|The name that is assigned to the connector when you register it with the Kafka Connect service.
|2
|The name of the JDBC sink connector class.
|3
|The maximum number of tasks to create for this connector.
|4
|The JDBC URL that the connector uses to connect to the sink database that it writes to.
|5
|The name of the database user that is used for authentication.
|6
|The password of the database user used for authentication.
|7
|The xref:jdbc-property-insert-mode[insert.mode] that the connector uses.
|8
|Enables the deletion of records in the database.
For more information, see the xref:jdbc-property-delete-enabled[delete.enabled] configuration property.
|9
|Specifies the method used to resolve primary key columns.
For more information, see the xref:jdbc-property-primary-key-mode[primary.key.mode] configuration property.
|10
|Enables the connector to evolve the destination database's schema.
For more information, see the xref:jdbc-property-schema-evolution[schema.evolution] configuration property.
|11
|Specifies the timezone used when writing temporal field types.
|12
|List of topics to consume, separated by commas.
|===
For a complete list of configuration properties that you can set for the {prodname} JDBC connector, see xref:jdbc-connector-properties[JDBC connector properties].
You can send this configuration with a `POST` command to a running Kafka Connect service.
The service records the configuration and starts one or more sink connector tasks that perform the following operations:
* Connects to the database.
* Consumes events from subscribed Kafka topics.
* Writes the events to the configured database.
// Type: reference
// Title: Descriptions of {prodname} JDBC connector configuration properties
// ModuleID: debezium-jdbc-connector-descriptions-of-connector-configuration-properties
[[jdbc-connector-properties]]
== Connector properties
The {prodname} JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs.
Many properties have default values.
Information about the properties is organized as follows:
* xref:jdbc-connector-properties-generic[JDBC connector generic properties]
* xref:jdbc-connector-properties-connection[JDBC connector connection properties]
* xref:jdbc-connector-properties-runtime[JDBC connector runtime properties]
* xref:jdbc-connector-properties-extendable[JDBC connector extendable properties]
[[jdbc-connector-properties-generic]]
.Generic properties
[cols="30%a,25%a,45%a"]
|===
|Property |Default |Description
|[[jdbc-property-connection-name]]<<jdbc-property-connection-name, `+name+`>>
|No default
|Unique name for the connector.
A failure results if you attempt to reuse this name when registering a connector.
This property is required by all Kafka Connect connectors.
|[[jdbc-property-connection-class]]<<jdbc-property-connection-class, `+connector.class+`>>
|No default
|The name of the Java class for the connector.
For the {prodname} JDBC connector, specify the value `io.debezium.connector.jdbc.JdbcSinkConnector`.
|[[jdbc-property-connection-task]]<<jdbc-property-connection-task, `+tasks.max+`>>
|1
|Maximum number of tasks to use for this connector.
|[[jdbc-property-connection-topics]]<<jdbc-property-connection-topics, `+topics+`>>
|No default
|List of topics to consume, separated by commas.
Do not use this property in combination with the xref:jdbc-property-connection-topics-regex[`topics.regex`] property.
|[[jdbc-property-connection-topics-regex]]<<jdbc-property-connection-topics-regex, `+topics.regex+`>>
|No default
|A regular expression that specifies the topics to consume.
Internally, the regular expression is compiled to a `java.util.regex.Pattern`.
Do not use this property in combination with the xref:jdbc-property-connection-topics[`topics`] property.
|===
[[jdbc-connector-properties-connection]]
.JDBC connector connection properties
[cols="30%a,25%a,45%a"]
|===
|Property |Default |Description
|[[jdbc-property-connection-provider]]<<jdbc-property-connection-provider, `+connection.provider+`>>
|`org.hibernate.c3p0.internal.C3P0ConnectionProvider`
|The connection provider implementation to use.
|[[jdbc-property-connection-url]]<<jdbc-property-connection-url, `+connection.url+`>>
|No default
|The JDBC connection URL used to connect to the database.
|[[jdbc-property-connection-username]]<<jdbc-property-connection-username, `+connection.username+`>>
|No default
|The name of the database user account that the connector uses to connect to the database.
|[[jdbc-property-connection-password]]<<jdbc-property-connection-password, `+connection.password+`>>
|No default
|The password that the connector uses to connect to the database.
|[[jdbc-property-connection-pool-min-size]]<<jdbc-property-connection-pool-min-size, `+connection.pool.min_size+`>>
|`5`
|Specifies the minimum number of connections in the pool.
|[[jdbc-property-connection-pool-max-size]]<<jdbc-property-connection-pool-max-size, `+connection.pool.max_size+`>>
|`32`
|Specifies the maximum number of concurrent connections that the pool maintains.
|[[jdbc-property-connection-pool-acquire-increment]]<<jdbc-property-connection-pool-acquire-increment, `+connection.pool.acquire_increment+`>>
|`32`
|Specifies the number of connections that the connector attempts to acquire at one time when the connection pool has no available connections.
|[[jdbc-property-connection-pool-timeout]]<<jdbc-property-connection-pool-timeout, `+connection.pool.timeout+`>>
|`1800`
|Specifies the number of seconds that an unused connection is kept before it is discarded.
|===
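
As a sketch, the connection properties in the preceding table might be combined as follows.
The URL and credentials repeat the placeholder values from the registration example earlier in this document.
The pool sizes and timeout are arbitrary illustrative values and not recommendations.

```json
{
  "connection.url": "jdbc:postgresql://localhost/db",
  "connection.username": "pguser",
  "connection.password": "pgpassword",
  "connection.pool.min_size": "2", // <1>
  "connection.pool.max_size": "10", // <1>
  "connection.pool.timeout": "600" // <1>
}
```
<1> Illustrative values only. Size the pool according to the number of tasks and the limits of your database.
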
[[jdbc-connector-properties-runtime]]
.JDBC connector runtime properties
[cols="30%a,25%a,45%a"]
|===
|Property |Default |Description
|[[jdbc-property-database-time-zone]]<<jdbc-property-database-time-zone, `+database.time_zone+`>>
|`UTC`
|Specifies the timezone used when inserting JDBC temporal values.
|[[jdbc-property-delete-enabled]]<<jdbc-property-delete-enabled, `+delete.enabled+`>>
|`false`
|Specifies whether the connector processes `DELETE` or _tombstone_ events and removes the corresponding row from the database.
Use of this option requires that you set the xref:jdbc-property-primary-key-mode[`primary.key.mode`] to `record_key`.
|[[jdbc-property-truncate-enabled]]<<jdbc-property-truncate-enabled, `+truncate.enabled+`>>
|`false`
|Specifies whether the connector processes `TRUNCATE` events and truncates the corresponding tables from the database.
[NOTE]
====
Although support for `TRUNCATE` statements has been available in Db2 since link:https://www.ibm.com/support/pages/apar/JR37942/[version 9.7],
currently, the JDBC connector is unable to process standard `TRUNCATE` events that the Db2 connector emits.
To ensure that the JDBC connector can process `TRUNCATE` events received from Db2, perform the truncation by using an alternative to the standard `TRUNCATE TABLE` statement.
For example:
`ALTER TABLE _<table_name>_ ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE`
The user account that submits the preceding query requires `ALTER` privileges on the table to be truncated.
====
|[[jdbc-property-insert-mode]]<<jdbc-property-insert-mode, `+insert.mode+`>>
|`insert`
|Specifies the strategy used to insert events into the database.
The following options are available:
`insert`:: Specifies that all events should construct `INSERT`-based SQL statements.
Use this option only when no primary key is used, or when you can be certain that no updates can occur to rows with existing primary key values.
`update`:: Specifies that all events should construct `UPDATE`-based SQL statements.
Use this option only when you can be certain that the connector receives only events that apply to existing rows.
`upsert`:: Specifies that the connector adds events to the table using `upsert` semantics.
That is, if the primary key does not exist, the connector performs an `INSERT` operation, and if the key does exist, the connector performs an `UPDATE` operation.
When idempotent writes are required, the connector should be configured to use this option.
|[[jdbc-property-primary-key-mode]]<<jdbc-property-primary-key-mode, `+primary.key.mode+`>>
|`none`
|Specifies how the connector resolves the primary key columns from the event.
`none`:: Specifies that no primary key columns are created.
`kafka`:: Specifies that the connector uses Kafka coordinates as the primary key columns.
The key coordinates are defined from the topic name, partition, and offset of the event, and are mapped to columns with the following names:
* `__connect_topic`
* `__connect_partition`
* `__connect_offset`
`record_key`:: Specifies that the primary key columns are sourced from the event's record key.
If the record key is a primitive type, the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property is required to specify the name of the primary key column.
If the record key is a struct type, the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property is optional, and can be used to specify a subset of columns from the event's key as the table's primary key.
`record_value`:: Specifies that the primary key columns are sourced from the event's value.
You can set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to define the primary key as a subset of fields from the event's value; otherwise all fields are used by default.
|[[jdbc-property-primary-key-fields]]<<jdbc-property-primary-key-fields, `+primary.key.fields+`>>
|No default
|Either the name of the primary key column or a comma-separated list of fields to derive the primary key from. +
+
When xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` and the event's key is a primitive type, it is expected that this property specifies the column name to be used for the key. +
+
When the xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` with a non-primitive key, or `record_value`, it is expected that this property specifies a comma-separated list of field names from either the key or value.
If the xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` with a non-primitive key, or `record_value`, and this property is not specified, the connector derives the primary key from all fields of either the record key or record value, depending on the specified mode.
|[[jdbc-property-quote-identifiers]]<<jdbc-property-quote-identifiers, `+quote.identifiers+`>>
|`false`
|Specifies whether generated SQL statements use quotation marks to delimit table and column names.
See the xref:jdbc-quoting-case-sensitivity[] section for more details.
|[[jdbc-property-schema-evolution]]<<jdbc-property-schema-evolution, `+schema.evolution+`>>
|`none`
|Specifies how the connector evolves the destination table schemas.
For more information, see xref:jdbc-schema-evolution[].
The following options are available:
`none`:: Specifies that the connector does not evolve the destination schema.
`basic`:: Specifies that basic evolution occurs.
The connector adds missing columns to the table by comparing the incoming event's record schema to the database table structure.
|[[jdbc-property-table-name-format]]<<jdbc-property-table-name-format, `+table.name.format+`>>
|`+${topic}+`
|Specifies a string that determines how the destination table name is formatted, based on the topic name of the event.
The placeholder, `+${topic}+`, is replaced by the topic name.
|[[jdbc-property-dialect-postgres-postgis-schema]]<<jdbc-property-dialect-postgres-postgis-schema, `+dialect.postgres.postgis.schema+`>>
|`public`
|Specifies the schema name where the PostgreSQL PostGIS extension is installed.
The default is `public`; however, if the PostGIS extension was installed in another schema, this property should be used to specify the alternate schema name.
|[[jdbc-property-dialect-sqlserver-identity-insert]]<<jdbc-property-dialect-sqlserver-identity-insert, `+dialect.sqlserver.identity.insert+`>>
|`false`
|Specifies whether the connector automatically sets an `IDENTITY_INSERT` before an `INSERT` or `UPSERT` operation into the identity column of SQL Server tables, and then unsets it immediately after the operation.
When the default setting (`false`) is in effect, an `INSERT` or `UPSERT` operation into the `IDENTITY` column of a table results in a SQL exception.
|[[jdbc-property-batch-size]]<<jdbc-property-batch-size, `+batch.size+`>>
|`500`
|Specifies how many records to attempt to batch together into the destination table.
[NOTE]
====
If you set `_consumer.max.poll.records_` in the Connect worker properties to a value lower than `_batch.size_`, batch processing is capped by `_consumer.max.poll.records_`, and the desired `_batch.size_` is not reached.
You can also configure the `_max.poll.records_` setting of the connector's underlying consumer by using `_consumer.override.max.poll.records_` in the connector configuration.
====
|[[jdbc-property-field-include-list]]<<jdbc-property-field-include-list, `+field.include.list+`>>
|_empty string_
|An optional, comma-separated list of field names that match the fully-qualified names of fields to include from the change event value.
Fully-qualified names for fields are of the form `_fieldName_` or `_topicName_:_fieldName_`. +
+
If you include this property in the configuration, do not set the `field.exclude.list` property.
|[[jdbc-property-field-exclude-list]]<<jdbc-property-field-exclude-list, `+field.exclude.list+`>>
|_empty string_
|An optional, comma-separated list of field names that match the fully-qualified names of fields to exclude from the change event value.
Fully-qualified names for fields are of the form `_fieldName_` or `_topicName_:_fieldName_`. +
+
If you include this property in the configuration, do not set the `field.include.list` property.
|===
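
The following fragment is a sketch of how several of the runtime properties above might be combined for idempotent writes that also propagate deletes.
It assumes that the record key is a struct that contains a field named `id`; that field name is hypothetical, so substitute the key fields of your own events.
Because `delete.enabled` is set to `true`, `primary.key.mode` is set to `record_key`, as the table requires.
The `consumer.override.max.poll.records` entry is set to match `batch.size` so that the poll size does not cap the batch.

[source,json]
----
{
  "insert.mode": "upsert",
  "primary.key.mode": "record_key",
  "primary.key.fields": "id",
  "delete.enabled": "true",
  "schema.evolution": "basic",
  "batch.size": "500",
  "consumer.override.max.poll.records": "500"
}
----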
[[jdbc-connector-properties-extendable]]
.JDBC connector extendable properties
[cols="30%a,25%a,45%a"]
|===
|Property |Default |Description
|[[jdbc-property-column-naming-strategy]]<<jdbc-property-column-naming-strategy, `+column.naming.strategy+`>>
|`i.d.c.j.n.DefaultColumnNamingStrategy`
|Specifies the fully-qualified class name of a `ColumnNamingStrategy` implementation that the connector uses to resolve column names from event field names. +
+
By default, the connector uses the field name as the column name.
|[[jdbc-property-table-naming-strategy]]<<jdbc-property-table-naming-strategy, `+table.naming.strategy+`>>
|`i.d.c.j.n.DefaultTableNamingStrategy`
|Specifies the fully-qualified class name of a `TableNamingStrategy` implementation that the connector uses to resolve table names from incoming event topic names. +
+
The default behavior is to: +
* Replace the `+${topic}+` placeholder in the xref:jdbc-property-table-name-format[`table.name.format`] configuration property with the event's topic.
* Sanitize the table name by replacing dots (`.`) with underscores (`_`).
|===
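
As an illustration of the default table naming behavior described above, consider an event from a hypothetical topic named `server1.inventory.orders`.
With the following settings, the `+${topic}+` placeholder is replaced by the topic name and the dots are replaced by underscores, so the destination table should resolve to `server1_inventory_orders`.
To apply different rules, you could set `table.naming.strategy` to the fully-qualified name of your own `TableNamingStrategy` implementation; no such class is shown here, because its name and logic would depend on your requirements.

[source,json]
----
{
  "table.name.format": "${topic}",
  "table.naming.strategy": "io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy"
}
----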
// Type: concept
// ModuleID: debezium-jdbc-connector-frequently-asked-questions
// Title: JDBC connector frequently asked questions
[[jdbc-faq]]
== Frequently asked questions
*Is the* `ExtractNewRecordState` *single message transformation required?*::
No. Native support for {prodname} change events is one of the factors that differentiates the {prodname} JDBC connector from other implementations.
Although the connector can ingest flattened events, as other JDBC sink implementations do, it can also ingest {prodname}'s complex change event structure natively, without requiring any specific type of transformation.
*If a column's type is changed, or if a column is renamed or dropped, is this handled by schema evolution?*::
No, the {prodname} JDBC connector does not make any changes to existing columns.
The schema evolution supported by the connector is quite basic.
It simply compares the fields in the event structure to the table's column list, and then adds any fields that are not yet defined as columns in the table.
If a column's type or default value changes, the connector does not adjust it in the destination database.
If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however, existing rows with data in the old column remain unchanged.
These types of schema changes should be handled manually.
*If a column's type does not resolve to the type that I want, how can I enforce mapping to a different data type?*::
The {prodname} JDBC connector uses a sophisticated type system to resolve a column's data type.
For details about how this type system resolves a specific field's schema definition to a JDBC type, see the xref:jdbc-data-and-type-mappings[] section.
If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
*How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?*::
To add a prefix or a suffix to the destination table name, adjust the xref:jdbc-property-table-name-format[`table.name.format`] connector configuration property to apply the prefix or suffix that you want.
For example, to prefix all table names with `jdbc_`, specify the `table.name.format` configuration property with a value of `+jdbc_${topic}+`.
If the connector is subscribed to a topic called `orders`, the resulting table is created as `jdbc_orders`.
*Why are some columns automatically quoted, even though identifier quoting is not enabled?*::
In some situations, specific column or table names might be explicitly quoted, even when `quote.identifiers` is not enabled.
This is often necessary when the column or table name starts with or uses a specific convention that would otherwise be considered illegal syntax.
For example, when the xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `kafka`, some databases only permit column names to begin with an underscore if the column's name is quoted.
Quoting behavior is dialect-specific, and varies among database types.
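
The answers above about table name prefixes and identifier quoting can be sketched as the following configuration fragment.
It assumes a connector that subscribes to the `orders` topic, so the destination table would be `jdbc_orders`.
Setting `quote.identifiers` to `true` is optional; it is included only to show how to quote all table and column names explicitly, rather than only the names that a dialect quotes automatically.
The fragment contains no `transforms` entry, because the connector can consume {prodname} change events without the `ExtractNewRecordState` transformation.

[source,json]
----
{
  "topics": "orders",
  "table.name.format": "jdbc_${topic}",
  "quote.identifiers": "true"
}
----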