DBZ-978 Added last event and captured tables metrics

This commit is contained in:
Jiri Pechanec 2018-12-05 09:17:16 +01:00 committed by Gunnar Morling
parent 098c6dd28d
commit 208d6aa324
4 changed files with 39 additions and 41 deletions

View File

@ -62,7 +62,20 @@ public void start(Configuration config) {
} }
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config); OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
taskContext = new OracleTaskContext(connectorConfig); TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
Configuration jdbcConfig = config.subset("database.", true);
jdbcConnection = new OracleConnection(jdbcConfig, new OracleConnectionFactory());
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
this.schema.initializeStorage();
OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig.getLogicalName()));
if (previousOffset != null) {
schema.recover(previousOffset);
}
taskContext = new OracleTaskContext(connectorConfig, schema);
Clock clock = Clock.system(); Clock clock = Clock.system();
@ -75,20 +88,6 @@ public void start(Configuration config) {
.build(); .build();
errorHandler = new ErrorHandler(OracleConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources); errorHandler = new ErrorHandler(OracleConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
Configuration jdbcConfig = config.subset("database.", true);
jdbcConnection = new OracleConnection(jdbcConfig, new OracleConnectionFactory());
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
this.schema.initializeStorage();
OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig.getLogicalName()));
if (previousOffset != null) {
schema.recover(previousOffset);
}
EventDispatcher<TableId> dispatcher = new EventDispatcher<>(connectorConfig, topicSelector, schema, queue, EventDispatcher<TableId> dispatcher = new EventDispatcher<>(connectorConfig, topicSelector, schema, queue,
connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new); connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new);

View File

@ -9,7 +9,7 @@
public class OracleTaskContext extends CdcSourceTaskContext { public class OracleTaskContext extends CdcSourceTaskContext {
public OracleTaskContext(OracleConnectorConfig config) { public OracleTaskContext(OracleConnectorConfig config, OracleDatabaseSchema schema) {
super("Oracle", config.getLogicalName()); super("Oracle", config.getLogicalName(), schema::tableIds);
} }
} }

View File

@ -72,7 +72,27 @@ public void start(Configuration config) {
} }
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config); final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
taskContext = new SqlServerTaskContext(connectorConfig); final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
final Configuration jdbcConfig = config.filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name())))
.subset("database.", true);
jdbcConnection = new SqlServerConnection(jdbcConfig);
try {
jdbcConnection.setAutoCommit(false);
}
catch (SQLException e) {
throw new ConnectException(e);
}
this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
this.schema.initializeStorage();
final OffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig.getLogicalName()));
if (previousOffset != null) {
schema.recover(previousOffset);
}
taskContext = new SqlServerTaskContext(connectorConfig, schema);
final Clock clock = Clock.system(); final Clock clock = Clock.system();
@ -85,27 +105,6 @@ public void start(Configuration config) {
.build(); .build();
errorHandler = new ErrorHandler(SqlServerConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources); errorHandler = new ErrorHandler(SqlServerConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources);
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final Configuration jdbcConfig = config.filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name())))
.subset("database.", true);
jdbcConnection = new SqlServerConnection(jdbcConfig);
try {
jdbcConnection.setAutoCommit(false);
}
catch (SQLException e) {
throw new ConnectException(e);
}
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
this.schema.initializeStorage();
final OffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig.getLogicalName()));
if (previousOffset != null) {
schema.recover(previousOffset);
}
final EventDispatcher<TableId> dispatcher = new EventDispatcher<>( final EventDispatcher<TableId> dispatcher = new EventDispatcher<>(
connectorConfig, connectorConfig,

View File

@ -15,7 +15,7 @@
*/ */
public class SqlServerTaskContext extends CdcSourceTaskContext { public class SqlServerTaskContext extends CdcSourceTaskContext {
public SqlServerTaskContext(SqlServerConnectorConfig config) { public SqlServerTaskContext(SqlServerConnectorConfig config, SqlServerDatabaseSchema schema) {
super("SQL Server", config.getLogicalName()); super("SQLServer", config.getLogicalName(), schema::tableIds);
} }
} }