API Separate searchupdate / commit into separate queued-jobs

API Enable indexes to deprecate other classes
API Enable indexes to be globally specified in config
This commit is contained in:
Damian Mooyman 2015-05-07 15:16:27 +12:00
parent 741aab1208
commit 60b05db1ce
7 changed files with 468 additions and 48 deletions

View File

@ -9,6 +9,15 @@ class FullTextSearch {
static protected $indexes_by_subclass = array();
/**
* Optional list of index names to limit to. If left empty, all subclasses of SearchIndex
* will be used
*
* @var array
* @config
*/
private static $indexes = array();
/**
* Get all the instantiable search indexes (so all the user created indexes, but not the connector or library level
* abstract indexes). Can optionally be filtered to only return indexes that are subclasses of some class
@ -18,16 +27,43 @@ class FullTextSearch {
* @param bool $rebuild - If true, don't use cached values
*/
static function get_indexes($class = null, $rebuild = false) {
if ($rebuild) { self::$all_indexes = null; self::$indexes_by_subclass = array(); }
if ($rebuild) {
self::$all_indexes = null;
self::$indexes_by_subclass = array();
}
if (!$class) {
if (self::$all_indexes === null) {
$classes = ClassInfo::subclassesFor('SearchIndex');
// Get declared indexes, or otherwise default to all subclasses of SearchIndex
$classes = Config::inst()->get(__CLASS__, 'indexes')
?: ClassInfo::subclassesFor('SearchIndex');
$concrete = array();
$hidden = array();
$candidates = array();
foreach ($classes as $class) {
// Check if this index is disabled
$hides = $class::config()->hide_ancestor;
if($hides) {
$hidden[] = $hides;
}
// Check if this index is abstract
$ref = new ReflectionClass($class);
if ($ref->isInstantiable()) $concrete[$class] = singleton($class);
if (!$ref->isInstantiable()) {
continue;
}
$candidates[] = $class;
}
if($hidden) {
$candidates = array_diff($candidates, $hidden);
}
// Create all indexes
$concrete = array();
foreach($candidates as $class) {
$concrete[$class] = singleton($class);
}
self::$all_indexes = $concrete;
@ -60,6 +96,8 @@ class FullTextSearch {
* From then on, fulltext search system will only see those indexes passed in this most recent call.
*
* Passing in no arguments resets back to automatic index list
*
* Alternatively you can use `FullTextSearch.indexes` to configure a list of indexes via config.
*/
static function force_index_list() {
$indexes = func_get_args();

View File

@ -29,7 +29,16 @@
*/
abstract class SearchIndex extends ViewableData {
function __construct() {
/**
* Allows this index to hide a parent index. Specifies the name of a parent index to disable
*
* @var string
* @config
*/
private static $hide_ancestor;
public function __construct() {
parent::__construct();
$this->init();
foreach ($this->getClasses() as $class => $options) {
@ -39,7 +48,7 @@ abstract class SearchIndex extends ViewableData {
$this->buildDependancyList();
}
function __toString() {
public function __toString() {
return 'Search Index ' . get_class($this);
}
@ -47,7 +56,7 @@ abstract class SearchIndex extends ViewableData {
* Examines the classes this index is built on to try and find defined fields in the class hierarchy for those classes.
* Looks for db and viewable-data fields, although can't nessecarily find type for viewable-data fields.
*/
function fieldData($field, $forceType = null, $extraOptions = array()) {
public function fieldData($field, $forceType = null, $extraOptions = array()) {
$fullfield = str_replace(".", "_", $field);
$sources = $this->getClasses();
@ -287,7 +296,7 @@ abstract class SearchIndex extends ViewableData {
public $dependancyList = array();
function buildDependancyList() {
public function buildDependancyList() {
$this->dependancyList = array_keys($this->getClasses());
foreach ($this->getFieldsIterator() as $name => $field) {
@ -302,7 +311,7 @@ abstract class SearchIndex extends ViewableData {
* Returns an array where each member is all the fields and the classes that are at the end of some
* specific lookup chain from one of the base classes
*/
function getDerivedFields() {
public function getDerivedFields() {
if ($this->derivedFields === null) {
$this->derivedFields = array();
@ -341,7 +350,7 @@ abstract class SearchIndex extends ViewableData {
* @param Array $state - The variant state of the object
* @return string - The document ID as a string
*/
function getDocumentIDForState($base, $id, $state) {
public function getDocumentIDForState($base, $id, $state) {
ksort($state);
$parts = array('id' => $id, 'base' => $base, 'state' => json_encode($state));
return implode('-', array_values($parts));
@ -355,7 +364,7 @@ abstract class SearchIndex extends ViewableData {
* @param Boolean $includesubs - TODO: Probably going away
* @return string - The document ID as a string
*/
function getDocumentID($object, $base, $includesubs) {
public function getDocumentID($object, $base, $includesubs) {
return $this->getDocumentIDForState($base, $object->ID, SearchVariant::current_state($base, $includesubs));
}
@ -433,7 +442,7 @@ abstract class SearchIndex extends ViewableData {
* @param $fields
* @return array
*/
function getDirtyIDs($class, $id, $statefulids, $fields) {
public function getDirtyIDs($class, $id, $statefulids, $fields) {
$dirty = array();
// First, if this object is directly contained in the index, add it
@ -499,10 +508,10 @@ abstract class SearchIndex extends ViewableData {
/** !! These should be implemented by the full text search engine */
abstract function add($object) ;
abstract function delete($base, $id, $state) ;
abstract public function add($object) ;
abstract public function delete($base, $id, $state) ;
abstract function commit();
abstract public function commit();
/** !! These should be implemented by the specific index */
@ -511,7 +520,7 @@ abstract class SearchIndex extends ViewableData {
* Used instead of overriding __construct as we have specific execution order - code that has
* to be run before _and/or_ after this.
*/
abstract function init();
abstract public function init();
}
/**
@ -519,11 +528,11 @@ abstract class SearchIndex extends ViewableData {
*/
abstract class SearchIndex_Null extends SearchIndex {
function add($object) { }
public function add($object) { }
function delete($base, $id, $state) { }
public function delete($base, $id, $state) { }
function commit() { }
public function commit() { }
}
@ -534,13 +543,15 @@ abstract class SearchIndex_Recording extends SearchIndex {
public $added = array();
public $deleted = array();
public $committed = false;
function reset() {
public function reset() {
$this->added = array();
$this->deleted = array();
$this->committed = false;
}
function add($object) {
public function add($object) {
$res = array();
$res['ID'] = $object->ID;
@ -553,7 +564,7 @@ abstract class SearchIndex_Recording extends SearchIndex {
$this->added[] = $res;
}
function getAdded($fields = array()) {
public function getAdded($fields = array()) {
$res = array();
foreach ($this->added as $added) {
@ -567,14 +578,25 @@ abstract class SearchIndex_Recording extends SearchIndex {
return $res;
}
function delete($base, $id, $state) {
public function delete($base, $id, $state) {
$this->deleted[] = array('base' => $base, 'id' => $id, 'state' => $state);
}
function commit() { }
public function commit() {
$this->committed = true;
}
function getIndexName() {
public function getIndexName() {
return get_class($this);
}
public function getIsCommitted() {
return $this->committed;
}
public function getService() {
// Causes commits to the service to be redirected back to the same object
return $this;
}
}

View File

@ -52,19 +52,6 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor {
$this->setBatch(0);
}
protected function commitIndex($index) {
$name = $index->getIndexName();
// If this is a resurrected batch then it's not necessary to commit the index
// twice, assuming it has successfully been comitted before
if(isset($this->completedIndexes[$name])) return true;
// Commit index and mark as completed
$result = parent::commitIndex($index);
if($result) $this->completedIndexes[$name] = $name;
return $result;
}
/**
* Set the current batch index
*
@ -72,7 +59,6 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor {
*/
protected function setBatch($batch) {
$this->currentBatch = $batch;
$this->completedIndexes = array();
}
protected function getSource() {
@ -93,12 +79,12 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor {
// Don't re-process completed queue
if($this->currentBatch >= count($this->batches)) return true;
// Process batch
$result = parent::process();
// Send current patch to indexes
$this->prepareIndexes();
// Advance to next batch if successful
if($result) $this->setBatch($this->currentBatch + 1);
return $result;
$this->setBatch($this->currentBatch + 1);
return true;
}
/**

View File

@ -0,0 +1,247 @@
<?php
if(!interface_exists('QueuedJob')) return;
class SearchUpdateCommitJobProcessor implements QueuedJob {
/**
* The QueuedJob queue to use when processing commits
*
* @config
* @var int
*/
private static $commit_queue = 2; // QueuedJob::QUEUED;
/**
* List of indexes to commit
*
* @var array
*/
protected $indexes = array();
/**
* True if this job is skipped to be be re-scheduled in the future
*
* @var boolean
*/
protected $skipped = false;
/**
* List of completed indexes
*
* @var array
*/
protected $completed = array();
/**
* List of messages
*
* @var array
*/
protected $messages = array();
/**
* List of dirty indexes to be committed
*
* @var array
*/
public static $dirty_indexes = true;
/**
* If solrindex::commit has already been performed, but additional commits are necessary,
* how long do we wait before attempting to touch the index again?
*
* {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers}
*
* @var int
* @config
*/
private static $cooldown = 300;
/**
* True if any commits have been executed this request. If so, any attempts to run subsequent commits
* should be delayed until next queuedjob to prevent solr reaching maxWarmingSearchers
*
* {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers}
*
* @var boolean
*/
public static $has_run = false;
/**
* This method is invoked once indexes with dirty ids have been updapted and a commit is necessary
*
* @param boolean $dirty Marks all indexes as dirty by default. Set to false if there are known comitted and
* clean indexes
* @param string $startAfter Start date
* @return static The queued job
*/
public static function queue($dirty = true, $startAfter = null) {
$commit = Injector::inst()->create(__CLASS__);
singleton('QueuedJobService')->queueJob($commit, $startAfter);
if($dirty) {
$indexes = FullTextSearch::get_indexes();
static::$dirty_indexes = array_keys($indexes);
}
return $commit;
}
public function getJobType() {
return Config::inst()->get(__CLASS__, 'commit_queue');
}
public function getSignature() {
return md5(get_class($this) . time() . mt_rand(0, 100000));
}
public function getTitle() {
return "FullTextSearch Commit Job";
}
/**
* Get the list of index names we should process
*
* @return array
*/
public function getAllIndexes() {
if(empty($this->indexes)) {
$indexes = FullTextSearch::get_indexes();
$this->indexes = array_keys($indexes);
}
return $this->indexes;
}
public function jobFinished() {
// If we've indexed exactly as many as we would like, we are done
return $this->skipped
|| (count($this->getAllIndexes()) <= count($this->completed));
}
public function prepareForRestart() {
// NOOP
}
public function afterComplete() {
// NOOP
}
/**
* Abort this job, potentially rescheduling a replacement if there is still work to do
*/
protected function discardJob() {
$this->skipped = true;
// If we do not have dirty records, then assume that these dirty records were committed
// already this request (but probably another job), so we don't need to commit anything else.
// This could occur if we completed multiple searchupdate jobs in a prior request, and
// we only need one commit job to commit all of them in the current request.
if(empty(static::$dirty_indexes)) {
$this->addMessage("Indexing already completed this request: Discarding this job");
return;
}
// If any commit has run, but some (or all) indexes are un-comitted, we must re-schedule this task.
// This could occur if we completed a searchupdate job in a prior request, as well as in
// the current request
$cooldown = Config::inst()->get(__CLASS__, 'cooldown');
$now = new DateTime(SS_Datetime::now()->getValue());
$now->add(new DateInterval('PT'.$cooldown.'S'));
$runat = $now->Format('Y-m-d H:i:s');
$this->addMessage("Indexing already run this request, but incomplete. Re-scheduling for {$runat}");
// Queue after the given cooldown
static::queue(false, $runat);
}
public function process() {
// If we have already run an instance of SearchUpdateCommitJobProcessor this request, immediately
// quit this job to prevent hitting warming search limits in Solr
if(static::$has_run) {
$this->discardJob();
return true;
}
// To prevent other commit jobs from running this request
static::$has_run = true;
// Run all incompleted indexes
$indexNames = $this->getAllIndexes();
foreach ($indexNames as $name) {
$index = singleton($name);
$this->commitIndex($index);
}
$this->addMessage("All indexes committed");
return true;
}
/**
* Commits a specific index
*
* @param SolrIndex $index
* @throws Exception
*/
protected function commitIndex($index) {
// Skip index if this is already complete
$name = get_class($index);
if(in_array($name, $this->completed)) {
$this->addMessage("Skipping already comitted index {$name}");
return;
}
// Bypass SolrIndex::commit exception handling so that queuedjobs can handle the error
$this->addMessage("Committing index {$name}");
$index->getService()->commit(false, false, false);
$this->addMessage("Committing index {$name} was successful");
// If this index is currently marked as dirty, it's now clean
if(in_array($name, static::$dirty_indexes)) {
static::$dirty_indexes = array_diff(static::$dirty_indexes, array($name));
}
// Mark complete
$this->completed[] = $name;
}
public function setup() {
// NOOP
}
public function getJobData() {
$data = new stdClass();
$data->totalSteps = count($this->getAllIndexes());
$data->currentStep = count($this->completed);
$data->isComplete = $this->jobFinished();
$data->messages = $this->messages;
$data->jobData = new stdClass();
$data->jobData->skipped = $this->skipped;
$data->jobData->completed = $this->completed;
$data->jobData->indexes = $this->getAllIndexes();
return $data;
}
public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) {
$this->isComplete = $isComplete;
$this->messages = $messages;
$this->skipped = $jobData->skipped;
$this->completed = $jobData->completed;
$this->indexes = $jobData->indexes;
}
public function addMessage($message, $severity='INFO') {
$severity = strtoupper($severity);
$this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message";
}
public function getMessages() {
return $this->messages;
}
}

View File

@ -7,7 +7,7 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
/**
* The QueuedJob queue to use when processing updates
* @config
* @var string
* @var int
*/
private static $reindex_queue = 2; // QueuedJob::QUEUED;
@ -43,7 +43,9 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
}
public function afterComplete() {
// NOP
// Once indexing is complete, commit later in order to avoid solr limits
// see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers
SearchUpdateCommitJobProcessor::queue();
}
public function getJobData() {
@ -56,7 +58,6 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
$data->jobData = new stdClass();
$data->jobData->batches = $this->batches;
$data->jobData->currentBatch = $this->currentBatch;
$data->jobData->completedIndexes = $this->completedIndexes;
return $data;
}
@ -67,11 +68,20 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
$this->batches = $jobData->batches;
$this->currentBatch = $jobData->currentBatch;
$this->completedIndexes = $jobData->completedIndexes;
}
public function addMessage($message, $severity='INFO') {
$severity = strtoupper($severity);
$this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message";
}
public function process() {
$result = parent::process();
if($this->jobFinished()) {
$this->addMessage("All batched updates complete. Queuing commit job");
}
return $result;
}
}

View File

@ -121,6 +121,32 @@ As you can only search one index at a time, all searchable classes need to be in
}
}
## Using Multiple Indexes
Multiple indexes can be created and searched independently, but if you wish to override an existing
index with another, you can use the `$hide_ancestor` config.
:::php
class MyReplacementIndex extends MyIndex {
private static $hide_ancestor = 'MyIndex';
public function init() {
parent::init();
$this->addClass('File');
$this->addFulltextField('Title');
}
}
You can also filter all indexes globally to a set of pre-defined classes if you wish to
prevent any unknown indexes from being automatically included.
:::yaml
FullTextSearch:
indexes:
- MyReplacementIndex
- CoreSearchIndex
## Indexing Relationships
TODO

View File

@ -14,6 +14,21 @@ class BatchedProcessorTest_Index extends SearchIndex_Recording implements TestOn
}
}
class BatchedProcessor_QueuedJobService {
protected $jobs = array();
public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $queueName = null) {
$this->jobs[] = array(
'job' => $job,
'startAfter' => $startAfter
);
}
public function getJobs() {
return $this->jobs;
}
}
/**
* Tests {@see SearchUpdateQueuedJobProcessor}
*/
@ -25,6 +40,13 @@ class BatchedProcessorTest extends SapphireTest {
'BatchedProcessorTest_Object'
);
protected $illegalExtensions = array(
'SiteTree' => array(
'SiteTreeSubsites',
'Translatable'
)
);
public function setUp() {
parent::setUp();
Config::nest();
@ -34,11 +56,22 @@ class BatchedProcessorTest extends SapphireTest {
$this->skipTest = true;
}
SS_Datetime::set_mock_now('2015-05-07 06:00:00');
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_size', 5);
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 0);
Config::inst()->update('SearchUpdateCommitJobProcessor', 'cooldown', 600);
Versioned::reading_stage("Stage");
Injector::inst()->registerService(new BatchedProcessor_QueuedJobService(), 'QueuedJobService');
FullTextSearch::force_index_list('BatchedProcessorTest_Index');
SearchUpdateCommitJobProcessor::$dirty_indexes = array();
SearchUpdateCommitJobProcessor::$has_run = false;
$this->oldProcessor = SearchUpdater::$processor;
SearchUpdater::$processor = new SearchUpdateQueuedJobProcessor();
}
@ -47,9 +80,14 @@ class BatchedProcessorTest extends SapphireTest {
SearchUpdater::$processor = $this->oldProcessor;
Config::unnest();
Injector::inst()->unregisterNamedObject('QueuedJobService');
FullTextSearch::force_index_list();
parent::tearDown();
}
/**
* @return SearchUpdateQueuedJobProcessor
*/
protected function generateDirtyIds() {
$processor = SearchUpdater::$processor;
for($id = 1; $id <= 42; $id++) {
@ -100,8 +138,61 @@ class BatchedProcessorTest extends SapphireTest {
$this->assertEquals(9, $data->currentStep);
$this->assertEquals(42, count($index->getAdded()));
$this->assertTrue($data->isComplete);
// Check any additional queued jobs
$processor->afterComplete();
$service = singleton('QueuedJobService');
$jobs = $service->getJobs();
$this->assertEquals(1, count($jobs));
$this->assertInstanceOf('SearchUpdateCommitJobProcessor', $jobs[0]['job']);
}
/**
* Test creation of multiple commit jobs
*/
public function testMultipleCommits() {
$index = singleton('BatchedProcessorTest_Index');
$index->reset();
// Test that running a commit immediately after submitting to the indexes
// correctly commits
$first = SearchUpdateCommitJobProcessor::queue();
$second = SearchUpdateCommitJobProcessor::queue();
$this->assertFalse($index->getIsCommitted());
// First process will cause the commit
$this->assertFalse($first->jobFinished());
$first->process();
$allMessages = $first->getMessages();
$this->assertTrue($index->getIsCommitted());
$this->assertTrue($first->jobFinished());
$this->assertStringEndsWith('All indexes committed', $allMessages[2]);
// Executing the subsequent processor should not re-trigger a commit
$index->reset();
$this->assertFalse($second->jobFinished());
$second->process();
$allMessages = $second->getMessages();
$this->assertFalse($index->getIsCommitted());
$this->assertTrue($second->jobFinished());
$this->assertStringEndsWith('Indexing already completed this request: Discarding this job', $allMessages[0]);
// Given that a third job is created, and the indexes are dirtied, attempting to run this job
// should result in a delay
$index->reset();
$third = SearchUpdateCommitJobProcessor::queue();
$this->assertFalse($third->jobFinished());
$third->process();
$this->assertTrue($third->jobFinished());
$allMessages = $third->getMessages();
$this->assertStringEndsWith(
'Indexing already run this request, but incomplete. Re-scheduling for 2015-05-07 06:10:00',
$allMessages[0]
);
}
/**
* Tests that the batch_soft_cap setting is properly respected
*/