[id="debezium-connector-for-jdbc"]
= {prodname} connector for JDBC
:context: JDBC
:mbean-name: {context}
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
[NOTE]
====
This connector is currently in an incubating state. In future versions, we might change the exact semantics, configuration options, and so forth, depending on the feedback that we receive.
Please let us know if you encounter any problems.
====
[[jdbc-overview]]
== Overview
The {prodname} JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver.
This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.
[[how-the-jdbc-connector-works]]
== How the JDBC connector works
The {prodname} JDBC connector is a Kafka Connect sink connector, and therefore requires the Kafka Connect runtime.
The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database.
The connector supports idempotent write operations by using upsert semantics and basic schema evolution.
The following features are supported:
* xref:jdbc-consume-complex-debezium-events[]
* xref:jdbc-at-least-once-delivery[]
* xref:jdbc-multiple-tasks[]
* xref:jdbc-data-and-type-mappings[]
* xref:jdbc-primary-key-handling[]
* xref:jdbc-delete-mode[]
* xref:jdbc-idempotent-writes[]
* xref:jdbc-schema-evolution[]
* xref:jdbc-quoting-case-sensitivity[]
[[jdbc-consume-complex-debezium-events]]
=== Consuming complex {prodname} change events
By default, {prodname} source connectors produce complex, hierarchical change events.
When {prodname} connectors are used with other JDBC sink connector implementations, you might need to apply the `ExtractNewRecordState` single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation.
If you run the {prodname} JDBC sink connector, it's not necessary to deploy the SMT, because the {prodname} sink connector can consume native {prodname} change events directly, without the use of a transformation.
When the JDBC sink connector consumes a complex change event from a {prodname} source connector, it extracts the values from the `after` section of the original `insert` or `update` event.
When a delete event is consumed by the sink connector, no part of the event's payload is consulted.
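For comparison, the following configuration sketch shows the kind of `ExtractNewRecordState` SMT settings that a generic JDBC sink connector typically requires to flatten {prodname} change events; the connector name and topic are hypothetical, and none of these `transforms` entries are needed when you use the {prodname} JDBC sink connector:
[source,json]
----
{
    "name": "generic-jdbc-sink",
    "config": {
        "topics": "orders",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}
----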
[[jdbc-at-least-once-delivery]]
=== At-least-once delivery
The {prodname} JDBC sink connector guarantees that events that it consumes from Kafka topics are processed at least once.
[[jdbc-multiple-tasks]]
=== Multiple tasks
You can run the {prodname} JDBC sink connector across multiple Kafka Connect tasks.
To run the connector across multiple tasks, set the `tasks.max` configuration property to the number of tasks that you want the connector to use.
The Kafka Connect runtime starts the specified number of tasks, and runs one instance of the connector per task.
Multiple tasks can improve performance by reading and processing changes from multiple source topics in parallel.
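For example, the following configuration fragment, which uses illustrative topic names and an illustrative task count, instructs the Kafka Connect runtime to run up to four tasks for the connector:
[source,json]
----
{
    "tasks.max": "4",
    "topics": "orders,customers,products"
}
----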
[[jdbc-data-and-type-mappings]]
=== Data and column type mappings
To enable the {prodname} JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event.
The connector supports a wide range of column type mappings across different database dialects.
To correctly convert the destination column type from the `type` metadata in an event field, the connector applies the data type mappings that are defined for the source database.
You can enhance the way that the connector resolves data types for a column by setting the `column.propagate.source.type` or `datatype.propagate.source.type` options in the source connector configuration.
When you enable these options, {prodname} includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.
For the {prodname} JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a `Struct`.
In addition, the payload of the source message must be a `Struct` that has either a flattened structure with no nested `struct` types, or a nested `struct` layout that conforms to {prodname}'s complex, hierarchical structure.
If the structure of the events in the Kafka topic does not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.
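As an illustration, the following sketch, which uses a hypothetical schema and field names, shows a flattened message value that satisfies these requirements, because its payload is a `Struct` with no nested `struct` types:
[source,json]
----
{
    "schema": {
        "type": "struct",
        "fields": [
            { "field": "id", "type": "int64" },
            { "field": "name", "type": "string", "optional": true }
        ]
    },
    "payload": {
        "id": 1,
        "name": "example"
    }
}
----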
[[jdbc-primary-key-handling]]
=== Primary key handling
By default, the {prodname} JDBC sink connector does not transform any of the fields in the source event into the primary key for the event.
Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics.
To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:
|===
|Mode |Description
|`none`
|No primary key fields are specified when creating the table.
|`kafka`
a|The primary key consists of the following three columns:
* `__connect_topic`
* `__connect_partition`
* `__connect_offset`
The values for these columns are sourced from the coordinates of the Kafka event.
|`record_key`
|The primary key is composed of the Kafka event's key. +
+
If the primary key is a primitive type, specify the name of the column to be used by setting the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property.
If the primary key is a struct type, the fields in the struct are mapped as columns of the primary key.
You can use the `primary.key.fields` property to restrict the primary key to a subset of columns.
|`record_value`
|The primary key is composed of the Kafka event's value. +
+
Because the value of a Kafka event is always a `Struct`, by default, all of the fields in the value become columns of the primary key.
To use a subset of fields in the primary key, set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.
|`record_header`
|The primary key is composed of the Kafka event's headers. +
+
The headers of a Kafka event can contain multiple headers, each of which can be a `Struct` or a primitive data type.
The connector combines these headers into a single `Struct`, and all of the fields in this `Struct` become columns of the primary key.
To use a subset of fields in the primary key, set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to specify a comma-separated list of header fields from which you want to derive the primary key columns.
|===
[IMPORTANT]
====
Some database dialects might throw an exception if you set the `primary.key.mode` to `kafka` and set `schema.evolution` to `basic`.
This exception occurs when a dialect maps the `STRING` data type to a variable-length string data type, such as `TEXT` or `CLOB`, and the dialect does not allow primary key columns to have unbounded lengths.
To avoid this problem, apply the following settings in your environment:
* Do not set `schema.evolution` to `basic`.
* Create the database table and primary key mappings in advance.
====
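For example, the following configuration fragment, which assumes a hypothetical `id` field in the event value, configures the connector to derive the primary key from that field by using the `record_value` mode:
[source,json]
----
{
    "primary.key.mode": "record_value",
    "primary.key.fields": "id"
}
----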
[[jdbc-delete-mode]]
=== Delete mode
The {prodname} JDBC sink connector can delete rows in the destination database when a `DELETE` or _tombstone_ event is consumed.
By default, the JDBC sink connector does not enable delete mode.
If you want to support the removal of rows, explicitly enable delete mode by setting `delete.enabled=true` in the connector configuration.
However, to use this mode, you must also set xref:jdbc-property-primary-key-mode[`primary.key.mode`] to a value other than `none`.
This configuration is necessary because deletes are executed based on the primary key mapping; if a destination table has no primary key mapping, the connector is unable to delete rows.
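The following fragment sketches a minimal combination of settings that enables delete mode; as noted above, a primary key mapping such as `record_key` is required:
[source,json]
----
{
    "delete.enabled": "true",
    "primary.key.mode": "record_key"
}
----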
[[jdbc-idempotent-writes]]
=== Idempotent writes
The {prodname} JDBC sink connector supports idempotent writes, allowing the same records to be replayed repeatedly and the final database state to remain consistent.
In order to support idempotent writes, the JDBC sink connector must be explicitly configured with the `insert.mode` set to `upsert`.
An `upsert` operation is applied as either an `update` or an `insert`, depending on whether the specified primary key already exists.
If the primary key value already exists, the operation updates values in the row.
If the specified primary key value doesn't exist, an `insert` adds a new row.
Each database dialect handles idempotent writes differently, because there is no SQL standard for _upsert_ operations.
The following table shows the database-specific DML syntax that each supported dialect uses for upsert operations:
|===
|Dialect |Upsert Syntax
|Db2
|`MERGE ...`
|MySQL
|`INSERT ... ON DUPLICATE KEY UPDATE ...`
|Oracle
|`MERGE ...`
|PostgreSQL
|`INSERT ... ON CONFLICT ... DO UPDATE SET ...`
|SQL Server
|`MERGE ...`
|===
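For example, to enable idempotent writes, you might combine `upsert` mode with a primary key mapping, as in the following configuration sketch:
[source,json]
----
{
    "insert.mode": "upsert",
    "primary.key.mode": "record_key"
}
----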
[[jdbc-schema-evolution]]
=== Schema evolution
The {prodname} JDBC sink connector supports the following schema evolution modes:
|===
|Mode |Description
|`none`
|The connector does not perform any DDL schema evolution.
|`basic`
|The connector automatically detects fields that are in the event payload but that do not exist in the destination table.
The connector alters the destination table to add the new fields.
|===
When `schema.evolution` is set to `basic`, the connector automatically creates or alters the destination database table according to the structure of the incoming event.
When an event is received from a topic for the first time, and the destination table does not yet exist, the {prodname} JDBC sink connector uses the event's key or the record's schema structure to resolve the column structure of the table.
If schema evolution is enabled, the connector prepares and executes a `CREATE TABLE` SQL statement before it applies the DML event to the destination table.
When the {prodname} JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event's key or its schema structure to identify which columns are new, and must be added to the database table.
If schema evolution is enabled, the connector prepares and executes an `ALTER TABLE` SQL statement before it applies the DML event to the destination table.
Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.
The schema of each field determines whether a column is `NULL` or `NOT NULL`.
The schema also defines the default values for each column.
If the connector attempts to create a table with a nullability setting or a default value that you don't want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event.
To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modify the column state that is defined in the source database.
A field's data type is resolved based on a predefined set of mappings.
For more information, see xref:jdbc-field-types[].
[IMPORTANT]
====
When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema.
If you want a field to be removed from the destination table, use one of the following options:
* Remove the field manually.
* Drop the column.
* Assign a default value to the field.
* Define the field as nullable.
====
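For example, the following configuration sketch enables basic schema evolution; in line with the earlier caution, it pairs `schema.evolution` with the `record_key` primary key mode rather than `kafka`:
[source,json]
----
{
    "schema.evolution": "basic",
    "primary.key.mode": "record_key"
}
----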
[[jdbc-quoting-case-sensitivity]]
=== Quoting and case sensitivity
The {prodname} JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database.
By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table.
The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings.
As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.
For example, if the destination database dialect is Oracle and the event's topic is `orders`, the destination table will be created as `ORDERS` because Oracle defaults to upper-case names when the name is not quoted.
Similarly, if the destination database dialect is PostgreSQL and the event's topic is `ORDERS`, the destination table will be created as `orders` because PostgreSQL defaults to lower-case names when the name is not quoted.
To preserve the case that is present in the Kafka event, set `quote.identifiers` to `true` in the connector configuration.
For example, if the incoming event is for a topic called `orders` and the destination database dialect is Oracle, with quoting enabled the connector creates a table with the name `orders`, because the constructed SQL defines the name of the table as `"orders"`.
When quoting is enabled, column names are created in the same manner.
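For example, the following configuration fragment enables identifier quoting, so that an event from a topic named `orders` results in a table named `orders`, rather than `ORDERS`, on Oracle:
[source,json]
----
{
    "quote.identifiers": "true"
}
----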
[[jdbc-field-types]]
== Data type mappings
The {prodname} JDBC sink connector resolves a column's data type by using a logical or primitive type-mapping system.
Primitive types include values such as integers, floating points, Booleans, strings, and bytes.
Typically, these types are represented with a specific Kafka Connect `Schema` type code only.
Logical data types are more often complex types, including values such as `Struct`-based types that have a fixed set of field names and schema, or values that are represented with a specific encoding, such as the number of days since the epoch.
The following examples show representative structures of primitive and logical data types:
.Primitive field schema
[source,json]
----
{
    "schema": {
        "type": "INT64"
    }
}
----
.Logical field schema
[source,json]
----
{
    "schema": {
        "type": "INT64",
        "name": "org.apache.kafka.connect.data.Date"
    }
}
----
Kafka Connect is not the only source for these complex, logical types.
In fact, {prodname} source connectors generate change events that have fields with similar logical types to represent a variety of data types, including, but not limited to, timestamps, dates, and even JSON data.
The {prodname} JDBC sink connector uses these primitive and logical types to resolve a column's type to a JDBC SQL code, which represents a column's type.
These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column's type to a logical data type for the dialect in use.
The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between {prodname} and JDBC SQL types.
The actual final column type varies for each database type.
. xref:jdbc-kafka-connect-primitive-mappings[]
. xref:jdbc-kafka-connect-logical-mappings[]
. xref:jdbc-debezium-logical-mappings[]
. xref:jdbc-debezium-logical-mappings-dialect-specific[]
[[jdbc-kafka-connect-primitive-mappings]]
.Mappings between Kafka Connect Primitives and Column Data Types
|===
|Primitive Type |JDBC SQL Type
|INT8
|Types.TINYINT
|INT16
|Types.SMALLINT
|INT32
|Types.INTEGER
|INT64
|Types.BIGINT
|FLOAT32
|Types.FLOAT
|FLOAT64
|Types.DOUBLE
|BOOLEAN
|Types.BOOLEAN
|STRING
|Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR
|BYTES
|Types.VARBINARY
|===
[[jdbc-kafka-connect-logical-mappings]]
.Mappings between Kafka Connect Logical Types and Column Data Types
|===
|Logical Type |JDBC SQL Type
|org.apache.kafka.connect.data.Decimal
|Types.DECIMAL
|org.apache.kafka.connect.data.Date
|Types.DATE
|org.apache.kafka.connect.data.Time
|Types.TIMESTAMP
|org.apache.kafka.connect.data.Timestamp
|Types.TIMESTAMP
|===
[[jdbc-debezium-logical-mappings]]
.Mappings between {prodname} Logical Types and Column Data Types
|===
|Logical Type |JDBC SQL Type
|io.debezium.time.Date
|Types.DATE
|io.debezium.time.Time
|Types.TIMESTAMP
|io.debezium.time.MicroTime
|Types.TIMESTAMP
|io.debezium.time.NanoTime
|Types.TIMESTAMP
|io.debezium.time.ZonedTime
|Types.TIME_WITH_TIMEZONE
|io.debezium.time.Timestamp
|Types.TIMESTAMP
|io.debezium.time.MicroTimestamp
|Types.TIMESTAMP
|io.debezium.time.NanoTimestamp
|Types.TIMESTAMP
|io.debezium.time.ZonedTimestamp
|Types.TIMESTAMP_WITH_TIMEZONE
|io.debezium.data.VariableScaleDecimal
|Types.DOUBLE
|===
[IMPORTANT]
====
If the database does not support time or timestamps with time zones, the mapping resolves to its equivalent without time zones.
====
[[jdbc-debezium-logical-mappings-dialect-specific]]
.Mappings between {prodname} dialect-specific Logical Types and Column Data Types
|===
|Logical Type |MySQL SQL Type |PostgreSQL SQL Type |SQL Server SQL Type
|io.debezium.data.Bits
|`bit(n)`
|`bit(n)` or `bit varying`
|`varbinary(n)`
|io.debezium.data.Enum
|`enum`
|Types.VARCHAR
|n/a
|io.debezium.data.Json
|`json`
|`json`
|n/a
|io.debezium.data.EnumSet
|`set`
|n/a
|n/a
|io.debezium.time.Year
|`year(n)`
|n/a
|n/a
|io.debezium.time.MicroDuration
|n/a
|`interval`
|n/a
|io.debezium.data.Ltree
|n/a
|`ltree`
|n/a
|io.debezium.data.Uuid
|n/a
|`uuid`
|n/a
|io.debezium.data.Xml
|n/a
|`xml`
|`xml`
|===
In addition to the primitive and logical mappings above, if the source of the change events is a {prodname} source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation.
To enable propagation, set one of the following properties in the source connector configuration:
* `column.propagate.source.type`
* `datatype.propagate.source.type`
The {prodname} JDBC sink connector applies the values with the higher precedence.
For example, let's say the following field schema is included in a change event:
.{prodname} change event field schema with column or data type propagation enabled
[source,json]
----
{
    "schema": {
        "type": "INT8",
        "parameters": {
            "__debezium.source.column.type": "TINYINT",
            "__debezium.source.column.length": "1"
        }
    }
}
----
In the preceding example, if no schema parameters are set, the {prodname} JDBC sink connector would map this field to a column type of `Types.SMALLINT`.
`Types.SMALLINT` can have different logical database types, depending on the database dialect.
For MySQL, the example would convert to a `TINYINT` column type with no specified length.
If column or data type propagation is enabled for the source connector, the {prodname} JDBC sink connector uses the mapping information to refine the data type mapping process and create a column with the type `TINYINT(1)`.
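The parameters in the preceding schema are produced by the source connector when propagation is enabled. For example, a source connector configuration fragment that enables column type propagation might look like the following sketch; the `inventory.products.flag` column name is hypothetical, and the property value is interpreted as a regular expression that matches fully-qualified column names:
[source,json]
----
{
    "column.propagate.source.type": "inventory.products.flag"
}
----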
[NOTE]
====
Typically, the effect of using column or data type propagation is much greater when the same type of database is used for both the source and sink database.
We are continually looking for ways to improve these mappings across heterogeneous databases, and the current type system allows us to keep refining the mappings based on feedback.
If you find that a mapping could be improved, please let us know.
====
[[jdbc-deployment]]
== Deployment
To deploy a {prodname} JDBC connector, you install the {prodname} JDBC connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
.Prerequisites
* link:https://zookeeper.apache.org/[Apache ZooKeeper], link:http://kafka.apache.org/[Apache Kafka], and link:{link-kafka-docs}.html#connect[Kafka Connect] are installed.
* The destination database is installed and configured to accept JDBC connections.
.Procedure
. Download the {prodname} https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/{debezium-version}/debezium-connector-jdbc-{debezium-version}-plugin.tar.gz[JDBC connector plug-in archive].
. Extract the files into your Kafka Connect environment.
. Optionally download the JDBC driver from Maven Central and extract the downloaded driver file to the directory that contains the JDBC sink connector JAR file.
+
NOTE: Drivers for Oracle and Db2 are not automatically shipped with the JDBC sink connector and must be manually installed.
. Add the driver JAR files to the path where the JDBC sink connector has been installed.
. Make sure that the path where the JDBC sink connector is installed is part of {link-kafka-docs}/#connectconfigs[Kafka Connect's `plugin.path`].
. Restart your Kafka Connect process to pick up the new JAR files.
[[jdbc-connector-configuration]]
=== {prodname} JDBC connector configuration
Typically, you register a {prodname} JDBC connector by submitting a JSON request that specifies the configuration properties for the connector.
The following example shows a JSON request for registering an instance of the {prodname} JDBC sink connector that consumes events from a topic called `orders` with the most common configuration settings:
.Example: {prodname} JDBC connector configuration
[source,json,indent=0,subs="+quotes"]
----
{
    "name": "jdbc-connector",  // <1>
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  // <2>
        "tasks.max": "1",  // <3>
        "connection.url": "jdbc:postgresql://localhost/db",  // <4>
        "connection.username": "pguser",  // <5>
        "connection.password": "pgpassword",  // <6>
        "insert.mode": "upsert",  // <7>
        "delete.enabled": "true",  // <8>
        "primary.key.mode": "record_key",  // <9>
        "schema.evolution": "basic",  // <10>
        "database.time_zone": "UTC"  // <11>
    }
}
----
<1> The name that is assigned to the connector when you register it with the Kafka Connect service.
<2> The name of the JDBC sink connector class.
<3> The maximum number of tasks to create for this connector.
<4> The JDBC URL that the connector uses to connect to the sink database that it writes to.
<5> The name of the database user used for authentication.
<6> The password of the database user used for authentication.
<7> The xref:jdbc-property-insert-mode[insert.mode] that the connector uses.
<8> Enables the deletion of records in the database.
For more information, see the xref:jdbc-property-delete-enabled[delete.enabled] configuration property.
<9> Specifies the method used to resolve primary key columns.
For more information, see the xref:jdbc-property-primary-key-mode[primary.key.mode] configuration property.
<10> Enables the connector to evolve the destination database's schema.
For more information, see the xref:jdbc-property-schema-evolution[schema.evolution] configuration property.
<11> Specifies the timezone used when writing temporal field types.
For a complete list of configuration properties that you can set for the {prodname} JDBC connector, see xref:jdbc-connector-properties[JDBC connector properties].
You can send this configuration with a `POST` command to a running Kafka Connect service.
The service records the configuration and starts the sink connector tasks, which perform the following operations:
* Connects to the database.
* Consumes events from subscribed Kafka topics.
* Writes the events to the configured database.
[[jdbc-connector-properties]]
== Connector properties
The {prodname} JDBC sink connector has several configuration properties that you can use to achieve the connector behavior that meets your needs.
Many properties have default values.
Information about the properties is organized as follows:
* xref:jdbc-connector-properties-connection[]
* xref:jdbc-connector-properties-runtime[]
* xref:jdbc-connector-properties-extendable[]
[[jdbc-connector-properties-connection]]
.Connection properties
[cols="30%a,25%a,45%a"]
|===
|Property |Default |Description
|[[jdbc-property-connection-url]]<<jdbc-property-connection-url, `+connection.url+`>>
|No default
|The JDBC connection URL used to connect to the database.
|[[jdbc-property-connection-username]]<<jdbc-property-connection-username, `+connection.username+`>>
|No default
|The name of the database user account that the connector uses to connect to the database.
|[[jdbc-property-connection-password]]<<jdbc-property-connection-password, `+connection.password+`>>
|No default
|The password that the connector uses to connect to the database.
|[[jdbc-property-connection-pool-min-size]]<<jdbc-property-connection-pool-min-size, `+connection.pool.min_size+`>>
|`5`
|Specifies the minimum number of connections in the pool.
|[[jdbc-property-connection-pool-max-size]]<<jdbc-property-connection-pool-max-size, `+connection.pool.max_size+`>>
|`32`
|Specifies the maximum number of concurrent connections that the pool maintains.
|[[jdbc-property-connection-pool-acquire-increment]]<<jdbc-property-connection-pool-acquire-increment, `+connection.pool.acquire_increment+`>>
|`32`
|Specifies the number of connections that the connector attempts to acquire if the connection pool exceeds its maximum size.
|[[jdbc-property-connection-pool-timeout]]<<jdbc-property-connection-pool-timeout, `+connection.pool.timeout+`>>
|`1800`
|Specifies the number of seconds that an unused connection is kept before it is discarded.
|===
[[jdbc-connector-properties-runtime]]
.Runtime properties
[cols="30%a,25%a,45%a"]
|===
|Property |Default |Description
|[[jdbc-property-database-time-zone]]<<jdbc-property-database-time-zone, `+database.time_zone+`>>
|`UTC`
|Specifies the timezone used when inserting JDBC temporal values.
|[[jdbc-property-delete-enabled]]<<jdbc-property-delete-enabled, `+delete.enabled+`>>
|`false`
|Specifies whether the connector processes `DELETE` or _tombstone_ events and removes the corresponding row from the database.
Use of this option requires that you set the xref:jdbc-property-primary-key-mode[`primary.key.mode`] to `record_key`.
|[[jdbc-property-insert-mode]]<<jdbc-property-insert-mode, `+insert.mode+`>>
|`insert`
|Specifies the strategy used to insert events into the database.
The following options are available:
`insert`:: Specifies that all events should construct `INSERT`-based SQL statements.
Use this option only when no primary key is used, or when you can be certain that no updates can occur to rows with existing primary key values.
`update`:: Specifies that all events should construct `UPDATE`-based SQL statements.
Use this option only when you can be certain that the connector receives only events that apply to existing rows.
`upsert`:: Specifies that the connector adds events to the table using `upsert` semantics.
That is, if the primary key does not exist, the connector performs an `INSERT` operation, and if the key does exist, the connector performs an `UPDATE` operation.
When idempotent writes are required, the connector should be configured to use this option.
|[[jdbc-property-primary-key-mode]]<<jdbc-property-primary-key-mode, `+primary.key.mode+`>>
|`none`
|Specifies how the connector resolves the primary key columns from the event.
`none`:: Specifies that no primary key columns are created.
`kafka`:: Specifies that the connector uses Kafka coordinates as the primary key columns.
The key coordinates are defined from the topic name, partition, and offset of the event, and are mapped to columns with the following names:
* `__connect_topic`
* `__connect_partition`
* `__connect_offset`
`record_key`:: Specifies that the primary key columns are sourced from the event's record key.
If the record key is a primitive type, the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property is required to specify the name of the primary key column.
If the record key is a struct type, the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property is optional, and can be used to specify a subset of columns from the event's key as the table's primary key.
`record_value`:: Specifies that the primary key columns are sourced from the event's value.
You can set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to define the primary key as a subset of fields from the event's value; otherwise all fields are used by default.
|[[jdbc-property-primary-key-fields]]<<jdbc-property-primary-key-fields, `+primary.key.fields+`>>
|No default
|Either the name of the primary key column or a comma-separated list of fields to derive the primary key from. +
+
When xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` and the event's key is a primitive type, it is expected that this property specifies the column name to be used for the key. +
+
When the xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` with a non-primitive key, or `record_value`, it is expected that this property specifies a comma-separated list of field names from either the key or value.
If the xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` with a non-primitive key, or to `record_value`, and this property is not specified, the connector derives the primary key from all fields of either the record key or the record value, depending on the specified mode.
|[[jdbc-property-quote-identifiers]]<<jdbc-property-quote-identifiers, `+quote.identifiers+`>>
|`false`
|Specifies whether generated SQL statements use quotation marks to delimit table and column names.
See the xref:jdbc-quoting-case-sensitivity[] section for more details.
|[[jdbc-property-schema-evolution]]<<jdbc-property-schema-evolution, `+schema.evolution+`>>
|`none`
|Specifies how the connector evolves the destination table schemas.
For more information, see xref:jdbc-schema-evolution[].
The following options are available:
`none`:: Specifies that the connector does not evolve the destination schema.
`basic`:: Specifies that basic evolution occurs.
The connector adds missing columns to the table by comparing the incoming event's record schema to the database table structure.
|[[jdbc-property-table-name-format]]<<jdbc-property-table-name-format, `+table.name.format+`>>
|`${topic}`
|Specifies a string that determines how the destination table name is formatted, based on the topic name of the event.
The placeholder, `${topic}`, is replaced by the topic name.
|===
[[jdbc-connector-properties-extendable]]
.Extendable properties
[cols="30%a,25%a,45%a"]
|===
|Property |Default |Description
|[[jdbc-property-column-naming-strategy]]<<jdbc-property-column-naming-strategy, `+column.naming.strategy+`>>
|`i.d.c.j.n.DefaultColumnNamingStrategy`
|Specifies the fully-qualified class name of a `ColumnNamingStrategy` implementation that the connector uses to resolve column names from event field names. +
+
By default, the connector uses the field name as the column name.
|[[jdbc-property-table-naming-strategy]]<<jdbc-property-table-naming-strategy, `+table.naming.strategy+`>>
|`i.d.c.j.n.DefaultTableNamingStrategy`
|Specifies the fully-qualified class name of a `TableNamingStrategy` implementation that the connector uses to resolve table names from incoming event topic names. +
+
The default behavior is to: +
* Replace the `${topic}` placeholder in the xref:jdbc-property-table-name-format[`table.name.format`] configuration property with the event's topic.
* Sanitize the table name by replacing dots (`.`) with underscores (`_`).
|===
[[jdbc-faq]]
== Frequently asked questions
*Is the* `ExtractNewRecordState` *single message transformation required?*::
No, that is actually one of the differentiating factors of the {prodname} JDBC connector from other implementations.
While the connector is capable of ingesting flattened events like its competitors, it can also ingest {prodname}'s complex change event structure natively, without requiring any specific type of transformation.
*If a column's type is changed, or if a column is renamed or dropped, is this handled by schema evolution?*::
2023-04-05 03:05:08 +02:00
No, the {prodname} JDBC connector does not make any changes to existing columns.
The schema evolution supported by the connector is quite basic.
It simply compares the fields in the event structure to the table's column list, and then adds any fields that are not yet defined as columns in the table.
If a column's type or default value changes, the connector does not adjust them in the destination database.
If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however, existing rows that contain data in the old column remain unchanged.
These types of schema changes should be handled manually.
*If a column's type does not resolve to the type that I want, how can I enforce mapping to a different data type?*::
The {prodname} JDBC connector uses a sophisticated type system to resolve a column's data type.
For details about how this type system resolves a specific field's schema definition to a JDBC type, see the xref:jdbc-data-and-type-mappings[] section.
If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
*How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?*::
In order to add a prefix or a suffix to the destination table name, adjust the xref:jdbc-property-table-name-format[table.name.format] connector configuration property to apply the prefix or suffix that you want.
For example, to prefix all table names with `jdbc_`, specify the `table.name.format` configuration property with a value of `jdbc_${topic}`.
If the connector is subscribed to a topic called `orders`, the resulting table is created as `jdbc_orders`.
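For example, a configuration fragment such as the following sketch produces destination tables that are prefixed with `jdbc_`:
[source,json]
----
{
    "table.name.format": "jdbc_${topic}"
}
----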
*Why are some columns automatically quoted, even though identifier quoting is not enabled?*::
In some situations, specific column or table names might be explicitly quoted, even when `quote.identifiers` is not enabled.
Quoting is often necessary when a column or table name begins with a character, or follows a naming convention, that the database would otherwise treat as illegal syntax.
For example, when the xref:jdbc-property-primary-key-mode[primary.key.mode] is set to `kafka`, some databases only permit column names to begin with an underscore if the column's name is quoted.
Quoting behavior is dialect-specific, and varies among different types of database.