DBZ-7481 SnapshotterServiceProvider will check if snapshot mode class is related to the running connector.

This commit is contained in:
mfvitale 2024-02-12 16:25:32 +01:00 committed by Jiri Pechanec
parent af92291b48
commit cb5a4d7a1a
6 changed files with 29 additions and 4 deletions

View File

@ -84,6 +84,7 @@
import io.debezium.connector.postgresql.snapshot.mode.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.custom.snapshotter.CustomTestSnapshot;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;

View File

@ -40,6 +40,7 @@
import io.debezium.config.Configuration.Builder;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.custom.snapshotter.CustomTestSnapshot;
import io.debezium.data.Bits;
import io.debezium.data.Enum;
import io.debezium.data.Envelope;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
package io.debezium.custom.snapshotter;
import java.util.List;
import java.util.Optional;
@ -12,6 +12,8 @@
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.spi.snapshot.Snapshotter;

View File

@ -1,3 +1,3 @@
io.debezium.connector.postgresql.CustomTestSnapshot
io.debezium.custom.snapshotter.CustomTestSnapshot
io.debezium.connector.postgresql.CustomPartialTableTestSnapshot
io.debezium.connector.postgresql.CustomStartFromStreamingTestSnapshot

View File

@ -1,4 +1,4 @@
io.debezium.connector.postgresql.CustomTestSnapshot
io.debezium.custom.snapshotter.CustomTestSnapshot
io.debezium.connector.postgresql.CustomLifecycleHookTestSnapshot
io.debezium.connector.postgresql.CustomPartialTableTestSnapshot
io.debezium.connector.postgresql.CustomStartFromStreamingTestSnapshot

View File

@ -7,6 +7,7 @@
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.function.Predicate;
import java.util.stream.StreamSupport;
import io.debezium.DebeziumException;
@ -38,15 +39,19 @@ public SnapshotterService createService(Configuration configuration, ServiceRegi
final String snapshotModeCustomName = commonConnectorConfig.getSnapshotModeCustomName();
String snapshotMode;
Predicate<Snapshotter> implementationFilter;
if ("custom".equals(configuredSnapshotMode) && !snapshotModeCustomName.isEmpty()) {
snapshotMode = snapshotModeCustomName;
implementationFilter = s -> s.name().equals(snapshotMode);
}
else {
snapshotMode = configuredSnapshotMode;
implementationFilter = s -> s.name().equals(snapshotMode) &&
isForCurrentConnector(configuration, s);
}
Optional<Snapshotter> snapshotter = StreamSupport.stream(ServiceLoader.load(Snapshotter.class).spliterator(), false)
.filter(s -> s.name().equals(snapshotMode))
.filter(implementationFilter)
.findAny();
final SnapshotQuery snapshotQueryService = serviceRegistry.tryGetService(SnapshotQuery.class);
@ -57,6 +62,22 @@ public SnapshotterService createService(Configuration configuration, ServiceRegi
}
// This is required for DebeziumServer since it loads all connectors and until all modes will be moved into the core (if possible)
private boolean isForCurrentConnector(Configuration configuration, Snapshotter s) {
return s.getClass().getCanonicalName().contains(getConnectorClassPackage(configuration));
}
private String getConnectorClassPackage(Configuration config) {
try {
return Class.forName(config.getString("connector.class")).getPackageName();
}
catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
private static SnapshotterService getSnapshotterService(Configuration configuration, Snapshotter s, BeanRegistry beanRegistry, SnapshotQuery snapshotQueryService,
SnapshotLock snapshotLockService) {
s.configure(configuration.asMap());