DBZ-8151 Allow record_value with no primary.key.fields

This commit is contained in:
Chris Cranford 2024-08-16 06:08:00 -04:00 committed by Jiri Pechanec
parent b77f0663b8
commit 40114d1219
2 changed files with 15 additions and 7 deletions

View File

@ -689,13 +689,6 @@ else if (!config.hasKey(PRIMARY_KEY_MODE)) {
LOGGER.error("When using UPSERT, please define '{}'.", PRIMARY_KEY_MODE);
return 1;
}
else if (!config.hasKey(PRIMARY_KEY_FIELDS)) {
final PrimaryKeyMode primaryKeyMode = PrimaryKeyMode.parse(config.getString(PRIMARY_KEY_MODE_FIELD));
if (PrimaryKeyMode.RECORD_VALUE.equals(primaryKeyMode)) {
LOGGER.error("When using UPSERT, please define '{}'.", PRIMARY_KEY_FIELDS);
return 1;
}
}
}
return 0;
}

View File

@ -134,6 +134,21 @@ public void testOverrideHibernateConfigurationProperties() {
assertThat(ormProperties.get(AvailableSettings.PASS)).isEqualTo("pass");
}
@Test
@FixFor("DBZ-8151")
public void testPrimaryKeyRecordValueDoesNotRequirePrimaryKeyFields() {
final Map<String, String> properties = new HashMap<>();
properties.put(JdbcSinkConnectorConfig.CONNECTION_PROVIDER, "io.debezium.AcmeConnectionProvider");
properties.put(JdbcSinkConnectorConfig.CONNECTION_URL, "jdbc://url");
properties.put(JdbcSinkConnectorConfig.CONNECTION_USER, "user");
properties.put(JdbcSinkConnectorConfig.CONNECTION_PASSWORD, "pass");
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, "upsert");
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, "record_value");
final JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(properties);
assertThat(config.validateAndRecord(List.of(JdbcSinkConnectorConfig.INSERT_MODE_FIELD), LOGGER::error)).isTrue();
}
// @Test
// public void testNonDefaultSchemaEvolutionProperty() {
// final Map<String, String> properties = new HashMap<>();