DBZ-655 Use cache to avoid using regexp replace

This commit is contained in:
Maciej Bryński 2018-03-13 21:50:42 +01:00 committed by Gunnar Morling
parent 6da83dc8c7
commit d5abb8415c

View File

@ -109,6 +109,8 @@ public class ByLogicalTableRouter<R extends ConnectRecord<R>> implements Transfo
private String keyFieldName;
private final Cache<Schema, Schema> keySchemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
private final Cache<Schema, Schema> envelopeSchemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
private final Cache<String, String> keyRegexReplaceCache = new SynchronizedCache<>(new LRUCache<String, String>(16));
private final Cache<String, String> topicRegexReplaceCache = new SynchronizedCache<>(new LRUCache<String, String>(16));
/**
* If KEY_FIELD_REGEX has a value that is really a regex, then the KEY_FIELD_REPLACEMENT must be a non-empty value.
@ -235,11 +237,18 @@ public ConfigDef config() {
* @return return the new topic name, if the regex applies. Otherwise, return null.
*/
private String determineNewTopic(String oldTopic) {
final Matcher matcher = topicRegex.matcher(oldTopic);
if (matcher.matches()) {
return matcher.replaceFirst(topicReplacement);
String newTopic = topicRegexReplaceCache.get(oldTopic);
if (newTopic != null) {
return newTopic;
} else {
final Matcher matcher = topicRegex.matcher(oldTopic);
if (matcher.matches()) {
newTopic = matcher.replaceFirst(topicReplacement);
topicRegexReplaceCache.put(oldTopic, newTopic);
return newTopic;
}
return null;
}
return null;
}
private Schema updateKeySchema(Schema oldKeySchema, String newTopicName) {
@ -269,9 +278,15 @@ private Struct updateKey(Schema newKeySchema, Struct oldKey, String oldTopic) {
String physicalTableIdentifier = oldTopic;
if (keyFieldRegex != null) {
final Matcher matcher = keyFieldRegex.matcher(oldTopic);
if (matcher.matches()) {
physicalTableIdentifier = matcher.replaceFirst(keyFieldReplacement);
physicalTableIdentifier = keyRegexReplaceCache.get(oldTopic);
if (physicalTableIdentifier == null) {
final Matcher matcher = keyFieldRegex.matcher(oldTopic);
if (matcher.matches()) {
physicalTableIdentifier = matcher.replaceFirst(keyFieldReplacement);
keyRegexReplaceCache.put(oldTopic, physicalTableIdentifier);
} else {
physicalTableIdentifier = oldTopic;
}
}
}