API QueuedJob support for Solr_Reindex

API Incremental clear and re-index of records rather than clearing all records from SolrIndex up front
This commit is contained in:
Damian Mooyman 2015-07-16 18:18:04 +12:00
parent 259bd033f1
commit 1683f776bd
22 changed files with 2109 additions and 204 deletions

View File

@ -2,3 +2,5 @@
global $databaseConfig;
if (isset($databaseConfig['type'])) SearchUpdater::bind_manipulation_capture();
Deprecation::notification_version('1.0.0', 'fulltextsearch');

View File

@ -4,6 +4,10 @@ Name: defaultprocessor
Injector:
SearchUpdateProcessor:
class: SearchUpdateImmediateProcessor
SolrReindexHandler:
class: SolrReindexImmediateHandler
SearchLogFactory:
class: 'MonologFactory'
---
Name: messagequeueprocessor
Only:
@ -14,6 +18,8 @@ Except:
Injector:
SearchUpdateProcessor:
class: SearchUpdateMessageQueueProcessor
SolrReindexHandler:
class: SolrReindexMessageHandler
---
Name: queuedjobprocessor
Only:
@ -24,3 +30,5 @@ Except:
Injector:
SearchUpdateProcessor:
class: SearchUpdateQueuedJobProcessor
SolrReindexHandler:
class: SolrReindexQueuedHandler

View File

@ -39,6 +39,14 @@ abstract class SearchVariant {
*/
abstract function activateState($state);
/**
* Apply this variant to a search query
*
* @param SearchQuery $query
* @param SearchIndex $index
*/
abstract public function alterQuery($query, $index);
/*** OVERRIDES end here*/
/** Holds a cache of all variants */

View File

@ -48,7 +48,7 @@ class SearchVariantSiteTreeSubsitesPolyhome extends SearchVariant {
);
}
function alterQuery($query, $index) {
public function alterQuery($query, $index) {
$subsite = Subsite::currentSubsiteID();
$query->filter('_subsite', array($subsite, SearchQuery::$missing));
}

View File

@ -28,7 +28,7 @@ class SearchVariantVersioned extends SearchVariant {
);
}
function alterQuery($query) {
public function alterQuery($query, $index) {
$stage = Versioned::current_stage();
$query->filter('_versionedstage', array($stage, SearchQuery::$missing));
}

View File

@ -1,5 +1,10 @@
<?php
use Monolog\Formatter\LineFormatter;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LoggerInterface;
class Solr {
/**
@ -145,10 +150,70 @@ class Solr {
}
}
/**
* Abstract class for build tasks
*/
class Solr_BuildTask extends BuildTask {
class Solr_Configure extends BuildTask {
protected $enabled = false;
/**
* Logger
*
* @var LoggerInterface
*/
protected $logger = null;
/**
* Get the current logger
*
* @return LoggerInterface
*/
public function getLogger() {
return $this->logger;
}
/**
* Assign a new logger
*
* @param LoggerInterface $logger
*/
public function setLogger(LoggerInterface $logger) {
$this->logger = $logger;
}
/**
* @return SearchLogFactory
*/
protected function getLoggerFactory() {
return Injector::inst()->get('SearchLogFactory');
}
/**
* Setup task
*
* @param SS_HTTPReqest $request
*/
public function run($request) {
$name = get_class($this);
$verbose = $request->getVar('verbose');
// Set new logger
$logger = $this
->getLoggerFactory()
->getOutputLogger($name, $verbose);
$this->setLogger($logger);
}
}
class Solr_Configure extends Solr_BuildTask {
protected $enabled = true;
public function run($request) {
parent::run($request);
// Find the IndexStore handler, which will handle uploading config files to Solr
$store = $this->getSolrConfigStore();
$indexes = Solr::get_indexes();
@ -158,7 +223,9 @@ class Solr_Configure extends BuildTask {
$this->updateIndex($instance, $store);
} catch(Exception $e) {
// We got an exception. Warn, but continue to next index.
$this->log("Failure: " . $e->getMessage());
$this
->getLogger()
->error("Failure: " . $e->getMessage());
}
}
}
@ -171,26 +238,25 @@ class Solr_Configure extends BuildTask {
*/
protected function updateIndex($instance, $store) {
$index = $instance->getIndexName();
$this->log("Configuring $index.");
$this->log("Uploading configuration ... ");
$this->getLogger()->info("Configuring $index.");
// Upload the config files for this index
$this->getLogger()->info("Uploading configuration ...");
$instance->uploadConfig($store);
// Then tell Solr to use those config files
$service = Solr::service();
if ($service->coreIsActive($index)) {
$this->log("Reloading core ...");
$this->getLogger()->info("Reloading core ...");
$service->coreReload($index);
} else {
$this->log("Creating core ...");
$this->getLogger()->info("Creating core ...");
$service->coreCreate($index, $store->instanceDir($index));
}
$this->log("Done");
$this->getLogger()->info("Done");
}
/**
* Get config store
*
@ -217,19 +283,26 @@ class Solr_Configure extends BuildTask {
}
}
protected function log($message) {
if(Director::is_cli()) {
echo $message . "\n";
} else {
echo Convert::raw2xml($message) . "<br />";
}
flush();
}
}
/**
* Task used for both initiating a new reindex, as well as for processing incremental batches
* within a reindex.
*
* When running a complete reindex you can provide any of the following
* - class (to limit to a single class)
* - verbose (optional)
*
* When running with a single batch, provide the following querystring arguments:
* - start
* - index
* - class
* - variantstate
* - verbose (optional)
*/
class Solr_Reindex extends Solr_BuildTask {
class Solr_Reindex extends BuildTask {
protected $enabled = true;
/**
* Number of records to load and index per request
@ -239,117 +312,94 @@ class Solr_Reindex extends BuildTask {
*/
private static $recordsPerRequest = 200;
public function run($request) {
increase_time_limit_to();
$self = get_class($this);
$verbose = isset($_GET['verbose']);
$originalState = SearchVariant::current_state();
if (isset($_GET['start'])) {
$this->runFrom(singleton($_GET['index']), $_GET['class'], $_GET['start'], json_decode($_GET['variantstate'], true));
}
else {
foreach(array('framework','sapphire') as $dirname) {
$script = sprintf("%s%s$dirname%scli-script.php", BASE_PATH, DIRECTORY_SEPARATOR, DIRECTORY_SEPARATOR);
if(file_exists($script)) {
break;
}
}
$class = get_class($this);
foreach (Solr::get_indexes() as $index => $instance) {
echo "Rebuilding {$instance->getIndexName()}\n\n";
$classes = $instance->getClasses();
if($request->getVar('class')) {
$limitClasses = explode(',', $request->getVar('class'));
$classes = array_intersect_key($classes, array_combine($limitClasses, $limitClasses));
}
if($classes) {
Solr::service($index)->deleteByQuery('ClassHierarchy:(' . implode(' OR ', array_keys($classes)) . ')');
}
foreach ($classes as $class => $options) {
$includeSubclasses = $options['include_children'];
foreach (SearchVariant::reindex_states($class, $includeSubclasses) as $state) {
if ($instance->variantStateExcluded($state)) continue;
SearchVariant::activate_state($state);
$filter = $includeSubclasses ? "" : '"ClassName" = \''.$class."'";
$singleton = singleton($class);
$query = $singleton->get($class,$filter,null);
$dtaQuery = $query->dataQuery();
$sqlQuery = $dtaQuery->getFinalisedQuery();
$singleton->extend('augmentSQL',$sqlQuery,$dtaQuery);
$total = $query->count();
$statevar = json_encode($state);
echo "Class: $class, total: $total";
echo ($statevar) ? " in state $statevar\n" : "\n";
if (strpos(PHP_OS, "WIN") !== false) $statevar = '"'.str_replace('"', '\\"', $statevar).'"';
else $statevar = "'".$statevar."'";
for ($offset = 0; $offset < $total; $offset += $this->stat('recordsPerRequest')) {
echo "$offset..";
$cmd = "php $script dev/tasks/$self index=$index class=$class start=$offset variantstate=$statevar";
if($verbose) {
echo "\n Running '$cmd'\n";
$cmd .= " verbose=1 2>&1";
}
$res = $verbose ? passthru($cmd) : `$cmd`;
if($verbose) echo " ".preg_replace('/\r\n|\n/', '$0 ', $res)."\n";
// If we're in dev mode, commit more often for fun and profit
if (Director::isDev()) Solr::service($index)->commit();
// This will slow down things a tiny bit, but it is done so that we don't timeout to the database during a reindex
DB::query('SELECT 1');
}
echo "\n";
}
}
Solr::service($index)->commit();
}
}
$originalState = SearchVariant::current_state();
/**
* Get the reindex handler
*
* @return SolrReindexHandler
*/
protected function getHandler() {
return Injector::inst()->get('SolrReindexHandler');
}
protected function runFrom($index, $class, $start, $variantstate) {
$classes = $index->getClasses();
$options = $classes[$class];
$verbose = isset($_GET['verbose']);
/**
* @param SS_HTTPRequest $request
*/
public function run($request) {
parent::run($request);
// Reset state
$originalState = SearchVariant::current_state();
$this->doReindex($request);
SearchVariant::activate_state($originalState);
}
/**
* @param SS_HTTPRequest $request
*/
protected function doReindex($request) {
$class = $request->getVar('class');
// Deprecated reindex mechanism
$start = $request->getVar('start');
if ($start !== null) {
// Run single batch directly
$indexInstance = singleton($request->getVar('index'));
$state = json_decode($request->getVar('variantstate'), true);
$this->runFrom($indexInstance, $class, $start, $state);
return;
}
// Check if we are re-indexing a single group
// If not using queuedjobs, we need to invoke Solr_Reindex as a separate process
// Otherwise each group is processed via a SolrReindexGroupJob
$groups = $request->getVar('groups');
$handler = $this->getHandler();
if($groups) {
// Run grouped batches (id % groups = group)
$group = $request->getVar('group');
$indexInstance = singleton($request->getVar('index'));
$state = json_decode($request->getVar('variantstate'), true);
$handler->runGroup($this->getLogger(), $indexInstance, $state, $class, $groups, $group);
return;
}
// If run at the top level, delegate to appropriate handler
$self = get_class($this);
$handler->triggerReindex($this->getLogger(), $this->config()->recordsPerRequest, $self, $class);
}
/**
* @deprecated since version 2.0.0
*/
protected function runFrom($index, $class, $start, $variantstate) {
DeprecationTest_Deprecation::notice('2.0.0', 'Solr_Reindex now uses a new grouping mechanism');
// Set time limit and state
increase_time_limit_to();
SearchVariant::activate_state($variantstate);
$includeSubclasses = $options['include_children'];
$filter = $includeSubclasses ? "" : '"ClassName" = \''.$class."'";
// Generate filtered list
$items = DataList::create($class)
->where($filter)
->limit($this->stat('recordsPerRequest'), $start);
->limit($this->config()->recordsPerRequest, $start);
if($verbose) echo "Adding $class";
foreach ($items as $item) {
if($verbose) echo $item->ID . ' ';
// Add child filter
$classes = $index->getClasses();
$options = $classes[$class];
if(!$options['include_children']) {
$items = $items->filter('ClassName', $class);
}
// Process selected records in this class
$this->getLogger()->info("Adding $class");
foreach ($items->sort("ID") as $item) {
$this->getLogger()->debug($item->ID);
// See SearchUpdater_ObjectHandler::triggerReindex
$item->triggerReindex();
$item->destroy();
}
if($verbose) echo "Done ";
$this->getLogger()->info("Done");
}
}

View File

@ -511,6 +511,39 @@ abstract class SolrIndex extends SearchIndex {
}
}
/**
* Clear all records which do not match the given classname whitelist.
*
* Can also be used to trim an index when reducing to a narrower set of classes.
*
* Ignores current state / variant.
*
* @param array $classes List of non-obsolete classes in the same format as SolrIndex::getClasses()
* @return bool Flag if successful
*/
public function clearObsoleteClasses($classes) {
if(empty($classes)) {
return false;
}
// Delete all records which do not match the necessary classname rules
$conditions = array();
foreach ($classes as $class => $options) {
if ($options['include_children']) {
$conditions[] = "ClassHierarchy:{$class}";
} else {
$conditions[] = "ClassName:{$class}";
}
}
// Delete records which don't match any of these conditions in this index
$deleteQuery = "-(" . implode(' ', $conditions) . ")";
$this
->getService()
->deleteByQuery($deleteQuery);
return true;
}
function commit() {
try {
$this->getService()->commit(false, false, false);
@ -543,32 +576,8 @@ abstract class SolrIndex extends SearchIndex {
$hlq = array(); // Highlight query
// Build the search itself
$q = $this->getQueryComponent($query, $hlq);
foreach ($query->search as $search) {
$text = $search['text'];
preg_match_all('/"[^"]*"|\S+/', $text, $parts);
$fuzzy = $search['fuzzy'] ? '~' : '';
foreach ($parts[0] as $part) {
$fields = (isset($search['fields'])) ? $search['fields'] : array();
if(isset($search['boost'])) {
$fields = array_merge($fields, array_keys($search['boost']));
}
if ($fields) {
$searchq = array();
foreach ($fields as $field) {
$boost = (isset($search['boost'][$field])) ? '^' . $search['boost'][$field] : '';
$searchq[] = "{$field}:".$part.$fuzzy.$boost;
}
$q[] = '+('.implode(' OR ', $searchq).')';
}
else {
$q[] = '+'.$part.$fuzzy;
}
$hlq[] = $part;
}
}
// If using boosting, set the clean term separately for highlighting.
// See https://issues.apache.org/jira/browse/SOLR-2632
if(array_key_exists('hl', $params) && !array_key_exists('hl.q', $params)) {
@ -576,64 +585,17 @@ abstract class SolrIndex extends SearchIndex {
}
// Filter by class if requested
$classq = array();
foreach ($query->classes as $class) {
if (!empty($class['includeSubclasses'])) $classq[] = 'ClassHierarchy:'.$class['class'];
if (!empty($class['includeSubclasses'])) {
$classq[] = 'ClassHierarchy:'.$class['class'];
}
else $classq[] = 'ClassName:'.$class['class'];
}
if ($classq) $fq[] = '+('.implode(' ', $classq).')';
// Filter by filters
foreach ($query->require as $field => $values) {
$requireq = array();
foreach ($values as $value) {
if ($value === SearchQuery::$missing) {
$requireq[] = "(*:* -{$field}:[* TO *])";
}
else if ($value === SearchQuery::$present) {
$requireq[] = "{$field}:[* TO *]";
}
else if ($value instanceof SearchQuery_Range) {
$start = $value->start; if ($start === null) $start = '*';
$end = $value->end; if ($end === null) $end = '*';
$requireq[] = "$field:[$start TO $end]";
}
else {
$requireq[] = $field.':"'.$value.'"';
}
}
$fq[] = '+('.implode(' ', $requireq).')';
}
foreach ($query->exclude as $field => $values) {
$excludeq = array();
$missing = false;
foreach ($values as $value) {
if ($value === SearchQuery::$missing) {
$missing = true;
}
else if ($value === SearchQuery::$present) {
$excludeq[] = "{$field}:[* TO *]";
}
else if ($value instanceof SearchQuery_Range) {
$start = $value->start; if ($start === null) $start = '*';
$end = $value->end; if ($end === null) $end = '*';
$excludeq[] = "$field:[$start TO $end]";
}
else {
$excludeq[] = $field.':"'.$value.'"';
}
}
$fq[] = ($missing ? "+{$field}:[* TO *] " : '') . '-('.implode(' ', $excludeq).')';
}
$fq = array_merge($fq, $this->getFiltersComponent($query));
// Prepare query fields unless specified explicitly
if(isset($params['qf'])) {
@ -739,6 +701,136 @@ abstract class SolrIndex extends SearchIndex {
return new ArrayData($ret);
}
/**
* Get the query (q) component for this search
*
* @param SearchQuery $searchQuery
* @param array &$hlq Highlight query returned by reference
* @return array
*/
protected function getQueryComponent(SearchQuery $searchQuery, &$hlq = array()) {
$q = array();
foreach ($searchQuery->search as $search) {
$text = $search['text'];
preg_match_all('/"[^"]*"|\S+/', $text, $parts);
$fuzzy = $search['fuzzy'] ? '~' : '';
foreach ($parts[0] as $part) {
$fields = (isset($search['fields'])) ? $search['fields'] : array();
if(isset($search['boost'])) {
$fields = array_merge($fields, array_keys($search['boost']));
}
if ($fields) {
$searchq = array();
foreach ($fields as $field) {
$boost = (isset($search['boost'][$field])) ? '^' . $search['boost'][$field] : '';
$searchq[] = "{$field}:".$part.$fuzzy.$boost;
}
$q[] = '+('.implode(' OR ', $searchq).')';
}
else {
$q[] = '+'.$part.$fuzzy;
}
$hlq[] = $part;
}
}
return $q;
}
/**
* Parse all require constraints for inclusion in a filter query
*
* @param SearchQuery $searchQuery
* @return array List of parsed string values for each require
*/
protected function getRequireFiltersComponent(SearchQuery $searchQuery) {
$fq = array();
foreach ($searchQuery->require as $field => $values) {
$requireq = array();
foreach ($values as $value) {
if ($value === SearchQuery::$missing) {
$requireq[] = "(*:* -{$field}:[* TO *])";
}
else if ($value === SearchQuery::$present) {
$requireq[] = "{$field}:[* TO *]";
}
else if ($value instanceof SearchQuery_Range) {
$start = $value->start;
if ($start === null) {
$start = '*';
}
$end = $value->end;
if ($end === null) {
$end = '*';
}
$requireq[] = "$field:[$start TO $end]";
}
else {
$requireq[] = $field.':"'.$value.'"';
}
}
$fq[] = '+('.implode(' ', $requireq).')';
}
return $fq;
}
/**
* Parse all exclude constraints for inclusion in a filter query
*
* @param SearchQuery $searchQuery
* @return array List of parsed string values for each exclusion
*/
protected function getExcludeFiltersComponent(SearchQuery $searchQuery) {
$fq = array();
foreach ($searchQuery->exclude as $field => $values) {
$excludeq = array();
$missing = false;
foreach ($values as $value) {
if ($value === SearchQuery::$missing) {
$missing = true;
}
else if ($value === SearchQuery::$present) {
$excludeq[] = "{$field}:[* TO *]";
}
else if ($value instanceof SearchQuery_Range) {
$start = $value->start;
if ($start === null) {
$start = '*';
}
$end = $value->end;
if ($end === null) {
$end = '*';
}
$excludeq[] = "$field:[$start TO $end]";
}
else {
$excludeq[] = $field.':"'.$value.'"';
}
}
$fq[] = ($missing ? "+{$field}:[* TO *] " : '') . '-('.implode(' ', $excludeq).')';
}
return $fq;
}
/**
* Get all filter conditions for this search
*
* @param SearchQuery $searchQuery
* @return array
*/
public function getFiltersComponent(SearchQuery $searchQuery) {
return array_merge(
$this->getRequireFiltersComponent($searchQuery),
$this->getExcludeFiltersComponent($searchQuery)
);
}
protected $service;
/**

View File

@ -0,0 +1,230 @@
<?php
use Psr\Log\LoggerInterface;
/**
* Base class for re-indexing of solr content
*/
abstract class SolrReindexBase implements SolrReindexHandler {
public function runReindex(LoggerInterface $logger, $batchSize, $taskName, $classes = null) {
foreach (Solr::get_indexes() as $indexInstance) {
$this->processIndex($logger, $indexInstance, $batchSize, $taskName, $classes);
}
}
/**
* Process index for a single SolrIndex instance
*
* @param LoggerInterface $logger
* @param SolrIndex $indexInstance
* @param int $batchSize
* @param string $taskName
* @param string $classes
*/
protected function processIndex(
LoggerInterface $logger, SolrIndex $indexInstance, $batchSize, $taskName, $classes = null
) {
// Filter classes for this index
$indexClasses = $this->getClassesForIndex($indexInstance, $classes);
// Clear all records in this index which do not contain the given classes
$logger->info("Clearing obsolete classes from ".$indexInstance->getIndexName());
$indexInstance->clearObsoleteClasses($indexClasses);
// Build queue for each class
foreach ($indexClasses as $class => $options) {
$includeSubclasses = $options['include_children'];
foreach (SearchVariant::reindex_states($class, $includeSubclasses) as $state) {
$this->processVariant($logger, $indexInstance, $state, $class, $includeSubclasses, $batchSize, $taskName);
}
}
}
/**
* Get valid classes and options for an index with an optional filter
*
* @param SolrIndex $index
* @param string|array $filterClasses Optional class or classes to limit to
* @return array List of classes, where the key is the classname and value is list of options
*/
protected function getClassesForIndex(SolrIndex $index, $filterClasses = null) {
// Get base classes
$classes = $index->getClasses();
if(!$filterClasses) {
return $classes;
}
// Apply filter
if(!is_array($filterClasses)) {
$filterClasses = explode(',', $filterClasses);
}
return array_intersect_key($classes, array_combine($filterClasses, $filterClasses));
}
/**
* Process re-index for a given variant state and class
*
* @param LoggerInterface $logger
* @param SolrIndex $indexInstance
* @param array $state Variant state
* @param string $class
* @param bool $includeSubclasses
* @param int $batchSize
* @param string $taskName
*/
protected function processVariant(
LoggerInterface $logger, SolrIndex $indexInstance, $state,
$class, $includeSubclasses, $batchSize, $taskName
) {
// Set state
SearchVariant::activate_state($state);
// Count records
$query = $class::get();
if(!$includeSubclasses) {
$query = $query->filter('ClassName', $class);
}
$total = $query->count();
// Skip this variant if nothing to process, or if there are no records
if ($total == 0 || $indexInstance->variantStateExcluded($state)) {
// Remove all records in the current state, since there are no groups to process
$logger->info("Clearing all records of type {$class} in the current state: " . json_encode($state));
$this->clearRecords($indexInstance, $class);
return;
}
// For each group, run processing
$groups = (int)(($total + $batchSize - 1) / $batchSize);
for ($group = 0; $group < $groups; $group++) {
$this->processGroup($logger, $indexInstance, $state, $class, $groups, $group, $taskName);
}
}
/**
* Initiate the processing of a single group
*
* @param LoggerInterface $logger
* @param SolrIndex $indexInstance Index instance
* @param array $state Variant state
* @param string $class Class to index
* @param int $groups Total groups
* @param int $group Index of group to process
* @param string $taskName Name of task script to run
*/
abstract protected function processGroup(
LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group, $taskName
);
/**
* Explicitly invoke the process that performs the group
* processing. Can be run either by a background task or a queuedjob.
*
* Does not commit changes to the index, so this must be controlled externally.
*
* @param LoggerInterface $logger
* @param SolrIndex $indexInstance
* @param array $state
* @param string $class
* @param int $groups
* @param int $group
*/
public function runGroup(
LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group
) {
// Set time limit and state
increase_time_limit_to();
SearchVariant::activate_state($state);
$logger->info("Adding $class");
// Prior to adding these records to solr, delete existing solr records
$this->clearRecords($indexInstance, $class, $groups, $group);
// Process selected records in this class
$items = $this->getRecordsInGroup($indexInstance, $class, $groups, $group);
$processed = array();
foreach ($items as $item) {
$processed[] = $item->ID;
// By this point, obsolete classes/states have been removed in processVariant
// and obsolete records have been removed in clearRecords
$indexInstance->add($item);
$item->destroy();
}
$logger->info("Updated ".implode(',', $processed));
// This will slow down things a tiny bit, but it is done so that we don't timeout to the database during a reindex
DB::query('SELECT 1');
$logger->info("Done");
}
/**
* Gets the datalist of records in the given group in the current state
*
* Assumes that the desired variant state is in effect.
*
* @param SolrIndex $indexInstance
* @param string $class
* @param int $groups
* @param int $group
* @return DataList
*/
protected function getRecordsInGroup(SolrIndex $indexInstance, $class, $groups, $group) {
// Generate filtered list of local records
$baseClass = ClassInfo::baseDataClass($class);
$items = DataList::create($class)
->where(sprintf(
'"%s"."ID" %% \'%d\' = \'%d\'',
$baseClass,
intval($groups),
intval($group)
))
->sort("ID");
// Add child filter
$classes = $indexInstance->getClasses();
$options = $classes[$class];
if(!$options['include_children']) {
$items = $items->filter('ClassName', $class);
}
return $items;
}
/**
* Clear all records of the given class in the current state ONLY.
*
* Optionally delete from a given group (where the group is defined as the ID % total groups)
*
* @param SolrIndex $indexInstance Index instance
* @param string $class Class name
* @param int $groups Number of groups, if clearing from a striped group
* @param int $group Group number, if clearing from a striped group
*/
protected function clearRecords(SolrIndex $indexInstance, $class, $groups = null, $group = null) {
// Clear by classname
$conditions = array("+(ClassHierarchy:{$class})");
// If grouping, delete from this group only
if($groups) {
$conditions[] = "+_query_:\"{!frange l={$group} u={$group}}mod(ID, {$groups})\"";
}
// Also filter by state (suffix on document ID)
$query = new SearchQuery();
SearchVariant::with($class)
->call('alterQuery', $query, $indexInstance);
if($query->isfiltered()) {
$conditions = array_merge($conditions, $indexInstance->getFiltersComponent($query));
}
// Invoke delete on index
$deleteQuery = implode(' ', $conditions);
$indexInstance
->getService()
->deleteByQuery($deleteQuery);
}
}

View File

@ -0,0 +1,42 @@
<?php
use Psr\Log\LoggerInterface;
/**
* Provides interface for queueing a solr reindex
*/
interface SolrReindexHandler {
/**
* Trigger a solr-reindex
*
* @param LoggerInterface $logger
* @param int $batchSize Records to run each process
* @param string $taskName Name of devtask to run
* @param string|array|null $classes Optional class or classes to limit index to
*/
public function triggerReindex(LoggerInterface $logger, $batchSize, $taskName, $classes = null);
/**
* Begin an immediate re-index
*
* @param LoggerInterface $logger
* @param int $batchSize Records to run each process
* @param string $taskName Name of devtask to run
* @param string|array|null $classes Optional class or classes to limit index to
*/
public function runReindex(LoggerInterface $logger, $batchSize, $taskName, $classes = null);
/**
* Do an immediate re-index on the given group, where the group is defined as the list of items
* where ID mod $groups = $group, in the given $state and optional $class filter.
*
* @param LoggerInterface $logger
* @param SolrIndex $indexInstance
* @param array $state
* @param string $class
* @param int $groups
* @param int $group
*/
public function runGroup(LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group);
}

View File

@ -0,0 +1,74 @@
<?php
use Psr\Log\LoggerInterface;
/**
* Invokes an immediate reindex
*
* Internally batches of records will be invoked via shell tasks in the background
*/
class SolrReindexImmediateHandler extends SolrReindexBase {
public function triggerReindex(LoggerInterface $logger, $batchSize, $taskName, $classes = null) {
$this->runReindex($logger, $batchSize, $taskName, $classes);
}
protected function processIndex(
LoggerInterface $logger, SolrIndex $indexInstance, $batchSize, $taskName, $classes = null
) {
parent::processIndex($logger, $indexInstance, $batchSize, $taskName, $classes);
// Immediate processor needs to immediately commit after each index
$indexInstance->getService()->commit();
}
/**
* Process a single group.
*
* Without queuedjobs, it's necessary to shell this out to a background task as this is
* very memory intensive.
*
* The sub-process will then invoke $processor->runGroup() in {@see Solr_Reindex::doReindex}
*
* @param LoggerInterface $logger
* @param SolrIndex $indexInstance Index instance
* @param array $state Variant state
* @param string $class Class to index
* @param int $groups Total groups
* @param int $group Index of group to process
* @param string $taskName Name of task script to run
*/
protected function processGroup(
LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group, $taskName
) {
// Build state
$statevar = json_encode($state);
if (strpos(PHP_OS, "WIN") !== false) {
$statevar = '"'.str_replace('"', '\\"', $statevar).'"';
} else {
$statevar = "'".$statevar."'";
}
// Build script
$indexName = $indexInstance->getIndexName();
$scriptPath = sprintf("%s%sframework%scli-script.php", BASE_PATH, DIRECTORY_SEPARATOR, DIRECTORY_SEPARATOR);
$scriptTask = "php {$scriptPath} dev/tasks/{$taskName}";
$cmd = "{$scriptTask} index={$indexName} class={$class} group={$group} groups={$groups} variantstate={$statevar}";
$cmd .= " verbose=1 2>&1";
$logger->info("Running '$cmd'");
// Execute script via shell
$res = $logger ? passthru($cmd) : `$cmd`;
if($logger) {
$logger->info(preg_replace('/\r\n|\n/', '$0 ', $res));
}
// If we're in dev mode, commit more often for fun and profit
if (Director::isDev()) {
Solr::service($indexName)->commit();
}
// This will slow down things a tiny bit, but it is done so that we don't timeout to the database during a reindex
DB::query('SELECT 1');
}
}

View File

@ -0,0 +1,40 @@
<?php
use Psr\Log\LoggerInterface;
if(!class_exists('MessageQueue')) return;
class SolrReindexMessageHandler extends SolrReindexImmediateHandler {
/**
* The MessageQueue to use when processing updates
* @config
* @var string
*/
private static $reindex_queue = "search_indexing";
public function triggerReindex(LoggerInterface $logger, $batchSize, $taskName, $classes = null) {
$queue = Config::inst()->get(__CLASS__, 'reindex_queue');
$logger->info('Queuing message');
MessageQueue::send(
$queue,
new MethodInvocationMessage('SolrReindexMessageHandler', 'run_reindex', $batchSize, $taskName, $classes)
);
}
/**
* Entry point for message queue
*
* @param int $batchSize
* @param string $taskName
* @param array|string|null $classes
*/
public static function run_reindex($batchSize, $taskName, $classes = null) {
// @todo Logger for message queue?
$logger = Injector::inst()->createWithArgs('Monolog\Logger', array(strtolower(get_class())));
$inst = Injector::inst()->get(get_class());
$inst->runReindex($logger, $batchSize, $taskName, $classes);
}
}

View File

@ -0,0 +1,94 @@
<?php
use Psr\Log\LoggerInterface;
if(!interface_exists('QueuedJob')) return;
/**
* Represents a queued task to start the reindex job
*/
class SolrReindexQueuedHandler extends SolrReindexBase {
/**
* @return QueuedJobService
*/
protected function getQueuedJobService() {
return singleton('QueuedJobService');
}
/**
* Cancel any cancellable jobs
*
* @param string $type Type of job to cancel
* @return int Number of jobs cleared
*/
protected function cancelExistingJobs($type) {
$clearable = array(
// Paused jobs need to be discarded
QueuedJob::STATUS_PAUSED,
// These types would be automatically started
QueuedJob::STATUS_NEW,
QueuedJob::STATUS_WAIT,
// Cancel any in-progress job
QueuedJob::STATUS_INIT,
QueuedJob::STATUS_RUN
);
DB::query(sprintf(
'UPDATE "QueuedJobDescriptor" '
. ' SET "JobStatus" = \'%s\''
. ' WHERE "JobStatus" IN (\'%s\')'
. ' AND "Implementation" = \'%s\'',
Convert::raw2sql(QueuedJob::STATUS_CANCELLED),
implode("','", Convert::raw2sql($clearable)),
Convert::raw2sql($type)
));
return DB::affectedRows();
}
public function triggerReindex(LoggerInterface $logger, $batchSize, $taskName, $classes = null) {
// Cancel existing jobs
$queues = $this->cancelExistingJobs('SolrReindexQueuedJob');
$groups = $this->cancelExistingJobs('SolrReindexGroupQueuedJob');
$logger->info("Cancelled {$queues} re-index tasks and {$groups} re-index groups");
// Although this class is used as a service (singleton) it may also be instantiated
// as a queuedjob
$job = Injector::inst()->create('SolrReindexQueuedJob', $batchSize, $taskName, $classes);
$this
->getQueuedJobService()
->queueJob($job);
$title = $job->getTitle();
$logger->info("Queued {$title}");
}
protected function processGroup(
LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group, $taskName
) {
// Trigger another job for this group
$job = Injector::inst()->create(
'SolrReindexGroupQueuedJob',
$indexInstance->getIndexName(), $state, $class, $groups, $group
);
$this
->getQueuedJobService()
->queueJob($job);
$title = $job->getTitle();
$logger->info("Queued {$title}");
}
public function runGroup(
LoggerInterface $logger, SolrIndex $indexInstance, $state, $class, $groups, $group
) {
parent::runGroup($logger, $indexInstance, $state, $class, $groups, $group);
// After any changes have been made, mark all indexes as dirty for commit
// see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers
$logger->info("Queuing commit on all changes");
SearchUpdateCommitJobProcessor::queue();
}
}

View File

@ -0,0 +1,117 @@
<?php
if(!interface_exists('QueuedJob')) return;
/**
* Queuedjob to re-index a small group within an index.
*
* This job is optimised for efficient full re-indexing of an index via Solr_Reindex.
*
* Operates similarly to {@see SearchUpdateQueuedJobProcessor} but can not work with an arbitrary
* list of IDs. Instead groups are segmented by ID. Additionally, this task does incremental
* deletions of records.
*/
class SolrReindexGroupQueuedJob extends SolrReindexQueuedJobBase {
/**
* Name of index to reindex
*
* @var string
*/
protected $indexName;
/**
* Variant state that this group belongs to
*
* @var type
*/
protected $state;
/**
* Single class name to index
*
* @var string
*/
protected $class;
/**
* Total number of groups
*
* @var int
*/
protected $groups;
/**
* Group index
*
* @var int
*/
protected $group;
public function __construct($indexName = null, $state = null, $class = null, $groups = null, $group = null) {
parent::__construct();
$this->indexName = $indexName;
$this->state = $state;
$this->class = $class;
$this->groups = $groups;
$this->group = $group;
}
public function getJobData() {
$data = parent::getJobData();
// Custom data
$data->jobData->indexName = $this->indexName;
$data->jobData->state = $this->state;
$data->jobData->class = $this->class;
$data->jobData->groups = $this->groups;
$data->jobData->group = $this->group;
return $data;
}
public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) {
parent::setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages);
// Custom data
$this->indexName = $jobData->indexName;
$this->state = $jobData->state;
$this->class = $jobData->class;
$this->groups = $jobData->groups;
$this->group = $jobData->group;
}
public function getSignature() {
return md5(get_class($this) . time() . mt_rand(0, 100000));
}
public function getTitle() {
return sprintf(
'Solr Reindex Group (%d/%d) of %s in %s',
($this->group+1),
$this->groups,
$this->class,
json_encode($this->state)
);
}
public function process() {
$logger = $this->getLogger();
if($this->jobFinished()) {
$logger->notice("reindex group already complete");
return;
}
// Get instance of index
$indexInstance = singleton($this->indexName);
// Send back to processor
$logger->info("Beginning reindex group");
$this
->getHandler()
->runGroup($logger, $indexInstance, $this->state, $this->class, $this->groups, $this->group);
$logger->info("Completed reindex group");
$this->isComplete = true;
}
}

View File

@ -0,0 +1,91 @@
<?php
if(!interface_exists('QueuedJob')) return;
/**
* Represents a queuedjob which invokes a reindex
*/
class SolrReindexQueuedJob extends SolrReindexQueuedJobBase {
/**
* Size of each batch to run
*
* @var int
*/
protected $batchSize;
/**
* Name of devtask Which invoked this
* Not necessary for re-index processing performed entirely by queuedjobs
*
* @var string
*/
protected $taskName;
/**
* List of classes to filter
*
* @var array|string
*/
protected $classes;
public function __construct($batchSize = null, $taskName = null, $classes = null) {
$this->batchSize = $batchSize;
$this->taskName = $taskName;
$this->classes = $classes;
parent::__construct();
}
public function getJobData() {
$data = parent::getJobData();
// Custom data
$data->jobData->batchSize = $this->batchSize;
$data->jobData->taskName = $this->taskName;
$data->jobData->classes = $this->classes;
return $data;
}
public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) {
parent::setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages);
// Custom data
$this->batchSize = $jobData->batchSize;
$this->taskName = $jobData->taskName;
$this->classes = $jobData->classes;
}
public function getSignature() {
return __CLASS__;
}
public function getTitle() {
return 'Solr Reindex Job';
}
public function process() {
$logger = $this->getLogger();
if($this->jobFinished()) {
$logger->notice("reindex already complete");
return;
}
// Send back to processor
$logger->info("Beginning init of reindex");
$this
->getHandler()
->runReindex($logger, $this->batchSize, $this->taskName, $this->classes);
$logger->info("Completed init of reindex");
$this->isComplete = true;
}
/**
* Get size of batch
*
* @return int
*/
public function getBatchSize() {
return $this->batchSize;
}
}

View File

@ -0,0 +1,123 @@
<?php
use Monolog\Logger;
use Psr\Log\LoggerInterface;
if(!interface_exists('QueuedJob')) return;
/**
* Base class for jobs which perform re-index
*/
abstract class SolrReindexQueuedJobBase implements QueuedJob {
/**
* Flag whether this job is done
*
* @var bool
*/
protected $isComplete;
/**
* List of messages
*
* @var array
*/
protected $messages;
/**
* Logger to use for this job
*
* @var LoggerInterface
*/