From 1683f776bd45fc69299a6268aeba3e8542d8a992 Mon Sep 17 00:00:00 2001 From: Damian Mooyman Date: Thu, 16 Jul 2015 18:18:04 +1200 Subject: [PATCH] API QueuedJob support for Solr_Reindex API Incremental clear and re-index of records rather than clearing all records from SolrIndex up front --- _config.php | 2 + _config/processor.yml | 8 + code/search/SearchVariant.php | 8 + .../SearchVariantSiteTreeSubsitesPolyhome.php | 2 +- code/search/SearchVariantVersioned.php | 2 +- code/solr/Solr.php | 286 ++++++---- code/solr/SolrIndex.php | 246 +++++--- .../solr/reindex/handlers/SolrReindexBase.php | 230 ++++++++ .../reindex/handlers/SolrReindexHandler.php | 42 ++ .../handlers/SolrReindexImmediateHandler.php | 74 +++ .../handlers/SolrReindexMessageHandler.php | 40 ++ .../handlers/SolrReindexQueuedHandler.php | 94 ++++ .../jobs/SolrReindexGroupQueuedJob.php | 117 ++++ .../reindex/jobs/SolrReindexQueuedJob.php | 91 +++ .../reindex/jobs/SolrReindexQueuedJobBase.php | 123 ++++ code/utils/logging/MonologFactory.php | 98 ++++ code/utils/logging/QueuedJobLogHandler.php | 53 ++ code/utils/logging/SearchLogFactory.php | 23 + composer.json | 3 +- docs/en/Solr.md | 21 +- tests/SolrReindexQueuedTest.php | 219 ++++++++ tests/SolrReindexTest.php | 531 ++++++++++++++++++ 22 files changed, 2109 insertions(+), 204 deletions(-) create mode 100644 code/solr/reindex/handlers/SolrReindexBase.php create mode 100644 code/solr/reindex/handlers/SolrReindexHandler.php create mode 100644 code/solr/reindex/handlers/SolrReindexImmediateHandler.php create mode 100644 code/solr/reindex/handlers/SolrReindexMessageHandler.php create mode 100644 code/solr/reindex/handlers/SolrReindexQueuedHandler.php create mode 100644 code/solr/reindex/jobs/SolrReindexGroupQueuedJob.php create mode 100644 code/solr/reindex/jobs/SolrReindexQueuedJob.php create mode 100644 code/solr/reindex/jobs/SolrReindexQueuedJobBase.php create mode 100644 code/utils/logging/MonologFactory.php create mode 100644 code/utils/logging/QueuedJobLogHandler.php create mode 100644 code/utils/logging/SearchLogFactory.php create mode 100644 tests/SolrReindexQueuedTest.php create mode 100644 tests/SolrReindexTest.php diff --git a/_config.php b/_config.php index 3a5121a..c9e422a 100644 --- a/_config.php +++ b/_config.php @@ -2,3 +2,5 @@ global $databaseConfig; if (isset($databaseConfig['type'])) SearchUpdater::bind_manipulation_capture(); + +Deprecation::notification_version('1.0.0', 'fulltextsearch'); diff --git a/_config/processor.yml b/_config/processor.yml index 44902d0..430ec5e 100644 --- a/_config/processor.yml +++ b/_config/processor.yml @@ -4,6 +4,10 @@ Name: defaultprocessor Injector: SearchUpdateProcessor: class: SearchUpdateImmediateProcessor + SolrReindexHandler: + class: SolrReindexImmediateHandler + SearchLogFactory: + class: 'MonologFactory' --- Name: messagequeueprocessor Only: @@ -14,6 +18,8 @@ Except: Injector: SearchUpdateProcessor: class: SearchUpdateMessageQueueProcessor + SolrReindexHandler: + class: SolrReindexMessageHandler --- Name: queuedjobprocessor Only: @@ -24,3 +30,5 @@ Except: Injector: SearchUpdateProcessor: class: SearchUpdateQueuedJobProcessor + SolrReindexHandler: + class: SolrReindexQueuedHandler diff --git a/code/search/SearchVariant.php b/code/search/SearchVariant.php index 425cf82..e7e7436 100644 --- a/code/search/SearchVariant.php +++ b/code/search/SearchVariant.php @@ -39,6 +39,14 @@ abstract class SearchVariant { */ abstract function activateState($state); + /** + * Apply this variant to a search query + * + * @param SearchQuery $query + * @param SearchIndex $index + */ + abstract public function alterQuery($query, $index); + /*** OVERRIDES end here*/ /** Holds a cache of all variants */ diff --git a/code/search/SearchVariantSiteTreeSubsitesPolyhome.php b/code/search/SearchVariantSiteTreeSubsitesPolyhome.php index 1171221..480bfa2 100644 --- a/code/search/SearchVariantSiteTreeSubsitesPolyhome.php +++ b/code/search/SearchVariantSiteTreeSubsitesPolyhome.php @@ -48,7 +48,7 @@ class SearchVariantSiteTreeSubsitesPolyhome extends SearchVariant { ); } - function alterQuery($query, $index) { + public function alterQuery($query, $index) { $subsite = Subsite::currentSubsiteID(); $query->filter('_subsite', array($subsite, SearchQuery::$missing)); } diff --git a/code/search/SearchVariantVersioned.php b/code/search/SearchVariantVersioned.php index d197f33..e160f55 100644 --- a/code/search/SearchVariantVersioned.php +++ b/code/search/SearchVariantVersioned.php @@ -28,7 +28,7 @@ class SearchVariantVersioned extends SearchVariant { ); } - function alterQuery($query) { + public function alterQuery($query, $index) { $stage = Versioned::current_stage(); $query->filter('_versionedstage', array($stage, SearchQuery::$missing)); } diff --git a/code/solr/Solr.php b/code/solr/Solr.php index 53f0530..8363e42 100644 --- a/code/solr/Solr.php +++ b/code/solr/Solr.php @@ -1,5 +1,10 @@ logger; + } + + /** + * Assign a new logger + * + * @param LoggerInterface $logger + */ + public function setLogger(LoggerInterface $logger) { + $this->logger = $logger; + } + + /** + * @return SearchLogFactory + */ + protected function getLoggerFactory() { + return Injector::inst()->get('SearchLogFactory'); + } + + /** + * Setup task + * + * @param SS_HTTPReqest $request + */ + public function run($request) { + $name = get_class($this); + $verbose = $request->getVar('verbose'); + + // Set new logger + $logger = $this + ->getLoggerFactory() + ->getOutputLogger($name, $verbose); + $this->setLogger($logger); + } +} + + +class Solr_Configure extends Solr_BuildTask { + + protected $enabled = true; public function run($request) { + parent::run($request); + // Find the IndexStore handler, which will handle uploading config files to Solr $store = $this->getSolrConfigStore(); $indexes = Solr::get_indexes(); @@ -158,7 +223,9 @@ class Solr_Configure extends BuildTask { $this->updateIndex($instance, $store); } catch(Exception $e) { // We got an exception. Warn, but continue to next index. - $this->log("Failure: " . $e->getMessage()); + $this + ->getLogger() + ->error("Failure: " . $e->getMessage()); } } } @@ -171,26 +238,25 @@ class Solr_Configure extends BuildTask { */ protected function updateIndex($instance, $store) { $index = $instance->getIndexName(); - $this->log("Configuring $index."); - $this->log("Uploading configuration ... "); - + $this->getLogger()->info("Configuring $index."); // Upload the config files for this index + $this->getLogger()->info("Uploading configuration ..."); $instance->uploadConfig($store); // Then tell Solr to use those config files $service = Solr::service(); if ($service->coreIsActive($index)) { - $this->log("Reloading core ..."); + $this->getLogger()->info("Reloading core ..."); $service->coreReload($index); } else { - $this->log("Creating core ..."); + $this->getLogger()->info("Creating core ..."); $service->coreCreate($index, $store->instanceDir($index)); } - $this->log("Done"); + $this->getLogger()->info("Done"); } - + /** * Get config store * @@ -217,19 +283,26 @@ class Solr_Configure extends BuildTask { } } - - protected function log($message) { - if(Director::is_cli()) { - echo $message . "\n"; - } else { - echo Convert::raw2xml($message) . "
"; - } - flush(); - } } +/** + * Task used for both initiating a new reindex, as well as for processing incremental batches + * within a reindex. + * + * When running a complete reindex you can provide any of the following + * - class (to limit to a single class) + * - verbose (optional) + * + * When running with a single batch, provide the following querystring arguments: + * - start + * - index + * - class + * - variantstate + * - verbose (optional) + */ +class Solr_Reindex extends Solr_BuildTask { -class Solr_Reindex extends BuildTask { + protected $enabled = true; /** * Number of records to load and index per request @@ -239,117 +312,94 @@ class Solr_Reindex extends BuildTask { */ private static $recordsPerRequest = 200; - public function run($request) { - increase_time_limit_to(); - $self = get_class($this); - $verbose = isset($_GET['verbose']); - - $originalState = SearchVariant::current_state(); - - if (isset($_GET['start'])) { - $this->runFrom(singleton($_GET['index']), $_GET['class'], $_GET['start'], json_decode($_GET['variantstate'], true)); - } - else { - foreach(array('framework','sapphire') as $dirname) { - $script = sprintf("%s%s$dirname%scli-script.php", BASE_PATH, DIRECTORY_SEPARATOR, DIRECTORY_SEPARATOR); - if(file_exists($script)) { - break; - } - } - $class = get_class($this); - - foreach (Solr::get_indexes() as $index => $instance) { - echo "Rebuilding {$instance->getIndexName()}\n\n"; - - $classes = $instance->getClasses(); - if($request->getVar('class')) { - $limitClasses = explode(',', $request->getVar('class')); - $classes = array_intersect_key($classes, array_combine($limitClasses, $limitClasses)); - } - - if($classes) { - Solr::service($index)->deleteByQuery('ClassHierarchy:(' . implode(' OR ', array_keys($classes)) . ')'); - } - - foreach ($classes as $class => $options) { - $includeSubclasses = $options['include_children']; - - foreach (SearchVariant::reindex_states($class, $includeSubclasses) as $state) { - if ($instance->variantStateExcluded($state)) continue; - - SearchVariant::activate_state($state); - - $filter = $includeSubclasses ? "" : '"ClassName" = \''.$class."'"; - $singleton = singleton($class); - $query = $singleton->get($class,$filter,null); - $dtaQuery = $query->dataQuery(); - $sqlQuery = $dtaQuery->getFinalisedQuery(); - $singleton->extend('augmentSQL',$sqlQuery,$dtaQuery); - $total = $query->count(); - - $statevar = json_encode($state); - echo "Class: $class, total: $total"; - echo ($statevar) ? " in state $statevar\n" : "\n"; - - if (strpos(PHP_OS, "WIN") !== false) $statevar = '"'.str_replace('"', '\\"', $statevar).'"'; - else $statevar = "'".$statevar."'"; - - for ($offset = 0; $offset < $total; $offset += $this->stat('recordsPerRequest')) { - echo "$offset.."; - - $cmd = "php $script dev/tasks/$self index=$index class=$class start=$offset variantstate=$statevar"; - - if($verbose) { - echo "\n Running '$cmd'\n"; - $cmd .= " verbose=1 2>&1"; - } - - $res = $verbose ? passthru($cmd) : `$cmd`; - if($verbose) echo " ".preg_replace('/\r\n|\n/', '$0 ', $res)."\n"; - - // If we're in dev mode, commit more often for fun and profit - if (Director::isDev()) Solr::service($index)->commit(); - - // This will slow down things a tiny bit, but it is done so that we don't timeout to the database during a reindex - DB::query('SELECT 1'); - } - - echo "\n"; - } - } - - Solr::service($index)->commit(); - } - } - - $originalState = SearchVariant::current_state(); + /** + * Get the reindex handler + * + * @return SolrReindexHandler + */ + protected function getHandler() { + return Injector::inst()->get('SolrReindexHandler'); } - protected function runFrom($index, $class, $start, $variantstate) { - $classes = $index->getClasses(); - $options = $classes[$class]; - $verbose = isset($_GET['verbose']); + /** + * @param SS_HTTPRequest $request + */ + public function run($request) { + parent::run($request); + + // Reset state + $originalState = SearchVariant::current_state(); + $this->doReindex($request); + SearchVariant::activate_state($originalState); + } + /** + * @param SS_HTTPRequest $request + */ + protected function doReindex($request) { + $class = $request->getVar('class'); + + // Deprecated reindex mechanism + $start = $request->getVar('start'); + if ($start !== null) { + // Run single batch directly + $indexInstance = singleton($request->getVar('index')); + $state = json_decode($request->getVar('variantstate'), true); + $this->runFrom($indexInstance, $class, $start, $state); + return; + } + + // Check if we are re-indexing a single group + // If not using queuedjobs, we need to invoke Solr_Reindex as a separate process + // Otherwise each group is processed via a SolrReindexGroupJob + $groups = $request->getVar('groups'); + $handler = $this->getHandler(); + if($groups) { + // Run grouped batches (id % groups = group) + $group = $request->getVar('group'); + $indexInstance = singleton($request->getVar('index')); + $state = json_decode($request->getVar('variantstate'), true); + + $handler->runGroup($this->getLogger(), $indexInstance, $state, $class, $groups, $group); + return; + } + + // If run at the top level, delegate to appropriate handler + $self = get_class($this); + $handler->triggerReindex($this->getLogger(), $this->config()->recordsPerRequest, $self, $class); + } + + /** + * @deprecated since version 2.0.0 + */ + protected function runFrom($index, $class, $start, $variantstate) { + DeprecationTest_Deprecation::notice('2.0.0', 'Solr_Reindex now uses a new grouping mechanism'); + + // Set time limit and state + increase_time_limit_to(); SearchVariant::activate_state($variantstate); - $includeSubclasses = $options['include_children']; - $filter = $includeSubclasses ? "" : '"ClassName" = \''.$class."'"; - + // Generate filtered list $items = DataList::create($class) - ->where($filter) - ->limit($this->stat('recordsPerRequest'), $start); + ->limit($this->config()->recordsPerRequest, $start); - if($verbose) echo "Adding $class"; - foreach ($items as $item) { - if($verbose) echo $item->ID . ' '; + // Add child filter + $classes = $index->getClasses(); + $options = $classes[$class]; + if(!$options['include_children']) { + $items = $items->filter('ClassName', $class); + } + + // Process selected records in this class + $this->getLogger()->info("Adding $class"); + foreach ($items->sort("ID") as $item) { + $this->getLogger()->debug($item->ID); // See SearchUpdater_ObjectHandler::triggerReindex $item->triggerReindex(); - $item->destroy(); } - if($verbose) echo "Done "; + $this->getLogger()->info("Done"); } - } diff --git a/code/solr/SolrIndex.php b/code/solr/SolrIndex.php index 0a72114..24728a6 100644 --- a/code/solr/SolrIndex.php +++ b/code/solr/SolrIndex.php @@ -511,6 +511,39 @@ abstract class SolrIndex extends SearchIndex { } } + /** + * Clear all records which do not match the given classname whitelist. + * + * Can also be used to trim an index when reducing to a narrower set of classes. + * + * Ignores current state / variant. + * + * @param array $classes List of non-obsolete classes in the same format as SolrIndex::getClasses() + * @return bool Flag if successful + */ + public function clearObsoleteClasses($classes) { + if(empty($classes)) { + return false; + } + + // Delete all records which do not match the necessary classname rules + $conditions = array(); + foreach ($classes as $class => $options) { + if ($options['include_children']) { + $conditions[] = "ClassHierarchy:{$class}"; + } else { + $conditions[] = "ClassName:{$class}"; + } + } + + // Delete records which don't match any of these conditions in this index + $deleteQuery = "-(" . implode(' ', $conditions) . ")"; + $this + ->getService() + ->deleteByQuery($deleteQuery); + return true; + } + function commit() { try { $this->getService()->commit(false, false, false); @@ -543,32 +576,8 @@ abstract class SolrIndex extends SearchIndex { $hlq = array(); // Highlight query // Build the search itself + $q = $this->getQueryComponent($query, $hlq); - foreach ($query->search as $search) { - $text = $search['text']; - preg_match_all('/"[^"]*"|\S+/', $text, $parts); - - $fuzzy = $search['fuzzy'] ? '~' : ''; - - foreach ($parts[0] as $part) { - $fields = (isset($search['fields'])) ? $search['fields'] : array(); - if(isset($search['boost'])) { - $fields = array_merge($fields, array_keys($search['boost'])); - } - if ($fields) { - $searchq = array(); - foreach ($fields as $field) { - $boost = (isset($search['boost'][$field])) ? '^' . $search['boost'][$field] : ''; - $searchq[] = "{$field}:".$part.$fuzzy.$boost; - } - $q[] = '+('.implode(' OR ', $searchq).')'; - } - else { - $q[] = '+'.$part.$fuzzy; - } - $hlq[] = $part; - } - } // If using boosting, set the clean term separately for highlighting. // See https://issues.apache.org/jira/browse/SOLR-2632 if(array_key_exists('hl', $params) && !array_key_exists('hl.q', $params)) { @@ -576,64 +585,17 @@ abstract class SolrIndex extends SearchIndex { } // Filter by class if requested - $classq = array(); - foreach ($query->classes as $class) { - if (!empty($class['includeSubclasses'])) $classq[] = 'ClassHierarchy:'.$class['class']; + if (!empty($class['includeSubclasses'])) { + $classq[] = 'ClassHierarchy:'.$class['class']; + } else $classq[] = 'ClassName:'.$class['class']; } - if ($classq) $fq[] = '+('.implode(' ', $classq).')'; - + // Filter by filters - - foreach ($query->require as $field => $values) { - $requireq = array(); - - foreach ($values as $value) { - if ($value === SearchQuery::$missing) { - $requireq[] = "(*:* -{$field}:[* TO *])"; - } - else if ($value === SearchQuery::$present) { - $requireq[] = "{$field}:[* TO *]"; - } - else if ($value instanceof SearchQuery_Range) { - $start = $value->start; if ($start === null) $start = '*'; - $end = $value->end; if ($end === null) $end = '*'; - $requireq[] = "$field:[$start TO $end]"; - } - else { - $requireq[] = $field.':"'.$value.'"'; - } - } - - $fq[] = '+('.implode(' ', $requireq).')'; - } - - foreach ($query->exclude as $field => $values) { - $excludeq = array(); - $missing = false; - - foreach ($values as $value) { - if ($value === SearchQuery::$missing) { - $missing = true; - } - else if ($value === SearchQuery::$present) { - $excludeq[] = "{$field}:[* TO *]"; - } - else if ($value instanceof SearchQuery_Range) { - $start = $value->start; if ($start === null) $start = '*'; - $end = $value->end; if ($end === null) $end = '*'; - $excludeq[] = "$field:[$start TO $end]"; - } - else { - $excludeq[] = $field.':"'.$value.'"'; - } - } - - $fq[] = ($missing ? "+{$field}:[* TO *] " : '') . '-('.implode(' ', $excludeq).')'; - } + $fq = array_merge($fq, $this->getFiltersComponent($query)); // Prepare query fields unless specified explicitly if(isset($params['qf'])) { @@ -739,6 +701,136 @@ abstract class SolrIndex extends SearchIndex { return new ArrayData($ret); } + + /** + * Get the query (q) component for this search + * + * @param SearchQuery $searchQuery + * @param array &$hlq Highlight query returned by reference + * @return array + */ + protected function getQueryComponent(SearchQuery $searchQuery, &$hlq = array()) { + $q = array(); + foreach ($searchQuery->search as $search) { + $text = $search['text']; + preg_match_all('/"[^"]*"|\S+/', $text, $parts); + + $fuzzy = $search['fuzzy'] ? '~' : ''; + + foreach ($parts[0] as $part) { + $fields = (isset($search['fields'])) ? $search['fields'] : array(); + if(isset($search['boost'])) { + $fields = array_merge($fields, array_keys($search['boost'])); + } + if ($fields) { + $searchq = array(); + foreach ($fields as $field) { + $boost = (isset($search['boost'][$field])) ? '^' . $search['boost'][$field] : ''; + $searchq[] = "{$field}:".$part.$fuzzy.$boost; + } + $q[] = '+('.implode(' OR ', $searchq).')'; + } + else { + $q[] = '+'.$part.$fuzzy; + } + $hlq[] = $part; + } + } + return $q; + } + + /** + * Parse all require constraints for inclusion in a filter query + * + * @param SearchQuery $searchQuery + * @return array List of parsed string values for each require + */ + protected function getRequireFiltersComponent(SearchQuery $searchQuery) { + $fq = array(); + foreach ($searchQuery->require as $field => $values) { + $requireq = array(); + + foreach ($values as $value) { + if ($value === SearchQuery::$missing) { + $requireq[] = "(*:* -{$field}:[* TO *])"; + } + else if ($value === SearchQuery::$present) { + $requireq[] = "{$field}:[* TO *]"; + } + else if ($value instanceof SearchQuery_Range) { + $start = $value->start; + if ($start === null) { + $start = '*'; + } + $end = $value->end; + if ($end === null) { + $end = '*'; + } + $requireq[] = "$field:[$start TO $end]"; + } + else { + $requireq[] = $field.':"'.$value.'"'; + } + } + + $fq[] = '+('.implode(' ', $requireq).')'; + } + return $fq; + } + + /** + * Parse all exclude constraints for inclusion in a filter query + * + * @param SearchQuery $searchQuery + * @return array List of parsed string values for each exclusion + */ + protected function getExcludeFiltersComponent(SearchQuery $searchQuery) { + $fq = array(); + foreach ($searchQuery->exclude as $field => $values) { + $excludeq = array(); + $missing = false; + + foreach ($values as $value) { + if ($value === SearchQuery::$missing) { + $missing = true; + } + else if ($value === SearchQuery::$present) { + $excludeq[] = "{$field}:[* TO *]"; + } + else if ($value instanceof SearchQuery_Range) { + $start = $value->start; + if ($start === null) { + $start = '*'; + } + $end = $value->end; + if ($end === null) { + $end = '*'; + } + $excludeq[] = "$field:[$start TO $end]"; + } + else { + $excludeq[] = $field.':"'.$value.'"'; + } + } + + $fq[] = ($missing ? "+{$field}:[* TO *] " : '') . '-('.implode(' ', $excludeq).')'; + } + return $fq; + } + + /** + * Get all filter conditions for this search + * + * @param SearchQuery $searchQuery + * @return array + */ + public function getFiltersComponent(SearchQuery $searchQuery) { + return array_merge( + $this->getRequireFiltersComponent($searchQuery), + $this->getExcludeFiltersComponent($searchQuery) + ); + } + protected $service; /** diff --git a/code/solr/reindex/handlers/SolrReindexBase.php b/code/solr/reindex/handlers/SolrReindexBase.php new file mode 100644 index 0000000..44f6b81 --- /dev/null +++ b/code/solr/reindex/handlers/SolrReindexBase.php @@ -0,0 +1,230 @@ +processIndex($logger, $indexInstance, $batchSize, $taskName, $classes); + } + } + + /** + * Process index for a single SolrIndex instance + * + * @param LoggerInterface $logger + * @param SolrIndex $indexInstance + * @param int $batchSize + * @param string $taskName + * @param string $classes + */ + protected function processIndex( + LoggerInterface $logger, SolrIndex $indexInstance, $batchSize, $taskName, $classes = null + ) { + // Filter classes for this index + $indexClasses = $this->getClassesForIndex($indexInstance, $classes); + + // Clear all records in this index which do not contain the given classes + $logger->info("Clearing obsolete classes from ".$indexInstance->getIndexName()); + $indexInstance->clearObsoleteClasses($indexClasses); + + // Build queue for each class + foreach ($indexClasses as $class => $options) { + $includeSubclasses = $options['include_children']; + + foreach (SearchVariant::reindex_states($class, $includeSubclasses) as $state) { + $this->processVariant($logger, $indexInstance, $state, $class, $includeSubclasses, $batchSize, $taskName); + } + } + } + + /** + * Get valid classes and options for an index with an optional filter + * + * @param SolrIndex $index + * @param string|array $filterClasses Optional class or classes to limit to + * @return array List of classes, where the key is the classname and value is list of options + */ + protected function getClassesForIndex(SolrIndex $index, $filterClasses = null) { + // Get base classes + $classes = $index->getClasses(); + if(!$filterClasses) { + return $classes; + } + + // Apply filter + if(!is_array($filterClasses)) { + $filterClasses = explode(',', $filterClasses); + } + return array_intersect_key($classes, array_combine($filterClasses, $filterClasses)); + } + + /** + * Process re-index for a given variant state and class + * + * @param LoggerInterface $logger + * @param SolrIndex $indexInstance + * @param array $state Variant state + * @param string $class + * @param bool $includeSubclasses + * @param int $batchSize + * @param string $taskName + */ + protected function processVariant( + LoggerInterface $logger, SolrIndex $indexInstance, $state, + $class, $includeSubclasses, $batchSize, $taskName + ) { + // Set state + SearchVariant::activate_state($state); + + // Count records + $query = $class::get(); + if(!$includeSubclasses) { + $query = $query->filter('ClassName', $class); + } + $total = $query->count(); + + // Skip this variant if nothing to process, or if there are no records + if ($total == 0 || $indexInstance->variantStateExcluded($state)) { + // Remove all records in the current state, since there are no groups to process + $logger->info("Clearing all records of type {$class} in the current state: " . json_encode($state)); + $this->clearRecords($indexInstance, $class); + return; + } + + // For each group, run processing + $groups = (int)(($total + $batchSize - 1) / $batchSize); + for ($group = 0; $group < $groups; $group++) { + $this->processGroup($logger, $indexInstance, $state, $class, $groups, $group, $taskName); + } + } + + /** + * Initiate the processing of a single group + * + * @param LoggerInterface $logger + * @param SolrIndex $indexInstance Index instance + * @param array $state Variant state + * @param string $class Class to index + * @param int $groups Total groups + * @param int $group Index of group to process + * @param string $taskName Name of task script to run + */ + abstract protected function processGroup( + LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group, $taskName + ); + + /** + * Explicitly invoke the process that performs the group + * processing. Can be run either by a background task or a queuedjob. + * + * Does not commit changes to the index, so this must be controlled externally. + * + * @param LoggerInterface $logger + * @param SolrIndex $indexInstance + * @param array $state + * @param string $class + * @param int $groups + * @param int $group + */ + public function runGroup( + LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group + ) { + // Set time limit and state + increase_time_limit_to(); + SearchVariant::activate_state($state); + $logger->info("Adding $class"); + + // Prior to adding these records to solr, delete existing solr records + $this->clearRecords($indexInstance, $class, $groups, $group); + + // Process selected records in this class + $items = $this->getRecordsInGroup($indexInstance, $class, $groups, $group); + $processed = array(); + foreach ($items as $item) { + $processed[] = $item->ID; + + // By this point, obsolete classes/states have been removed in processVariant + // and obsolete records have been removed in clearRecords + $indexInstance->add($item); + $item->destroy(); + } + $logger->info("Updated ".implode(',', $processed)); + + // This will slow down things a tiny bit, but it is done so that we don't timeout to the database during a reindex + DB::query('SELECT 1'); + + $logger->info("Done"); + } + + /** + * Gets the datalist of records in the given group in the current state + * + * Assumes that the desired variant state is in effect. + * + * @param SolrIndex $indexInstance + * @param string $class + * @param int $groups + * @param int $group + * @return DataList + */ + protected function getRecordsInGroup(SolrIndex $indexInstance, $class, $groups, $group) { + // Generate filtered list of local records + $baseClass = ClassInfo::baseDataClass($class); + $items = DataList::create($class) + ->where(sprintf( + '"%s"."ID" %% \'%d\' = \'%d\'', + $baseClass, + intval($groups), + intval($group) + )) + ->sort("ID"); + + // Add child filter + $classes = $indexInstance->getClasses(); + $options = $classes[$class]; + if(!$options['include_children']) { + $items = $items->filter('ClassName', $class); + } + + return $items; + } + + /** + * Clear all records of the given class in the current state ONLY. + * + * Optionally delete from a given group (where the group is defined as the ID % total groups) + * + * @param SolrIndex $indexInstance Index instance + * @param string $class Class name + * @param int $groups Number of groups, if clearing from a striped group + * @param int $group Group number, if clearing from a striped group + */ + protected function clearRecords(SolrIndex $indexInstance, $class, $groups = null, $group = null) { + // Clear by classname + $conditions = array("+(ClassHierarchy:{$class})"); + + // If grouping, delete from this group only + if($groups) { + $conditions[] = "+_query_:\"{!frange l={$group} u={$group}}mod(ID, {$groups})\""; + } + + // Also filter by state (suffix on document ID) + $query = new SearchQuery(); + SearchVariant::with($class) + ->call('alterQuery', $query, $indexInstance); + if($query->isfiltered()) { + $conditions = array_merge($conditions, $indexInstance->getFiltersComponent($query)); + } + + // Invoke delete on index + $deleteQuery = implode(' ', $conditions); + $indexInstance + ->getService() + ->deleteByQuery($deleteQuery); + } +} diff --git a/code/solr/reindex/handlers/SolrReindexHandler.php b/code/solr/reindex/handlers/SolrReindexHandler.php new file mode 100644 index 0000000..e57bb2b --- /dev/null +++ b/code/solr/reindex/handlers/SolrReindexHandler.php @@ -0,0 +1,42 @@ +runReindex($logger, $batchSize, $taskName, $classes); + } + + protected function processIndex( + LoggerInterface $logger, SolrIndex $indexInstance, $batchSize, $taskName, $classes = null + ) { + parent::processIndex($logger, $indexInstance, $batchSize, $taskName, $classes); + + // Immediate processor needs to immediately commit after each index + $indexInstance->getService()->commit(); + } + + /** + * Process a single group. + * + * Without queuedjobs, it's necessary to shell this out to a background task as this is + * very memory intensive. + * + * The sub-process will then invoke $processor->runGroup() in {@see Solr_Reindex::doReindex} + * + * @param LoggerInterface $logger + * @param SolrIndex $indexInstance Index instance + * @param array $state Variant state + * @param string $class Class to index + * @param int $groups Total groups + * @param int $group Index of group to process + * @param string $taskName Name of task script to run + */ + protected function processGroup( + LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group, $taskName + ) { + // Build state + $statevar = json_encode($state); + if (strpos(PHP_OS, "WIN") !== false) { + $statevar = '"'.str_replace('"', '\\"', $statevar).'"'; + } else { + $statevar = "'".$statevar."'"; + } + + // Build script + $indexName = $indexInstance->getIndexName(); + $scriptPath = sprintf("%s%sframework%scli-script.php", BASE_PATH, DIRECTORY_SEPARATOR, DIRECTORY_SEPARATOR); + $scriptTask = "php {$scriptPath} dev/tasks/{$taskName}"; + $cmd = "{$scriptTask} index={$indexName} class={$class} group={$group} groups={$groups} variantstate={$statevar}"; + $cmd .= " verbose=1 2>&1"; + $logger->info("Running '$cmd'"); + + // Execute script via shell + $res = $logger ? passthru($cmd) : `$cmd`; + if($logger) { + $logger->info(preg_replace('/\r\n|\n/', '$0 ', $res)); + } + + // If we're in dev mode, commit more often for fun and profit + if (Director::isDev()) { + Solr::service($indexName)->commit(); + } + + // This will slow down things a tiny bit, but it is done so that we don't timeout to the database during a reindex + DB::query('SELECT 1'); + } +} diff --git a/code/solr/reindex/handlers/SolrReindexMessageHandler.php b/code/solr/reindex/handlers/SolrReindexMessageHandler.php new file mode 100644 index 0000000..0631e2c --- /dev/null +++ b/code/solr/reindex/handlers/SolrReindexMessageHandler.php @@ -0,0 +1,40 @@ +get(__CLASS__, 'reindex_queue'); + + $logger->info('Queuing message'); + MessageQueue::send( + $queue, + new MethodInvocationMessage('SolrReindexMessageHandler', 'run_reindex', $batchSize, $taskName, $classes) + ); + } + + /** + * Entry point for message queue + * + * @param int $batchSize + * @param string $taskName + * @param array|string|null $classes + */ + public static function run_reindex($batchSize, $taskName, $classes = null) { + // @todo Logger for message queue? + $logger = Injector::inst()->createWithArgs('Monolog\Logger', array(strtolower(get_class()))); + + $inst = Injector::inst()->get(get_class()); + $inst->runReindex($logger, $batchSize, $taskName, $classes); + } +} diff --git a/code/solr/reindex/handlers/SolrReindexQueuedHandler.php b/code/solr/reindex/handlers/SolrReindexQueuedHandler.php new file mode 100644 index 0000000..388f0de --- /dev/null +++ b/code/solr/reindex/handlers/SolrReindexQueuedHandler.php @@ -0,0 +1,94 @@ +cancelExistingJobs('SolrReindexQueuedJob'); + $groups = $this->cancelExistingJobs('SolrReindexGroupQueuedJob'); + $logger->info("Cancelled {$queues} re-index tasks and {$groups} re-index groups"); + + // Although this class is used as a service (singleton) it may also be instantiated + // as a queuedjob + $job = Injector::inst()->create('SolrReindexQueuedJob', $batchSize, $taskName, $classes); + $this + ->getQueuedJobService() + ->queueJob($job); + + $title = $job->getTitle(); + $logger->info("Queued {$title}"); + } + + protected function processGroup( + LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group, $taskName + ) { + // Trigger another job for this group + $job = Injector::inst()->create( + 'SolrReindexGroupQueuedJob', + $indexInstance->getIndexName(), $state, $class, $groups, $group + ); + $this + ->getQueuedJobService() + ->queueJob($job); + + $title = $job->getTitle(); + $logger->info("Queued {$title}"); + } + + public function runGroup( + LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group + ) { + parent::runGroup($logger, $indexInstance, $state, $class, $groups, $group); + + // After any changes have been made, mark all indexes as dirty for commit + // see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers + $logger->info("Queuing commit on all changes"); + SearchUpdateCommitJobProcessor::queue(); + } + +} diff --git a/code/solr/reindex/jobs/SolrReindexGroupQueuedJob.php b/code/solr/reindex/jobs/SolrReindexGroupQueuedJob.php new file mode 100644 index 0000000..666f24d --- /dev/null +++ b/code/solr/reindex/jobs/SolrReindexGroupQueuedJob.php @@ -0,0 +1,117 @@ +indexName = $indexName; + $this->state = $state; + $this->class = $class; + $this->groups = $groups; + $this->group = $group; + } + + public function getJobData() { + $data = parent::getJobData(); + + // Custom data + $data->jobData->indexName = $this->indexName; + $data->jobData->state = $this->state; + $data->jobData->class = $this->class; + $data->jobData->groups = $this->groups; + $data->jobData->group = $this->group; + + return $data; + } + + public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) { + parent::setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages); + + // Custom data + $this->indexName = $jobData->indexName; + $this->state = $jobData->state; + $this->class = $jobData->class; + $this->groups = $jobData->groups; + $this->group = $jobData->group; + } + + public function getSignature() { + return md5(get_class($this) . time() . mt_rand(0, 100000)); + } + + public function getTitle() { + return sprintf( + 'Solr Reindex Group (%d/%d) of %s in %s', + ($this->group+1), + $this->groups, + $this->class, + json_encode($this->state) + ); + } + + public function process() { + $logger = $this->getLogger(); + if($this->jobFinished()) { + $logger->notice("reindex group already complete"); + return; + } + + // Get instance of index + $indexInstance = singleton($this->indexName); + + // Send back to processor + $logger->info("Beginning reindex group"); + $this + ->getHandler() + ->runGroup($logger, $indexInstance, $this->state, $this->class, $this->groups, $this->group); + $logger->info("Completed reindex group"); + $this->isComplete = true; + } + +} diff --git a/code/solr/reindex/jobs/SolrReindexQueuedJob.php b/code/solr/reindex/jobs/SolrReindexQueuedJob.php new file mode 100644 index 0000000..b9c4250 --- /dev/null +++ b/code/solr/reindex/jobs/SolrReindexQueuedJob.php @@ -0,0 +1,91 @@ +batchSize = $batchSize; + $this->taskName = $taskName; + $this->classes = $classes; + parent::__construct(); + } + + public function getJobData() { + $data = parent::getJobData(); + + // Custom data + $data->jobData->batchSize = $this->batchSize; + $data->jobData->taskName = $this->taskName; + $data->jobData->classes = $this->classes; + + return $data; + } + + public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) { + parent::setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages); + + // Custom data + $this->batchSize = $jobData->batchSize; + $this->taskName = $jobData->taskName; + $this->classes = $jobData->classes; + } + + public function getSignature() { + return __CLASS__; + } + + public function getTitle() { + return 'Solr Reindex Job'; + } + + public function process() { + $logger = $this->getLogger(); + if($this->jobFinished()) { + $logger->notice("reindex already complete"); + return; + } + + // Send back to processor + $logger->info("Beginning init of reindex"); + $this + ->getHandler() + ->runReindex($logger, $this->batchSize, $this->taskName, $this->classes); + $logger->info("Completed init of reindex"); + $this->isComplete = true; + } + + /** + * Get size of batch + * + * @return int + */ + public function getBatchSize() { + return $this->batchSize; + } +} diff --git a/code/solr/reindex/jobs/SolrReindexQueuedJobBase.php b/code/solr/reindex/jobs/SolrReindexQueuedJobBase.php new file mode 100644 index 0000000..74b569f --- /dev/null +++ b/code/solr/reindex/jobs/SolrReindexQueuedJobBase.php @@ -0,0 +1,123 @@ +isComplete = false; + $this->messages = array(); + } + + /** + * @return SearchLogFactory + */ + protected function getLoggerFactory() { + return Injector::inst()->get('SearchLogFactory'); + } + + /** + * Gets a logger for this job + * + * @return LoggerInterface + */ + protected function getLogger() { + if($this->logger) { + return $this->logger; + } + + // Set logger for this job + $this->logger = $this + ->getLoggerFactory() + ->getQueuedJobLogger($this); + return $this->logger; + } + + /** + * Assign custom logger for this job + * + * @param LoggerInterface $logger + */ + public function setLogger($logger) { + $this->logger = $logger; + } + + public function getJobData() { + $data = new stdClass(); + + // Standard fields + $data->totalSteps = 1; + $data->currentStep = $this->isComplete ? 0 : 1; + $data->isComplete = $this->isComplete; + $data->messages = $this->messages; + + // Custom data + $data->jobData = new stdClass(); + return $data; + } + + public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) { + $this->isComplete = $isComplete; + $this->messages = $messages; + } + + /** + * Get the reindex handler + * + * @return SolrReindexHandler + */ + protected function getHandler() { + return Injector::inst()->get('SolrReindexHandler'); + } + + public function jobFinished() { + return $this->isComplete; + } + + public function prepareForRestart() { + // NOOP + } + + public function setup() { + // NOOP + } + + public function afterComplete() { + // NOOP + } + + public function getJobType() { + return QueuedJob::QUEUED; + } + + public function addMessage($message) { + $this->messages[] = $message; + } +} diff --git a/code/utils/logging/MonologFactory.php b/code/utils/logging/MonologFactory.php new file mode 100644 index 0000000..dc2d8d2 --- /dev/null +++ b/code/utils/logging/MonologFactory.php @@ -0,0 +1,98 @@ +getLoggerFor($name); + $formatter = $this->getFormatter(); + + // Notice handling + if($verbose) { + $messageHandler = $this->getStreamHandler($formatter, 'php://stdout', Logger::INFO); + $logger->pushHandler($messageHandler); + } + + // Error handling. buble is false so that errors aren't logged twice + $errorHandler = $this->getStreamHandler($formatter, 'php://stderr', Logger::ERROR, false); + $logger->pushHandler($errorHandler); + return $logger; + } + + public function getQueuedJobLogger($job) { + $logger = $this->getLoggerFor(get_class($job)); + $handler = $this->getJobHandler($job); + $logger->pushHandler($handler); + return $logger; + } + + /** + * Generate a handler for the given stream + * + * @param FormatterInterface $formatter + * @param string $stream Name of preferred stream + * @param int $level + * @param bool $bubble + * @return HandlerInterface + */ + protected function getStreamHandler(FormatterInterface $formatter, $stream, $level = Logger::DEBUG, $bubble = true) { + // Unless cli, force output to php://output + $stream = Director::is_cli() ? $stream : 'php://output'; + $handler = Injector::inst()->createWithArgs( + 'Monolog\Handler\StreamHandler', + array($stream, $level, $bubble) + ); + $handler->setFormatter($formatter); + return $handler; + } + + /** + * Gets a formatter for standard output + * + * @return FormatterInterface + */ + protected function getFormatter() { + // Get formatter + $format = LineFormatter::SIMPLE_FORMAT; + if(!Director::is_cli()) { + $format = "

$format

"; + } + return Injector::inst()->createWithArgs( + 'Monolog\Formatter\LineFormatter', + array($format) + ); + } + + /** + * Get a logger for a named class + * + * @param string $name + * @return Logger + */ + protected function getLoggerFor($name) { + return Injector::inst()->createWithArgs( + 'Monolog\Logger', + array(strtolower($name)) + ); + } + + /** + * Generate handler for a job object + * + * @param QueuedJob $job + * @return HandlerInterface + */ + protected function getJobHandler($job) { + return Injector::inst()->createWithArgs( + 'QueuedJobLogHandler', + array($job, Logger::INFO) + ); + } +} diff --git a/code/utils/logging/QueuedJobLogHandler.php b/code/utils/logging/QueuedJobLogHandler.php new file mode 100644 index 0000000..666cdfd --- /dev/null +++ b/code/utils/logging/QueuedJobLogHandler.php @@ -0,0 +1,53 @@ +setQueuedJob($queuedJob); + } + + /** + * Set a new queuedjob + * + * @param QueuedJob $queuedJob + */ + public function setQueuedJob(QueuedJob $queuedJob) { + $this->queuedJob = $queuedJob; + } + + /** + * Get queuedjob + * + * @return QueuedJob + */ + public function getQueuedJob() { + return $this->queuedJob; + } + + protected function write(array $record) { + // Write formatted message + $this->getQueuedJob()->addMessage($record['formatted']); + } + +} diff --git a/code/utils/logging/SearchLogFactory.php b/code/utils/logging/SearchLogFactory.php new file mode 100644 index 0000000..6296691 --- /dev/null +++ b/code/utils/logging/SearchLogFactory.php @@ -0,0 +1,23 @@ +write()` will automatically update the index entry for this record (and all its variants). -You can narrow down the operation with the following options: +This task has the following options: + +- `verbose`: Debug information + +Internally, depending on what job processing backend you have configured (such as queuedjobs) +individual tasks for re-indexing groups of records may either be performed behind the scenes +as crontasks, or via separate processes initiated by the current request. + +Internally groups of records are grouped into sizes of 200. You can configure this +group sizing by using the `Solr_Reindex.recordsPerRequest` config. + + + :::yaml + Solr_Reindex: + recordsPerRequest: 150 - - `index`: PHP class name of an index - - `class`: PHP model class to reindex - - `start`: Offset (applies to matched records) - - `variantstate`: JSON encoded string with state, e.g. '{"SearchVariantVersioned":"Stage"}' - - `verbose`: Debug information Note: The Solr indexes will be stored as binary files inside your SilverStripe project. You can also copy the `thirdparty/` solr directory somewhere else, diff --git a/tests/SolrReindexQueuedTest.php b/tests/SolrReindexQueuedTest.php new file mode 100644 index 0000000..5c6539f --- /dev/null +++ b/tests/SolrReindexQueuedTest.php @@ -0,0 +1,219 @@ +skipTest = true; + return $this->markTestSkipped("These tests need the Phockito module installed to run"); + } + + if(!interface_exists('QueuedJob')) { + $this->skipTest = true; + return $this->markTestSkipped("These tests need the QueuedJobs module installed to run"); + } + + // Set queued handler for reindex + Config::inst()->update('Injector', 'SolrReindexHandler', array( + 'class' => 'SolrReindexQueuedHandler' + )); + Injector::inst()->registerService(new SolrReindexQueuedHandler(), 'SolrReindexHandler'); + + // Set test variant + SolrReindexTest_Variant::enable(); + + // Set index list + $this->service = $this->getServiceMock(); + $this->index = singleton('SolrReindexTest_Index'); + $this->index->setService($this->service); + FullTextSearch::force_index_list($this->index); + } + + /** + * Populate database with dummy dataset + * + * @param int $number Number of records to create in each variant + */ + protected function createDummyData($number) { + // Populate dataobjects. Use truncate to generate predictable IDs + DB::query('TRUNCATE "SolrReindexTest_Item"'); + + // Note that we don't create any records in variant = 2, to represent a variant + // that should be cleared without any re-indexes performed + foreach(array(0, 1) as $variant) { + for($i = 1; $i <= $number; $i++) { + $item = new SolrReindexTest_Item(); + $item->Variant = $variant; + $item->Title = "Item $variant / $i"; + $item->write(); + } + } + } + + /** + * Mock service + * + * @return SolrService + */ + protected function getServiceMock() { + return Phockito::mock('Solr4Service'); + } + + public function tearDown() { + FullTextSearch::force_index_list(); + SolrReindexTest_Variant::disable(); + parent::tearDown(); + } + + /** + * Get the reindex handler + * + * @return SolrReindexHandler + */ + protected function getHandler() { + return Injector::inst()->get('SolrReindexHandler'); + } + + /** + * @return SolrReindexQueuedTest_Service + */ + protected function getQueuedJobService() { + return singleton('SolrReindexQueuedTest_Service'); + } + + /** + * Test that reindex will generate a top top level queued job, and executing this will perform + * the necessary initialisation of the grouped queued jobs + */ + public function testReindexSegmentsGroups() { + $this->createDummyData(18); + + // Create pre-existing jobs + $this->getQueuedJobService()->queueJob(new SolrReindexQueuedJob()); + $this->getQueuedJobService()->queueJob(new SolrReindexGroupQueuedJob()); + $this->getQueuedJobService()->queueJob(new SolrReindexGroupQueuedJob()); + + // Initiate re-index + $logger = new SolrReindexTest_RecordingLogger(); + $this->getHandler()->triggerReindex($logger, 6, 'Solr_Reindex'); + + // Old jobs should be cancelled + $this->assertEquals(1, $logger->countMessages('Cancelled 1 re-index tasks and 2 re-index groups')); + $this->assertEquals(1, $logger->countMessages('Queued Solr Reindex Job')); + + // Next job should be queue job + $job = $this->getQueuedJobService()->getNextJob(); + $this->assertInstanceOf('SolrReindexQueuedJob', $job); + $this->assertEquals(6, $job->getBatchSize()); + + // Test that necessary items are created + $logger->clear(); + $job->setLogger($logger); + $job->process(); + + // Deletes are performed in the main task prior to individual groups being processed + // 18 records means 3 groups of 6 in each variant (6 total) + Phockito::verify($this->service, 2) + ->deleteByQuery(anything()); + $this->assertEquals(1, $logger->countMessages('Beginning init of reindex')); + $this->assertEquals(6, $logger->countMessages('Queued Solr Reindex Group ')); + $this->assertEquals(3, $logger->countMessages(' of SolrReindexTest_Item in {"SolrReindexTest_Variant":"0"}')); + $this->assertEquals(3, $logger->countMessages(' of SolrReindexTest_Item in {"SolrReindexTest_Variant":"1"}')); + $this->assertEquals(1, $logger->countMessages('Completed init of reindex')); + + + // Test that invalid classes are removed + $this->assertNotEmpty($logger->getMessages('Clearing obsolete classes from SolrReindexTest_Index')); + Phockito::verify($this->service, 1) + ->deleteByQuery('-(ClassHierarchy:SolrReindexTest_Item)'); + + // Test that valid classes in invalid variants are removed + $this->assertNotEmpty($logger->getMessages( + 'Clearing all records of type SolrReindexTest_Item in the current state: {"SolrReindexTest_Variant":"2"}' + )); + Phockito::verify($this->service, 1) + ->deleteByQuery('+(ClassHierarchy:SolrReindexTest_Item) +(_testvariant:"2")'); + } + + /** + * Test index processing on individual groups + */ + public function testRunGroup() { + $this->createDummyData(18); + + // Just do what the SolrReindexQueuedJob would do to create each sub + $logger = new SolrReindexTest_RecordingLogger(); + $this->getHandler()->runReindex($logger, 6, 'Solr_Reindex'); + + // Assert jobs are created + $this->assertEquals(6, $logger->countMessages('Queued Solr Reindex Group')); + + // Check next job is a group queued job + $job = $this->getQueuedJobService()->getNextJob(); + $this->assertInstanceOf('SolrReindexGroupQueuedJob', $job); + $this->assertEquals( + 'Solr Reindex Group (1/3) of SolrReindexTest_Item in {"SolrReindexTest_Variant":"0"}', + $job->getTitle() + ); + + // Running this job performs the necessary reindex + $logger->clear(); + $job->setLogger($logger); + $job->process(); + + // Check tasks completed (as per non-queuedjob version) + $this->assertEquals(1, $logger->countMessages('Beginning reindex group')); + $this->assertEquals(1, $logger->countMessages('Adding SolrReindexTest_Item')); + $this->assertEquals(1, $logger->countMessages('Queuing commit on all changes')); + $this->assertEquals(1, $logger->countMessages('Completed reindex group')); + + // Check IDs + $idMessage = $logger->filterMessages('Updated '); + $this->assertNotEmpty(preg_match('/^Updated (?[,\d]+)/i', $idMessage[0], $matches)); + $ids = array_unique(explode(',', $matches['ids'])); + $this->assertEquals(6, count($ids)); + foreach($ids as $id) { + // Each id should be % 3 == 0 + $this->assertEquals(0, $id % 3, "ID $id Should match pattern ID % 3 = 0"); + } + } +} + +if(!class_exists('QueuedJobService')) return; + +class SolrReindexQueuedTest_Service extends QueuedJobService implements TestOnly { + + /** + * @return QueuedJob + */ + public function getNextJob() { + $job = $this->getNextPendingJob(); + return $this->initialiseJob($job); + } + +} \ No newline at end of file diff --git a/tests/SolrReindexTest.php b/tests/SolrReindexTest.php new file mode 100644 index 0000000..f352308 --- /dev/null +++ b/tests/SolrReindexTest.php @@ -0,0 +1,531 @@ +skipTest = true; + return $this->markTestSkipped("These tests need the Phockito module installed to run"); + } + + // Set test handler for reindex + Config::inst()->update('Injector', 'SolrReindexHandler', array( + 'class' => 'SolrReindexTest_TestHandler' + )); + Injector::inst()->registerService(new SolrReindexTest_TestHandler(), 'SolrReindexHandler'); + + // Set test variant + SolrReindexTest_Variant::enable(); + + // Set index list + $this->service = $this->getServiceMock(); + $this->index = singleton('SolrReindexTest_Index'); + $this->index->setService($this->service); + FullTextSearch::force_index_list($this->index); + } + + /** + * Populate database with dummy dataset + * + * @param int $number Number of records to create in each variant + */ + protected function createDummyData($number) { + // Populate dataobjects. Use truncate to generate predictable IDs + DB::query('TRUNCATE "SolrReindexTest_Item"'); + + // Note that we don't create any records in variant = 2, to represent a variant + // that should be cleared without any re-indexes performed + foreach(array(0, 1) as $variant) { + for($i = 1; $i <= $number; $i++) { + $item = new SolrReindexTest_Item(); + $item->Variant = $variant; + $item->Title = "Item $variant / $i"; + $item->write(); + } + } + } + + /** + * Mock service + * + * @return SolrService + */ + protected function getServiceMock() { + return Phockito::mock('Solr4Service'); + } + + public function tearDown() { + FullTextSearch::force_index_list(); + SolrReindexTest_Variant::disable(); + parent::tearDown(); + } + + /** + * Get the reindex handler + * + * @return SolrReindexHandler + */ + protected function getHandler() { + return Injector::inst()->get('SolrReindexHandler'); + } + + /** + * Ensure the test variant is up and running properly + */ + public function testVariant() { + // State defaults to 0 + $variant = SearchVariant::current_state(); + $this->assertEquals( + array( + "SolrReindexTest_Variant" => "0" + ), + $variant + ); + + // All states enumerated + $allStates = iterator_to_array(SearchVariant::reindex_states()); + $this->assertEquals( + array( + array( + "SolrReindexTest_Variant" => "0" + ), + array( + "SolrReindexTest_Variant" => "1" + ), + array( + "SolrReindexTest_Variant" => "2" + ) + ), + $allStates + ); + + // Check correct items created and that filtering on variant works + $this->createDummyData(120); + SolrReindexTest_Variant::set_current(2); + $this->assertEquals(0, SolrReindexTest_Item::get()->count()); + SolrReindexTest_Variant::set_current(1); + $this->assertEquals(120, SolrReindexTest_Item::get()->count()); + SolrReindexTest_Variant::set_current(0); + $this->assertEquals(120, SolrReindexTest_Item::get()->count()); + SolrReindexTest_Variant::disable(); + $this->assertEquals(240, SolrReindexTest_Item::get()->count()); + } + + + /** + * Given the invocation of a new re-index with a given set of data, ensure that the necessary + * list of groups are created and segmented for each state + * + * Test should work fine with any variants (versioned, subsites, etc) specified + */ + public function testReindexSegmentsGroups() { + $this->createDummyData(120); + + // Initiate re-index + $logger = new SolrReindexTest_RecordingLogger(); + $this->getHandler()->runReindex($logger, 21, 'Solr_Reindex'); + + // Test that invalid classes are removed + $this->assertNotEmpty($logger->getMessages('Clearing obsolete classes from SolrReindexTest_Index')); + Phockito::verify($this->service, 1) + ->deleteByQuery('-(ClassHierarchy:SolrReindexTest_Item)'); + + // Test that valid classes in invalid variants are removed + $this->assertNotEmpty($logger->getMessages( + 'Clearing all records of type SolrReindexTest_Item in the current state: {"SolrReindexTest_Variant":"2"}' + )); + Phockito::verify($this->service, 1) + ->deleteByQuery('+(ClassHierarchy:SolrReindexTest_Item) +(_testvariant:"2")'); + + // 120x2 grouped into groups of 21 results in 12 groups + $this->assertEquals(12, $logger->countMessages('Called processGroup with ')); + $this->assertEquals(6, $logger->countMessages('{"SolrReindexTest_Variant":"0"}')); + $this->assertEquals(6, $logger->countMessages('{"SolrReindexTest_Variant":"1"}')); + + // Given that there are two variants, there should be two group ids of each number + $this->assertEquals(2, $logger->countMessages(' SolrReindexTest_Item, group 0 of 6')); + $this->assertEquals(2, $logger->countMessages(' SolrReindexTest_Item, group 1 of 6')); + $this->assertEquals(2, $logger->countMessages(' SolrReindexTest_Item, group 2 of 6')); + $this->assertEquals(2, $logger->countMessages(' SolrReindexTest_Item, group 3 of 6')); + $this->assertEquals(2, $logger->countMessages(' SolrReindexTest_Item, group 4 of 6')); + $this->assertEquals(2, $logger->countMessages(' SolrReindexTest_Item, group 5 of 6')); + + // Check various group sizes + $logger->clear(); + $this->getHandler()->runReindex($logger, 120, 'Solr_Reindex'); + $this->assertEquals(2, $logger->countMessages('Called processGroup with ')); + $logger->clear(); + $this->getHandler()->runReindex($logger, 119, 'Solr_Reindex'); + $this->assertEquals(4, $logger->countMessages('Called processGroup with ')); + $logger->clear(); + $this->getHandler()->runReindex($logger, 121, 'Solr_Reindex'); + $this->assertEquals(2, $logger->countMessages('Called processGroup with ')); + $logger->clear(); + $this->getHandler()->runReindex($logger, 2, 'Solr_Reindex'); + $this->assertEquals(120, $logger->countMessages('Called processGroup with ')); + } + + /** + * Test index processing on individual groups + */ + public function testRunGroup() { + $this->createDummyData(120); + $logger = new SolrReindexTest_RecordingLogger(); + + // Initiate re-index of third group (index 2 of 6) + $state = array('SolrReindexTest_Variant' => '1'); + $this->getHandler()->runGroup($logger, $this->index, $state, 'SolrReindexTest_Item', 6, 2); + $idMessage = $logger->filterMessages('Updated '); + $this->assertNotEmpty(preg_match('/^Updated (?[,\d]+)/i', $idMessage[0], $matches)); + $ids = array_unique(explode(',', $matches['ids'])); + + // Test successful + $this->assertNotEmpty($logger->getMessages('Adding SolrReindexTest_Item')); + $this->assertNotEmpty($logger->getMessages('Done')); + + // Test that items in this variant / group are cleared from solr + Phockito::verify($this->service, 1)->deleteByQuery( + '+(ClassHierarchy:SolrReindexTest_Item) +_query_:"{!frange l=2 u=2}mod(ID, 6)" +(_testvariant:"1")' + ); + + // Test that items in this variant / group are re-indexed + // 120 divided into 6 groups should be 20 at least (max 21) + $this->assertEquals(21, count($ids), 'Group size is about 20', 1); + foreach($ids as $id) { + // Each id should be % 6 == 2 + $this->assertEquals(2, $id % 6, "ID $id Should match pattern ID % 6 = 2"); + } + } + + /** + * Test that running all groups covers the entire range of dataobject IDs + */ + public function testRunAllGroups() { + $this->createDummyData(120); + $logger = new SolrReindexTest_RecordingLogger(); + + // Test that running all groups covers the complete set of ids + $state = array('SolrReindexTest_Variant' => '1'); + for($i = 0; $i < 6; $i++) { + // See testReindexSegmentsGroups for test that each of these states is invoked during a full reindex + $this + ->getHandler() + ->runGroup($logger, $this->index, $state, 'SolrReindexTest_Item', 6, $i); + } + + // Count all ids updated + $ids = array(); + foreach($logger->filterMessages('Updated ') as $message) { + $this->assertNotEmpty(preg_match('/^Updated (?[,\d]+)/', $message, $matches)); + $ids = array_unique(array_merge($ids, explode(',', $matches['ids']))); + } + + // Check ids + $this->assertEquals(120, count($ids)); + Phockito::verify($this->service, 6)->deleteByQuery(anything()); + Phockito::verify($this->service, 1)->deleteByQuery( + '+(ClassHierarchy:SolrReindexTest_Item) +_query_:"{!frange l=0 u=0}mod(ID, 6)" +(_testvariant:"1")' + ); + Phockito::verify($this->service, 1)->deleteByQuery( + '+(ClassHierarchy:SolrReindexTest_Item) +_query_:"{!frange l=1 u=1}mod(ID, 6)" +(_testvariant:"1")' + ); + Phockito::verify($this->service, 1)->deleteByQuery( + '+(ClassHierarchy:SolrReindexTest_Item) +_query_:"{!frange l=2 u=2}mod(ID, 6)" +(_testvariant:"1")' + ); + Phockito::verify($this->service, 1)->deleteByQuery( + '+(ClassHierarchy:SolrReindexTest_Item) +_query_:"{!frange l=3 u=3}mod(ID, 6)" +(_testvariant:"1")' + ); + Phockito::verify($this->service, 1)->deleteByQuery( + '+(ClassHierarchy:SolrReindexTest_Item) +_query_:"{!frange l=4 u=4}mod(ID, 6)" +(_testvariant:"1")' + ); + Phockito::verify($this->service, 1)->deleteByQuery( + '+(ClassHierarchy:SolrReindexTest_Item) +_query_:"{!frange l=5 u=5}mod(ID, 6)" +(_testvariant:"1")' + ); + } +} + +/** + * Provides a wrapper for testing SolrReindexBase + */ +class SolrReindexTest_TestHandler extends SolrReindexBase { + + public function processGroup( + LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group, $taskName + ) { + $indexName = $indexInstance->getIndexName(); + $stateName = json_encode($state); + $logger->info("Called processGroup with {$indexName}, {$stateName}, {$class}, group {$group} of {$groups}"); + } + + public function triggerReindex(LoggerInterface $logger, $batchSize, $taskName, $classes = null) { + $logger->info("Called triggerReindex"); + } + +} + + +class SolrReindexTest_Index extends SolrIndex implements TestOnly { + public function init() { + $this->addClass('SolrReindexTest_Item'); + $this->addAllFulltextFields(); + } +} + +/** + * Does not have any variant extensions + */ +class SolrReindexTest_Item extends DataObject implements TestOnly { + + private static $extensions = array( + 'SolrReindexTest_ItemExtension' + ); + + private static $db = array( + 'Title' => 'Varchar(255)', + 'Variant' => 'Int(0)' + ); + +} + +/** + * Select only records in the current variant + */ +class SolrReindexTest_ItemExtension extends DataExtension implements TestOnly { + + /** + * Filter records on the current variant + * + * @param SQLQuery $query + * @param DataQuery $dataQuery + */ + public function augmentSQL(SQLQuery &$query, DataQuery &$dataQuery = null) { + $variant = SolrReindexTest_Variant::get_current(); + if($variant !== null && !$query->filtersOnID()) { + $sqlVariant = Convert::raw2sql($variant); + $query->addWhere("\"Variant\" = '{$sqlVariant}'"); + } + } +} + + +/** + * Dummy variant that selects items with field Varient matching the current value + * + * Variant states are 0 and 1, or null if disabled + */ +class SolrReindexTest_Variant extends SearchVariant implements TestOnly { + + /** + * Value of this variant (either null, 0, or 1) + * + * @var int|null + */ + protected static $current = null; + + /** + * Activate this variant + */ + public static function enable() { + self::disable(); + + self::$current = 0; + self::$variants = array( + 'SolrReindexTest_Variant' => singleton('SolrReindexTest_Variant') + ); + } + + /** + * Disable this variant and reset + */ + public static function disable() { + self::$current = null; + self::$variants = null; + self::$class_variants = array(); + self::$call_instances = array(); + } + + public function activateState($state) { + self::set_current($state); + } + + /** + * Set the current variant to the given state + * + * @param int $current 0, 1, 2, or null (disabled) + */ + public static function set_current($current) { + self::$current = $current; + } + + /** + * Get the current state + * + * @return string|null + */ + public static function get_current() { + // Always use string values for states for consistent json_encode value + if(isset(self::$current)) { + return (string)self::$current; + } + } + + function alterDefinition($base, $index) { + $self = get_class($this); + + $index->filterFields['_testvariant'] = array( + 'name' => '_testvariant', + 'field' => '_testvariant', + 'fullfield' => '_testvariant', + 'base' => $base, + 'origin' => $base, + 'type' => 'Int', + 'lookup_chain' => array(array('call' => 'variant', 'variant' => $self, 'method' => 'currentState')) + ); + } + + public function alterQuery($query, $index) { + // I guess just calling it _testvariant is ok? + $query->filter('_testvariant', $this->currentState()); + } + + public function appliesTo($class, $includeSubclasses) { + return $class === 'SolrReindexTest_Item' || + ($includeSubclasses && is_subclass_of($class, 'SolrReindexTest_Item', true)); + } + + public function appliesToEnvironment() { + // Set to null to disable + return self::$current !== null; + } + + public function currentState() { + return self::get_current(); + } + + public function reindexStates() { + // Always use string values for states for consistent json_encode value + return array('0', '1', '2'); + } + +} + +/** + * Test logger for recording messages + */ +class SolrReindexTest_RecordingLogger extends Logger implements TestOnly { + + /** + * @var SolrReindexTest_Handler + */ + protected $testHandler = null; + + public function __construct($name = 'testlogger', array $handlers = array(), array $processors = array()) { + parent::__construct($name, $handlers, $processors); + + $this->testHandler = new SolrReindexTest_Handler(); + $this->pushHandler($this->testHandler); + } + + /** + * @return array + */ + public function getMessages() { + return $this->testHandler->getMessages(); + } + + /** + * Clear all messages + */ + public function clear() { + $this->testHandler->clear(); + } + + /** + * Get messages with the given filter + * + * @param string $containing + * @return array Filtered array + */ + public function filterMessages($containing) { + return array_values(array_filter( + $this->getMessages(), + function($content) use ($containing) { + return stripos($content, $containing) !== false; + } + )); + } + + /** + * Count all messages containing the given substring + * + * @param string $containing Message to filter by + * @return int + */ + public function countMessages($containing = null) { + if($containing) { + $messages = $this->filterMessages($containing); + } else { + $messages = $this->getMessages(); + } + return count($messages); + } +} + +/** + * Logger for recording messages for later retrieval + */ +class SolrReindexTest_Handler extends AbstractProcessingHandler implements TestOnly { + + /** + * Messages + * + * @var array + */ + protected $messages = array(); + + /** + * Get all messages + * + * @return array + */ + public function getMessages() { + return $this->messages; + } + + public function clear() { + $this->messages = array(); + } + + protected function write(array $record) { + $this->messages[] = $record['message']; + } +} \ No newline at end of file