DBZ-6801
added new Configuration as SQLSERVER_IDENTITY_TABLE_NAMES added new method wrapWithIdentityInsert for SqlServerDatabaseDialect if the configured identity tables name exist in the list
This commit is contained in:
parent
9edfd05b9e
commit
64dea22276
@ -61,6 +61,7 @@ public class JdbcSinkConnectorConfig {
|
||||
public static final String COLUMN_NAMING_STRATEGY = "column.naming.strategy";
|
||||
public static final String DATABASE_TIME_ZONE = "database.time_zone";
|
||||
public static final String POSTGRES_POSTGIS_SCHEMA = "dialect.postgres.postgis.schema";
|
||||
public static final String SQLSERVER_IDENTITY_TABLE_NAMES = "dialect.sqlserver.identity.tables.names";
|
||||
|
||||
// todo add support for the ValueConverter contract
|
||||
|
||||
@ -241,6 +242,15 @@ public class JdbcSinkConnectorConfig {
|
||||
.withDefault("public")
|
||||
.withDescription("Name of the schema where postgis extension is installed. Default is public");
|
||||
|
||||
public static final Field SQLSERVER_IDENTITY_TABLE_NAMES_FIELD = Field.create(SQLSERVER_IDENTITY_TABLE_NAMES)
|
||||
.withDisplayName("Comma-separated list of indentity field including table's name where sqlserver takes care for identity inserts")
|
||||
.withType(Type.STRING)
|
||||
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 4))
|
||||
.withWidth(ConfigDef.Width.MEDIUM)
|
||||
.withImportance(ConfigDef.Importance.MEDIUM)
|
||||
.withDefault("")
|
||||
.withDescription("A comma-separated list of table names which has identity field.");
|
||||
|
||||
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
|
||||
.connector(
|
||||
CONNECTION_URL_FIELD,
|
||||
@ -261,7 +271,8 @@ public class JdbcSinkConnectorConfig {
|
||||
TABLE_NAMING_STRATEGY_FIELD,
|
||||
COLUMN_NAMING_STRATEGY_FIELD,
|
||||
DATABASE_TIME_ZONE_FIELD,
|
||||
POSTGRES_POSTGIS_SCHEMA_FIELD)
|
||||
POSTGRES_POSTGIS_SCHEMA_FIELD,
|
||||
SQLSERVER_IDENTITY_TABLE_NAMES_FIELD)
|
||||
.create();
|
||||
|
||||
/**
|
||||
@ -432,6 +443,8 @@ public String getValue() {
|
||||
private final String databaseTimezone;
|
||||
private final String postgresPostgisSchema;
|
||||
|
||||
private final Set<String> sqlServerIdentityTableNames;
|
||||
|
||||
public JdbcSinkConnectorConfig(Map<String, String> props) {
|
||||
config = Configuration.from(props);
|
||||
this.insertMode = InsertMode.parse(config.getString(INSERT_MODE));
|
||||
@ -446,6 +459,7 @@ public JdbcSinkConnectorConfig(Map<String, String> props) {
|
||||
this.columnNamingStrategy = config.getInstance(COLUMN_NAMING_STRATEGY_FIELD, ColumnNamingStrategy.class);
|
||||
this.databaseTimezone = config.getString(DATABASE_TIME_ZONE_FIELD);
|
||||
this.postgresPostgisSchema = config.getString(POSTGRES_POSTGIS_SCHEMA_FIELD);
|
||||
this.sqlServerIdentityTableNames = Strings.setOf(config.getString(SQLSERVER_IDENTITY_TABLE_NAMES_FIELD), String::new);
|
||||
}
|
||||
|
||||
public void validate() {
|
||||
@ -497,6 +511,10 @@ public boolean isQuoteIdentifiers() {
|
||||
return quoteIdentifiers;
|
||||
}
|
||||
|
||||
public Set<String> getSqlServerIdentityTableNames() {
|
||||
return sqlServerIdentityTableNames;
|
||||
}
|
||||
|
||||
// public Set<String> getDataTypeMapping() {
|
||||
// return dataTypeMapping;
|
||||
// }
|
||||
|
@ -318,7 +318,7 @@ public String getInsertStatement(TableDescriptor table, SinkRecordDescriptor rec
|
||||
|
||||
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> columnQueryBindingFromField(name, table, record));
|
||||
|
||||
builder.append(")");
|
||||
builder.append(");");
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
@ -47,6 +47,27 @@ private SqlServerDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory
|
||||
super(config, sessionFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
|
||||
String insertStatement = super.getInsertStatement(table, record);
|
||||
return wrapWithIdentityInsert(table, insertStatement);
|
||||
}
|
||||
|
||||
private String wrapWithIdentityInsert(TableDescriptor table, String sqlStatement) {
|
||||
boolean contains = getConfig().getSqlServerIdentityTableNames().contains(table.getId().getTableName());
|
||||
if (!contains) {
|
||||
return sqlStatement;
|
||||
}
|
||||
|
||||
String qualifiedTableName = getQualifiedTableName(table.getId());
|
||||
return new StringBuilder()
|
||||
.append("SET IDENTITY_INSERT ").append(qualifiedTableName).append(" ON ;")
|
||||
.append(sqlStatement)
|
||||
.append("SET IDENTITY_INSERT ").append(qualifiedTableName).append(" OFF ;")
|
||||
.toString();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Optional<String> getDatabaseTimeZoneQuery() {
|
||||
return Optional.of("SELECT CURRENT_TIMEZONE()");
|
||||
@ -111,7 +132,7 @@ public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor rec
|
||||
builder.append(")");
|
||||
builder.append(";"); // SQL server requires this to be terminated this way.
|
||||
|
||||
return builder.build();
|
||||
return wrapWithIdentityInsert(table, builder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -103,6 +103,16 @@ public void testNonDefaultPrimaryKeyFieldsProperty() {
|
||||
assertThat(config.getPrimaryKeyFields()).contains("id", "name");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonDefaultSqlSelverIdentityTableNamesProperty() {
|
||||
final Map<String, String> properties = new HashMap<>();
|
||||
properties.put(JdbcSinkConnectorConfig.SQLSERVER_IDENTITY_TABLE_NAMES, "tableA,tableB");
|
||||
|
||||
final JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(properties);
|
||||
assertThat(config.validateAndRecord(List.of(JdbcSinkConnectorConfig.SQLSERVER_IDENTITY_TABLE_NAMES_FIELD), LOGGER::error)).isTrue();
|
||||
assertThat(config.getSqlServerIdentityTableNames()).contains("tableA", "tableB");
|
||||
}
|
||||
|
||||
// @Test
|
||||
// public void testNonDefaultSchemaEvolutionProperty() {
|
||||
// final Map<String, String> properties = new HashMap<>();
|
||||
|
Loading…
Reference in New Issue
Block a user