This connector is currently in an incubating state. In future versions, we might change the exact semantics, configuration options, and so forth, depending on the feedback that we receive.
The {prodname} JDBC connector is a Kafka Connect sink connector implementation that can consume events from multiple source topics, and then write those events to a relational database by using a JDBC driver.
This connector supports a wide variety of database dialects, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server.
The connector periodically polls the Kafka topics that it subscribes to, consumes events from those topics, and then writes the events to the configured relational database.
By default, {prodname} source connectors produce complex, hierarchical change events.
When {prodname} connectors are used with other JDBC sink connector implementations, you might need to apply the `ExtractNewRecordState` single message transformation (SMT) to flatten the payload of change events, so that they can be consumed by the sink implementation.
If you run the {prodname} JDBC sink connector, it's not necessary to deploy the SMT, because the {prodname} sink connector can consume native {prodname} change events directly, without the use of a transformation.
When the JDBC sink connector consumes a complex change event from a {prodname} source connector, it extracts the values from the `after` section of the original `insert` or `update` event.
The {prodname} JDBC sink connector is not designed to read from schema change topics.
If your source connector is configured to capture schema changes, in the JDBC connector configuration, set the xref:jdbc-property-connection-topics[`topics`] or xref:jdbc-property-connection-topics-regex[`topics.regex`] properties so that the connector does not consume from schema change topics.
To enable the {prodname} JDBC sink connector to correctly map the data type from an inbound message field to an outbound message field, the connector requires information about the data type of each field that is present in the source event.
The connector supports a wide range of column type mappings across different database dialects.
To correctly convert the destination column type from the `type` metadata in an event field, the connector applies the data type mappings that are defined for the source database.
You can enhance the way that the connector resolves data types for a column by setting the `column.propagate.source.type` or `datatype.propagate.source.type` options in the source connector configuration.
When you enable these options, {prodname} includes extra parameter metadata, which assists the JDBC sink connector in more accurately resolving the data type of destination columns.
For the {prodname} JDBC sink connector to process events from a Kafka topic, the Kafka topic message key, when present, must be a primitive data type or a `Struct`.
In addition, the payload of the source message must be a `Struct` that has either a flattened structure with no nested `struct` types, or a nested `struct` layout that conforms to {prodname}'s complex, hierarchical structure.
If the structure of the events in the Kafka topic do not adhere to these rules, you must implement a custom single message transformation to convert the structure of the source events into a usable format.
By default, the {prodname} JDBC sink connector does not transform any of the fields in the source event into the primary key for the event.
Unfortunately, the lack of a stable primary key can complicate event processing, depending on your business requirements, or when the sink connector uses upsert semantics.
To define a consistent primary key, you can configure the connector to use one of the primary key modes described in the following table:
If the primary key is a primitive type, specify the name of the column to be used by setting the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property.
If the primary key is a struct type, the fields in the struct are mapped as columns of the primary key.
You can use the `primary.key.fields` property to restrict the primary key to a subset of columns.
Because the value of a Kafka event is always a `Struct`, by default, all of the fields in the value become columns of the primary key.
To use a subset of fields in the primary key, set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.
|The primary key is composed of the Kafka event's headers. +
+
Kafka event's headers contains could contain multiple header that each one could be `Struct` or primitives data types,
the connectors makes a `Struct` of these headers. Hence, all fields in this `Struct` become columns of the primary key.
To use a subset of fields in the primary key, set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to specify a comma-separated list of fields in the value from which you want to derive the primary key columns.
Some database dialects might throw an exception if you set the `primary.key.mode` to `kafka` and set `schema.evolution` to `basic`.
This exception occurs when a dialect maps a `STRING` data type mapping to a variable length string data type such as `TEXT` or `CLOB`, and the dialect does not allow primary key columns to have unbounded lengths.
To avoid this problem, apply the following settings in your environment:
If you want to support removal of rows, explicitly enable it in the connector configuration by setting `delete.enabled=true` in the connector configuration.
However, to use this mode you must set xref:jdbc-property-primary-key-fields[`primary.key.fields`] to a value other than `none`.
The preceding configuration is necessary, because deletes are executed based on the primary key mapping, so if a destination table has no primary key mapping, the connector is unable to delete rows.
The {prodname} JDBC sink connector supports idempotent writes, allowing the same records to be replayed repeatedly and the final database state to remain consistent.
When `schema.evolution` is set to `basic`, the connector automatically creates or alters the destination database table according to the structure of the incoming event.
When an event is received from a topic for the first time, and the destination table does not yet exist, the {prodname} JDBC sink connector uses the event's key, or the schema structure of the record to resolve the column structure of the table.
If schema evolution is enabled, the connector prepares and executes a `CREATE TABLE` SQL statement before it applies the DML event to the destination table.
When the {prodname} JDBC connector receives an event from a topic, if the schema structure of the record differs from the schema structure of the destination table, the connector uses either the event's key or its schema structure to identify which columns are new, and must be added to the database table.
If schema evolution is enabled, the connector prepares and executes an `ALTER TABLE` SQL statement before it applies the DML event to the destination table.
Because changing column data types, dropping columns, and adjusting primary keys can be considered dangerous operations, the connector is prohibited from performing these operations.
The schema of each field determines whether a column is `NULL` or `NOT NULL`.
The schema also defines the default values for each column.
If the connector attempts to create a table with a nullability setting or a default value that don't want, you must either create the table manually, ahead of time, or adjust the schema of the associated field before the sink connector processes the event.
To adjust nullability settings or default values, you can introduce a custom single message transformation that applies changes in the pipeline, or modifies the column state defined in the source database.
When you introduce new fields to the event structure of tables that already exist in the destination database, you must define the new fields as optional, or the fields must have a default value specified in the database schema.
If you want a field to be removed from the destination table, use one of the following options:
The {prodname} JDBC sink connector consumes Kafka messages by constructing either DDL (schema changes) or DML (data changes) SQL statements that are executed on the destination database.
By default, the connector uses the names of the source topic and the event fields as the basis for the table and column names in the destination table.
The constructed SQL does not automatically delimit identifiers with quotes to preserve the case of the original strings.
As a result, by default, the text case of table or column names in the destination database depends entirely on how the database handles name strings when the case is not specified.
For example, if the destination database dialect is Oracle and the event's topic is `orders`, the destination table will be created as `ORDERS` because Oracle defaults to upper-case names when the name is not quoted.
Similarly, if the destination database dialect is PostgreSQL and the event's topic is `ORDERS`, the destination table will be created as `orders` because PostgreSQL defaults to lower-case names when the name is not quoted.
By setting `quote.identifiers` to `true` in the connector configuration, you can explicitly set the case of the table and field names to preserve the case that is present in the Kafka event.
So if the incoming event is for a topic called `orders` and the destination database dialect is Oracle, if quoting is enabled -- that is, if `quote.identifiers` is set to `true` -- the connector creates a table with the name `orders`, because the constructed SQL defines the name of the table as `"orders"`.
When quoting is enabled, the behavior for creating column names works in the same manner.
The {prodname} JDBC sink connector resolves a column's data type by using a logical or primitive type-mapping system.
Primitive types include values such as integers, floating points, Booleans, strings, and bytes.
Typically, these types are represented with a specific Kafka Connect `Schema` type code only.
Logical data types are more often complex types, including values such as `Struct`-based types that have a fixed set of field names and schema, or values that are represented with a specific encoding, such as number of days since epoch.
In fact, {prodname} source connectors generate change events that have fields with similar logical types to represent a variety of different data types, including but not limited to, timestamps, dates, and even JSON data.
The {prodname} JDBC sink connector uses these primitive and logical types to resolve a column's type to a JDBC SQL code, which represents a column's type.
These JDBC SQL codes are then used by the underlying Hibernate persistence framework to resolve the column's type to a logical data type for the dialect in use.
The following tables illustrate the primitive and logical mappings between Kafka Connect and JDBC SQL types, and between {prodname} and JDBC SQL types.
The actual final column type varies with for each database type.
In addition to the primitive and logical mappings above, if the source of the change events is a {prodname} source connector, the resolution of the column type, along with its length, precision, and scale, can be further influenced by enabling column or data type propagation.
To enforce propagation, one of the following properties must be set in the source connector configuration:
* `column.propagate.source.type`
* `datatype.propagate.source.type`
The {prodname} JDBC sink connector applies the values with the higher precedence.
In the preceding example, if no schema parameters are set, the {prodname} JDBC sink connector would map this field to a column type of `Types.SMALLINT`.
`Types.SMALLINT` can have different logical database types, depending on the database dialect.
For MySQL, the example would convert to a `TINYINT` column type with no specified length.
If column or data type propagation is enabled for the source connector, the {prodname} JDBC sink connector uses the mapping information to refine the data type mapping process and create a column with the type `TINYINT(1)`.
Typically, the effect of using column or data type propagation is much greater when the same type of database is used for both the source and sink database.
We are continually looking at ways to improve this mapping across heterogeneous databases and the current type system allows us to continue to refine these mappings based on feedback.
If you find a mapping could be improved, please let us know.
====
[[jdbc-deployment]]
== Deployment
To deploy a {prodname} JDBC connector, you install the {prodname} JDBC connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
.Prerequisites
* link:https://zookeeper.apache.org/[Apache ZooKeeper], link:http://kafka.apache.org/[Apache Kafka], and link:{link-kafka-docs}.html#connect[Kafka Connect] are installed.
. Extract the files into your Kafka Connect environment.
. Optionally download the JDBC driver from Maven Central and extract the downloaded driver file to the directory that contains the JDBC sink connector JAR file.
+
NOTE: Drivers for Oracle and Db2 are not automatically shipped with the JDBC sink connector and must be manually installed.
The following example shows a JSON request for registering an instance of the {prodname} JDBC sink connector that consumes events from a topic called `orders` with the most common configuration settings:
For a complete list of configuration properties that you can set for the {prodname} JDBC connector, see xref:jdbc-connector-properties[JDBC connector properties].
Use this option only when you can be certain that the connector receives only events that apply to existing rows.
`upsert`:: Specifies that the connector adds events to the table using `upsert` semantics.
That is, if the primary key does not exist, the connector performs an `INSERT` operation, and if the key does exist, the connector performs an `UPDATE` operation.
When idempotent writes are required, the connector should be configured to use this option.
`record_key`:: Specifies that the primary key columns are sourced from the event's record key.
If the record key is a primitive type, the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property is required to specify the name of the primary key column.
If the record key is a struct type, the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property is optional, and can be used to specify a subset of columns from the event's key as the table's primary key.
`record_value`:: Specifies that the primary key columns is sourced from the event's value.
You can set the xref:jdbc-property-primary-key-fields[`primary.key.fields`] property to define the primary key as a subset of fields from the event's value; otherwise all fields are used by default.
When xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` and the event's key is a primitive type, it is expected that this property specifies the column name to be used for the key. +
When the xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` with a non-primitive key, or `record_value`, it is expected that this property specifies a comma-separated list of field names from either the key or value.
If the xref:jdbc-property-primary-key-mode[`primary.key.mode`] is set to `record_key` with a non-primitive key, or `record_value`, and this property is not specifies, the connector derives the primary key from all fields of either the record key or record value, depending on the specified mode.
|Specifies the schema name where the PostgreSQL PostGIS extension is installed.
The default is `public`; however, if the PostGIS extension was installed in another schema, this property should be used to specify the alternate schema name.
|Specifies the fully-qualified class name of a `ColumnNamingStrategy` implementation that the connector uses to resolve column names from event field names. +
|Specifies the fully-qualified class name of a `TableNamingStrategy` implementation that the connector uses to resolve table names from incoming event topic names. +
No, that is actually one of the differentiating factors of the {prodname} JDBC connector from other implementations.
While the connector is capable of ingesting flattened events like its competitors, it can also ingest {prodname}'s complex change event structure natively, without requiring any specific type of transformation.
The schema evolution supported by the connector is quite basic.
It simply compares the fields in the event structure to the table's column list, and then adds any fields that are not yet defined as columns in the table.
If a column's type or default value change, the connector does not adjust them in the destination database.
If a column is renamed, the old column is left as-is, and the connector appends a column with the new name to the table; however existing rows with data in the old column remain unchanged.
For details about how this type system resolves a specific field's schema definition to a JDBC type, see the xref:jdbc-data-and-type-mappings[] section.
If you want to apply a different data type mapping, define the table manually to explicitly obtain the preferred column type.
*How do you specify a prefix or a suffix to the table name without changing the Kafka topic name?*::
In order to add a prefix or a suffix to the destination table name, adjust the xref:jdbc-property-table-name-format[table.name.format] connector configuration property to apply the prefix or suffix that you want.
For example, when the xref:jdbc-property-primary-key-mode[primary.key.mode] is set to `kafka`, some databases only permit column names to begin with an underscore if the column's name is quoted.
Quoting behavior is dialect-specific, and varies among different types of database.