From 60b05db1ceaa26486de9c9fc46cb93f77d349a3e Mon Sep 17 00:00:00 2001 From: Damian Mooyman Date: Thu, 7 May 2015 15:16:27 +1200 Subject: [PATCH 1/2] API Separate searchupdate / commit into separate queued-jobs API Enable indexes to deprecate other classes API Enable indexes to be globally specified in config --- code/search/FullTextSearch.php | 46 +++- code/search/SearchIndex.php | 64 +++-- .../SearchUpdateBatchedProcessor.php | 22 +- .../SearchUpdateCommitJobProcessor.php | 247 ++++++++++++++++++ .../SearchUpdateQueuedJobProcessor.php | 18 +- docs/en/index.md | 26 ++ tests/BatchedProcessorTest.php | 93 ++++++- 7 files changed, 468 insertions(+), 48 deletions(-) create mode 100644 code/search/processors/SearchUpdateCommitJobProcessor.php diff --git a/code/search/FullTextSearch.php b/code/search/FullTextSearch.php index b90ff77..d0616c5 100644 --- a/code/search/FullTextSearch.php +++ b/code/search/FullTextSearch.php @@ -9,6 +9,15 @@ class FullTextSearch { static protected $indexes_by_subclass = array(); + /** + * Optional list of index names to limit to. If left empty, all subclasses of SearchIndex + * will be used + * + * @var array + * @config + */ + private static $indexes = array(); + /** * Get all the instantiable search indexes (so all the user created indexes, but not the connector or library level * abstract indexes). Can optionally be filtered to only return indexes that are subclasses of some class @@ -18,16 +27,43 @@ class FullTextSearch { * @param bool $rebuild - If true, don't use cached values */ static function get_indexes($class = null, $rebuild = false) { - if ($rebuild) { self::$all_indexes = null; self::$indexes_by_subclass = array(); } + if ($rebuild) { + self::$all_indexes = null; + self::$indexes_by_subclass = array(); + } if (!$class) { if (self::$all_indexes === null) { - $classes = ClassInfo::subclassesFor('SearchIndex'); + // Get declared indexes, or otherwise default to all subclasses of SearchIndex + $classes = Config::inst()->get(__CLASS__, 'indexes') + ?: ClassInfo::subclassesFor('SearchIndex'); - $concrete = array(); + $hidden = array(); + $candidates = array(); foreach ($classes as $class) { + // Check if this index is disabled + $hides = $class::config()->hide_ancestor; + if($hides) { + $hidden[] = $hides; + } + + // Check if this index is abstract $ref = new ReflectionClass($class); - if ($ref->isInstantiable()) $concrete[$class] = singleton($class); + if (!$ref->isInstantiable()) { + continue; + } + + $candidates[] = $class; + } + + if($hidden) { + $candidates = array_diff($candidates, $hidden); + } + + // Create all indexes + $concrete = array(); + foreach($candidates as $class) { + $concrete[$class] = singleton($class); } self::$all_indexes = $concrete; @@ -60,6 +96,8 @@ class FullTextSearch { * From then on, fulltext search system will only see those indexes passed in this most recent call. * * Passing in no arguments resets back to automatic index list + * + * Alternatively you can use `FullTextSearch.indexes` to configure a list of indexes via config. */ static function force_index_list() { $indexes = func_get_args(); diff --git a/code/search/SearchIndex.php b/code/search/SearchIndex.php index 2c650ef..6c547b6 100644 --- a/code/search/SearchIndex.php +++ b/code/search/SearchIndex.php @@ -29,7 +29,16 @@ */ abstract class SearchIndex extends ViewableData { - function __construct() { + /** + * Allows this index to hide a parent index. Specifies the name of a parent index to disable + * + * @var string + * @config + */ + private static $hide_ancestor; + + public function __construct() { + parent::__construct(); $this->init(); foreach ($this->getClasses() as $class => $options) { @@ -39,7 +48,7 @@ abstract class SearchIndex extends ViewableData { $this->buildDependancyList(); } - function __toString() { + public function __toString() { return 'Search Index ' . get_class($this); } @@ -47,7 +56,7 @@ abstract class SearchIndex extends ViewableData { * Examines the classes this index is built on to try and find defined fields in the class hierarchy for those classes. * Looks for db and viewable-data fields, although can't nessecarily find type for viewable-data fields. */ - function fieldData($field, $forceType = null, $extraOptions = array()) { + public function fieldData($field, $forceType = null, $extraOptions = array()) { $fullfield = str_replace(".", "_", $field); $sources = $this->getClasses(); @@ -287,7 +296,7 @@ abstract class SearchIndex extends ViewableData { public $dependancyList = array(); - function buildDependancyList() { + public function buildDependancyList() { $this->dependancyList = array_keys($this->getClasses()); foreach ($this->getFieldsIterator() as $name => $field) { @@ -302,7 +311,7 @@ abstract class SearchIndex extends ViewableData { * Returns an array where each member is all the fields and the classes that are at the end of some * specific lookup chain from one of the base classes */ - function getDerivedFields() { + public function getDerivedFields() { if ($this->derivedFields === null) { $this->derivedFields = array(); @@ -341,7 +350,7 @@ abstract class SearchIndex extends ViewableData { * @param Array $state - The variant state of the object * @return string - The document ID as a string */ - function getDocumentIDForState($base, $id, $state) { + public function getDocumentIDForState($base, $id, $state) { ksort($state); $parts = array('id' => $id, 'base' => $base, 'state' => json_encode($state)); return implode('-', array_values($parts)); @@ -355,7 +364,7 @@ abstract class SearchIndex extends ViewableData { * @param Boolean $includesubs - TODO: Probably going away * @return string - The document ID as a string */ - function getDocumentID($object, $base, $includesubs) { + public function getDocumentID($object, $base, $includesubs) { return $this->getDocumentIDForState($base, $object->ID, SearchVariant::current_state($base, $includesubs)); } @@ -433,7 +442,7 @@ abstract class SearchIndex extends ViewableData { * @param $fields * @return array */ - function getDirtyIDs($class, $id, $statefulids, $fields) { + public function getDirtyIDs($class, $id, $statefulids, $fields) { $dirty = array(); // First, if this object is directly contained in the index, add it @@ -499,10 +508,10 @@ abstract class SearchIndex extends ViewableData { /** !! These should be implemented by the full text search engine */ - abstract function add($object) ; - abstract function delete($base, $id, $state) ; + abstract public function add($object) ; + abstract public function delete($base, $id, $state) ; - abstract function commit(); + abstract public function commit(); /** !! These should be implemented by the specific index */ @@ -511,7 +520,7 @@ abstract class SearchIndex extends ViewableData { * Used instead of overriding __construct as we have specific execution order - code that has * to be run before _and/or_ after this. */ - abstract function init(); + abstract public function init(); } /** @@ -519,11 +528,11 @@ abstract class SearchIndex extends ViewableData { */ abstract class SearchIndex_Null extends SearchIndex { - function add($object) { } + public function add($object) { } - function delete($base, $id, $state) { } + public function delete($base, $id, $state) { } - function commit() { } + public function commit() { } } @@ -534,13 +543,15 @@ abstract class SearchIndex_Recording extends SearchIndex { public $added = array(); public $deleted = array(); + public $committed = false; - function reset() { + public function reset() { $this->added = array(); $this->deleted = array(); + $this->committed = false; } - function add($object) { + public function add($object) { $res = array(); $res['ID'] = $object->ID; @@ -553,7 +564,7 @@ abstract class SearchIndex_Recording extends SearchIndex { $this->added[] = $res; } - function getAdded($fields = array()) { + public function getAdded($fields = array()) { $res = array(); foreach ($this->added as $added) { @@ -567,14 +578,25 @@ abstract class SearchIndex_Recording extends SearchIndex { return $res; } - function delete($base, $id, $state) { + public function delete($base, $id, $state) { $this->deleted[] = array('base' => $base, 'id' => $id, 'state' => $state); } - function commit() { } + public function commit() { + $this->committed = true; + } - function getIndexName() { + public function getIndexName() { return get_class($this); } + + public function getIsCommitted() { + return $this->committed; + } + + public function getService() { + // Causes commits to the service to be redirected back to the same object + return $this; + } } diff --git a/code/search/processors/SearchUpdateBatchedProcessor.php b/code/search/processors/SearchUpdateBatchedProcessor.php index 6b49c7c..17845e3 100644 --- a/code/search/processors/SearchUpdateBatchedProcessor.php +++ b/code/search/processors/SearchUpdateBatchedProcessor.php @@ -52,19 +52,6 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor { $this->setBatch(0); } - protected function commitIndex($index) { - $name = $index->getIndexName(); - - // If this is a resurrected batch then it's not necessary to commit the index - // twice, assuming it has successfully been comitted before - if(isset($this->completedIndexes[$name])) return true; - - // Commit index and mark as completed - $result = parent::commitIndex($index); - if($result) $this->completedIndexes[$name] = $name; - return $result; - } - /** * Set the current batch index * @@ -72,7 +59,6 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor { */ protected function setBatch($batch) { $this->currentBatch = $batch; - $this->completedIndexes = array(); } protected function getSource() { @@ -93,12 +79,12 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor { // Don't re-process completed queue if($this->currentBatch >= count($this->batches)) return true; - // Process batch - $result = parent::process(); + // Send current patch to indexes + $this->prepareIndexes(); // Advance to next batch if successful - if($result) $this->setBatch($this->currentBatch + 1); - return $result; + $this->setBatch($this->currentBatch + 1); + return true; } /** diff --git a/code/search/processors/SearchUpdateCommitJobProcessor.php b/code/search/processors/SearchUpdateCommitJobProcessor.php new file mode 100644 index 0000000..fde3382 --- /dev/null +++ b/code/search/processors/SearchUpdateCommitJobProcessor.php @@ -0,0 +1,247 @@ +create(__CLASS__); + singleton('QueuedJobService')->queueJob($commit, $startAfter); + + if($dirty) { + $indexes = FullTextSearch::get_indexes(); + static::$dirty_indexes = array_keys($indexes); + } + return $commit; + } + + public function getJobType() { + return Config::inst()->get(__CLASS__, 'commit_queue'); + } + + public function getSignature() { + return md5(get_class($this) . time() . mt_rand(0, 100000)); + } + + public function getTitle() { + return "FullTextSearch Commit Job"; + } + + /** + * Get the list of index names we should process + * + * @return array + */ + public function getAllIndexes() { + if(empty($this->indexes)) { + $indexes = FullTextSearch::get_indexes(); + $this->indexes = array_keys($indexes); + } + return $this->indexes; + } + + public function jobFinished() { + // If we've indexed exactly as many as we would like, we are done + return $this->skipped + || (count($this->getAllIndexes()) <= count($this->completed)); + } + + public function prepareForRestart() { + // NOOP + } + + public function afterComplete() { + // NOOP + } + + /** + * Abort this job, potentially rescheduling a replacement if there is still work to do + */ + protected function discardJob() { + $this->skipped = true; + + // If we do not have dirty records, then assume that these dirty records were committed + // already this request (but probably another job), so we don't need to commit anything else. + // This could occur if we completed multiple searchupdate jobs in a prior request, and + // we only need one commit job to commit all of them in the current request. + if(empty(static::$dirty_indexes)) { + $this->addMessage("Indexing already completed this request: Discarding this job"); + return; + } + + + // If any commit has run, but some (or all) indexes are un-comitted, we must re-schedule this task. + // This could occur if we completed a searchupdate job in a prior request, as well as in + // the current request + $cooldown = Config::inst()->get(__CLASS__, 'cooldown'); + $now = new DateTime(SS_Datetime::now()->getValue()); + $now->add(new DateInterval('PT'.$cooldown.'S')); + $runat = $now->Format('Y-m-d H:i:s'); + + $this->addMessage("Indexing already run this request, but incomplete. Re-scheduling for {$runat}"); + + // Queue after the given cooldown + static::queue(false, $runat); + } + + public function process() { + // If we have already run an instance of SearchUpdateCommitJobProcessor this request, immediately + // quit this job to prevent hitting warming search limits in Solr + if(static::$has_run) { + $this->discardJob(); + return true; + } + + // To prevent other commit jobs from running this request + static::$has_run = true; + + // Run all incompleted indexes + $indexNames = $this->getAllIndexes(); + foreach ($indexNames as $name) { + $index = singleton($name); + $this->commitIndex($index); + } + + $this->addMessage("All indexes committed"); + + return true; + } + + /** + * Commits a specific index + * + * @param SolrIndex $index + * @throws Exception + */ + protected function commitIndex($index) { + // Skip index if this is already complete + $name = get_class($index); + if(in_array($name, $this->completed)) { + $this->addMessage("Skipping already comitted index {$name}"); + return; + } + + // Bypass SolrIndex::commit exception handling so that queuedjobs can handle the error + $this->addMessage("Committing index {$name}"); + $index->getService()->commit(false, false, false); + $this->addMessage("Committing index {$name} was successful"); + + // If this index is currently marked as dirty, it's now clean + if(in_array($name, static::$dirty_indexes)) { + static::$dirty_indexes = array_diff(static::$dirty_indexes, array($name)); + } + + // Mark complete + $this->completed[] = $name; + } + + public function setup() { + // NOOP + } + + public function getJobData() { + $data = new stdClass(); + $data->totalSteps = count($this->getAllIndexes()); + $data->currentStep = count($this->completed); + $data->isComplete = $this->jobFinished(); + $data->messages = $this->messages; + + $data->jobData = new stdClass(); + $data->jobData->skipped = $this->skipped; + $data->jobData->completed = $this->completed; + $data->jobData->indexes = $this->getAllIndexes(); + + return $data; + } + + public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) { + $this->isComplete = $isComplete; + $this->messages = $messages; + + $this->skipped = $jobData->skipped; + $this->completed = $jobData->completed; + $this->indexes = $jobData->indexes; + } + + public function addMessage($message, $severity='INFO') { + $severity = strtoupper($severity); + $this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message"; + } + + public function getMessages() { + return $this->messages; + } + +} diff --git a/code/search/processors/SearchUpdateQueuedJobProcessor.php b/code/search/processors/SearchUpdateQueuedJobProcessor.php index d8d37f8..4214061 100644 --- a/code/search/processors/SearchUpdateQueuedJobProcessor.php +++ b/code/search/processors/SearchUpdateQueuedJobProcessor.php @@ -7,7 +7,7 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem /** * The QueuedJob queue to use when processing updates * @config - * @var string + * @var int */ private static $reindex_queue = 2; // QueuedJob::QUEUED; @@ -43,7 +43,9 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem } public function afterComplete() { - // NOP + // Once indexing is complete, commit later in order to avoid solr limits + // see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers + SearchUpdateCommitJobProcessor::queue(); } public function getJobData() { @@ -56,7 +58,6 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem $data->jobData = new stdClass(); $data->jobData->batches = $this->batches; $data->jobData->currentBatch = $this->currentBatch; - $data->jobData->completedIndexes = $this->completedIndexes; return $data; } @@ -67,11 +68,20 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem $this->batches = $jobData->batches; $this->currentBatch = $jobData->currentBatch; - $this->completedIndexes = $jobData->completedIndexes; } public function addMessage($message, $severity='INFO') { $severity = strtoupper($severity); $this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message"; } + + public function process() { + $result = parent::process(); + + if($this->jobFinished()) { + $this->addMessage("All batched updates complete. Queuing commit job"); + } + + return $result; + } } diff --git a/docs/en/index.md b/docs/en/index.md index 2b2ae37..645a8c9 100644 --- a/docs/en/index.md +++ b/docs/en/index.md @@ -121,6 +121,32 @@ As you can only search one index at a time, all searchable classes need to be in } } +## Using Multiple Indexes + +Multiple indexes can be created and searched independently, but if you wish to override an existing +index with another, you can use the `$hide_ancestor` config. + + :::php + class MyReplacementIndex extends MyIndex { + private static $hide_ancestor = 'MyIndex'; + + public function init() { + parent::init(); + $this->addClass('File'); + $this->addFulltextField('Title'); + } + } + +You can also filter all indexes globally to a set of pre-defined classes if you wish to +prevent any unknown indexes from being automatically included. + + :::yaml + FullTextSearch: + indexes: + - MyReplacementIndex + - CoreSearchIndex + + ## Indexing Relationships TODO diff --git a/tests/BatchedProcessorTest.php b/tests/BatchedProcessorTest.php index e8f5d09..939cead 100644 --- a/tests/BatchedProcessorTest.php +++ b/tests/BatchedProcessorTest.php @@ -14,6 +14,21 @@ class BatchedProcessorTest_Index extends SearchIndex_Recording implements TestOn } } +class BatchedProcessor_QueuedJobService { + protected $jobs = array(); + + public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $queueName = null) { + $this->jobs[] = array( + 'job' => $job, + 'startAfter' => $startAfter + ); + } + + public function getJobs() { + return $this->jobs; + } +} + /** * Tests {@see SearchUpdateQueuedJobProcessor} */ @@ -25,6 +40,13 @@ class BatchedProcessorTest extends SapphireTest { 'BatchedProcessorTest_Object' ); + protected $illegalExtensions = array( + 'SiteTree' => array( + 'SiteTreeSubsites', + 'Translatable' + ) + ); + public function setUp() { parent::setUp(); Config::nest(); @@ -33,11 +55,22 @@ class BatchedProcessorTest extends SapphireTest { $this->markTestSkipped("These tests need the QueuedJobs module installed to run"); $this->skipTest = true; } + + + SS_Datetime::set_mock_now('2015-05-07 06:00:00'); Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_size', 5); Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 0); + Config::inst()->update('SearchUpdateCommitJobProcessor', 'cooldown', 600); Versioned::reading_stage("Stage"); + + Injector::inst()->registerService(new BatchedProcessor_QueuedJobService(), 'QueuedJobService'); + + FullTextSearch::force_index_list('BatchedProcessorTest_Index'); + + SearchUpdateCommitJobProcessor::$dirty_indexes = array(); + SearchUpdateCommitJobProcessor::$has_run = false; $this->oldProcessor = SearchUpdater::$processor; SearchUpdater::$processor = new SearchUpdateQueuedJobProcessor(); @@ -47,9 +80,14 @@ class BatchedProcessorTest extends SapphireTest { SearchUpdater::$processor = $this->oldProcessor; Config::unnest(); + Injector::inst()->unregisterNamedObject('QueuedJobService'); + FullTextSearch::force_index_list(); parent::tearDown(); } - + + /** + * @return SearchUpdateQueuedJobProcessor + */ protected function generateDirtyIds() { $processor = SearchUpdater::$processor; for($id = 1; $id <= 42; $id++) { @@ -100,7 +138,60 @@ class BatchedProcessorTest extends SapphireTest { $this->assertEquals(9, $data->currentStep); $this->assertEquals(42, count($index->getAdded())); $this->assertTrue($data->isComplete); + + // Check any additional queued jobs + $processor->afterComplete(); + $service = singleton('QueuedJobService'); + $jobs = $service->getJobs(); + $this->assertEquals(1, count($jobs)); + $this->assertInstanceOf('SearchUpdateCommitJobProcessor', $jobs[0]['job']); } + + /** + * Test creation of multiple commit jobs + */ + public function testMultipleCommits() { + $index = singleton('BatchedProcessorTest_Index'); + $index->reset(); + + // Test that running a commit immediately after submitting to the indexes + // correctly commits + $first = SearchUpdateCommitJobProcessor::queue(); + $second = SearchUpdateCommitJobProcessor::queue(); + + $this->assertFalse($index->getIsCommitted()); + + // First process will cause the commit + $this->assertFalse($first->jobFinished()); + $first->process(); + $allMessages = $first->getMessages(); + $this->assertTrue($index->getIsCommitted()); + $this->assertTrue($first->jobFinished()); + $this->assertStringEndsWith('All indexes committed', $allMessages[2]); + + // Executing the subsequent processor should not re-trigger a commit + $index->reset(); + $this->assertFalse($second->jobFinished()); + $second->process(); + $allMessages = $second->getMessages(); + $this->assertFalse($index->getIsCommitted()); + $this->assertTrue($second->jobFinished()); + $this->assertStringEndsWith('Indexing already completed this request: Discarding this job', $allMessages[0]); + + // Given that a third job is created, and the indexes are dirtied, attempting to run this job + // should result in a delay + $index->reset(); + $third = SearchUpdateCommitJobProcessor::queue(); + $this->assertFalse($third->jobFinished()); + $third->process(); + $this->assertTrue($third->jobFinished()); + $allMessages = $third->getMessages(); + $this->assertStringEndsWith( + 'Indexing already run this request, but incomplete. Re-scheduling for 2015-05-07 06:10:00', + $allMessages[0] + ); + } + /** * Tests that the batch_soft_cap setting is properly respected From 16a6f362e2714212431aeddbacd11ba8290551cf Mon Sep 17 00:00:00 2001 From: Damian Mooyman Date: Mon, 11 May 2015 16:35:43 +1200 Subject: [PATCH 2/2] API Only allow one scheduled commit job at a time --- .../processors/SearchUpdateCommitJobProcessor.php | 10 ++++++---- tests/BatchedProcessorTest.php | 1 + 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/code/search/processors/SearchUpdateCommitJobProcessor.php b/code/search/processors/SearchUpdateCommitJobProcessor.php index fde3382..f14743c 100644 --- a/code/search/processors/SearchUpdateCommitJobProcessor.php +++ b/code/search/processors/SearchUpdateCommitJobProcessor.php @@ -74,17 +74,17 @@ class SearchUpdateCommitJobProcessor implements QueuedJob { * @param boolean $dirty Marks all indexes as dirty by default. Set to false if there are known comitted and * clean indexes * @param string $startAfter Start date - * @return static The queued job + * @return int The ID of the next queuedjob to run. This could be a new one or an existing one. */ public static function queue($dirty = true, $startAfter = null) { $commit = Injector::inst()->create(__CLASS__); - singleton('QueuedJobService')->queueJob($commit, $startAfter); + $id = singleton('QueuedJobService')->queueJob($commit, $startAfter); if($dirty) { $indexes = FullTextSearch::get_indexes(); static::$dirty_indexes = array_keys($indexes); } - return $commit; + return $id; } public function getJobType() { @@ -92,7 +92,9 @@ class SearchUpdateCommitJobProcessor implements QueuedJob { } public function getSignature() { - return md5(get_class($this) . time() . mt_rand(0, 100000)); + // There is only ever one commit job on the queue so the signature is consistent + // See QueuedJobService::queueJob() for the code that prevents duplication + return __CLASS__; } public function getTitle() { diff --git a/tests/BatchedProcessorTest.php b/tests/BatchedProcessorTest.php index 939cead..6f53807 100644 --- a/tests/BatchedProcessorTest.php +++ b/tests/BatchedProcessorTest.php @@ -22,6 +22,7 @@ class BatchedProcessor_QueuedJobService { 'job' => $job, 'startAfter' => $startAfter ); + return $job; } public function getJobs() {