diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java index 0f8ccb18c..308214c97 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java @@ -61,6 +61,7 @@ public class JdbcSinkConnectorConfig { public static final String COLUMN_NAMING_STRATEGY = "column.naming.strategy"; public static final String DATABASE_TIME_ZONE = "database.time_zone"; public static final String POSTGRES_POSTGIS_SCHEMA = "dialect.postgres.postgis.schema"; + public static final String SQLSERVER_IDENTITY_TABLE_NAMES = "dialect.sqlserver.identity.tables.names"; // todo add support for the ValueConverter contract @@ -241,6 +242,15 @@ public class JdbcSinkConnectorConfig { .withDefault("public") .withDescription("Name of the schema where postgis extension is installed. Default is public"); + public static final Field SQLSERVER_IDENTITY_TABLE_NAMES_FIELD = Field.create(SQLSERVER_IDENTITY_TABLE_NAMES) + .withDisplayName("Comma-separated list of indentity field including table's name where sqlserver takes care for identity inserts") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 4)) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.MEDIUM) + .withDefault("") + .withDescription("A comma-separated list of table names which has identity field."); + protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor() .connector( CONNECTION_URL_FIELD, @@ -261,7 +271,8 @@ public class JdbcSinkConnectorConfig { TABLE_NAMING_STRATEGY_FIELD, COLUMN_NAMING_STRATEGY_FIELD, DATABASE_TIME_ZONE_FIELD, - POSTGRES_POSTGIS_SCHEMA_FIELD) + POSTGRES_POSTGIS_SCHEMA_FIELD, + SQLSERVER_IDENTITY_TABLE_NAMES_FIELD) .create(); /** @@ -432,6 +443,8 @@ public String getValue() { private final String databaseTimezone; private final String postgresPostgisSchema; + private final Set sqlServerIdentityTableNames; + public JdbcSinkConnectorConfig(Map props) { config = Configuration.from(props); this.insertMode = InsertMode.parse(config.getString(INSERT_MODE)); @@ -446,6 +459,7 @@ public JdbcSinkConnectorConfig(Map props) { this.columnNamingStrategy = config.getInstance(COLUMN_NAMING_STRATEGY_FIELD, ColumnNamingStrategy.class); this.databaseTimezone = config.getString(DATABASE_TIME_ZONE_FIELD); this.postgresPostgisSchema = config.getString(POSTGRES_POSTGIS_SCHEMA_FIELD); + this.sqlServerIdentityTableNames = Strings.setOf(config.getString(SQLSERVER_IDENTITY_TABLE_NAMES_FIELD), String::new); } public void validate() { @@ -497,6 +511,10 @@ public boolean isQuoteIdentifiers() { return quoteIdentifiers; } + public Set getSqlServerIdentityTableNames() { + return sqlServerIdentityTableNames; + } + // public Set getDataTypeMapping() { // return dataTypeMapping; // } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index f0dbfa309..567628c32 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -318,7 +318,7 @@ public String getInsertStatement(TableDescriptor table, SinkRecordDescriptor rec builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, table, record)); - builder.append(")"); + builder.append(");"); return builder.build(); } diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java index 80a0dfb42..fc15b6c6d 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java @@ -47,6 +47,27 @@ private SqlServerDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory super(config, sessionFactory); } + @Override + public String getInsertStatement(TableDescriptor table, SinkRecordDescriptor record) { + String insertStatement = super.getInsertStatement(table, record); + return wrapWithIdentityInsert(table, insertStatement); + } + + private String wrapWithIdentityInsert(TableDescriptor table, String sqlStatement) { + boolean contains = getConfig().getSqlServerIdentityTableNames().contains(table.getId().getTableName()); + if (!contains) { + return sqlStatement; + } + + String qualifiedTableName = getQualifiedTableName(table.getId()); + return new StringBuilder() + .append("SET IDENTITY_INSERT ").append(qualifiedTableName).append(" ON ;") + .append(sqlStatement) + .append("SET IDENTITY_INSERT ").append(qualifiedTableName).append(" OFF ;") + .toString(); + + } + @Override protected Optional getDatabaseTimeZoneQuery() { return Optional.of("SELECT CURRENT_TIMEZONE()"); @@ -111,7 +132,7 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec builder.append(")"); builder.append(";"); // SQL server requires this to be terminated this way. - return builder.build(); + return wrapWithIdentityInsert(table, builder.build()); } @Override diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfigTest.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfigTest.java index 6ae9d9b5f..0e686f322 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfigTest.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfigTest.java @@ -103,6 +103,16 @@ public void testNonDefaultPrimaryKeyFieldsProperty() { assertThat(config.getPrimaryKeyFields()).contains("id", "name"); } + @Test + public void testNonDefaultSqlSelverIdentityTableNamesProperty() { + final Map properties = new HashMap<>(); + properties.put(JdbcSinkConnectorConfig.SQLSERVER_IDENTITY_TABLE_NAMES, "tableA,tableB"); + + final JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(properties); + assertThat(config.validateAndRecord(List.of(JdbcSinkConnectorConfig.SQLSERVER_IDENTITY_TABLE_NAMES_FIELD), LOGGER::error)).isTrue(); + assertThat(config.getSqlServerIdentityTableNames()).contains("tableA", "tableB"); + } + // @Test // public void testNonDefaultSchemaEvolutionProperty() { // final Map properties = new HashMap<>();