mirror of
https://github.com/silverstripe/silverstripe-fulltextsearch
synced 2024-10-22 14:05:29 +02:00
Merge pull request #67 from tractorcow/pulls/safe-commit
API Separate searchupdate / commit into separate queued-jobs
This commit is contained in:
commit
bac802c1f2
@ -9,6 +9,15 @@ class FullTextSearch {
|
||||
|
||||
static protected $indexes_by_subclass = array();
|
||||
|
||||
/**
|
||||
* Optional list of index names to limit to. If left empty, all subclasses of SearchIndex
|
||||
* will be used
|
||||
*
|
||||
* @var array
|
||||
* @config
|
||||
*/
|
||||
private static $indexes = array();
|
||||
|
||||
/**
|
||||
* Get all the instantiable search indexes (so all the user created indexes, but not the connector or library level
|
||||
* abstract indexes). Can optionally be filtered to only return indexes that are subclasses of some class
|
||||
@ -18,16 +27,43 @@ class FullTextSearch {
|
||||
* @param bool $rebuild - If true, don't use cached values
|
||||
*/
|
||||
static function get_indexes($class = null, $rebuild = false) {
|
||||
if ($rebuild) { self::$all_indexes = null; self::$indexes_by_subclass = array(); }
|
||||
if ($rebuild) {
|
||||
self::$all_indexes = null;
|
||||
self::$indexes_by_subclass = array();
|
||||
}
|
||||
|
||||
if (!$class) {
|
||||
if (self::$all_indexes === null) {
|
||||
$classes = ClassInfo::subclassesFor('SearchIndex');
|
||||
// Get declared indexes, or otherwise default to all subclasses of SearchIndex
|
||||
$classes = Config::inst()->get(__CLASS__, 'indexes')
|
||||
?: ClassInfo::subclassesFor('SearchIndex');
|
||||
|
||||
$concrete = array();
|
||||
$hidden = array();
|
||||
$candidates = array();
|
||||
foreach ($classes as $class) {
|
||||
// Check if this index is disabled
|
||||
$hides = $class::config()->hide_ancestor;
|
||||
if($hides) {
|
||||
$hidden[] = $hides;
|
||||
}
|
||||
|
||||
// Check if this index is abstract
|
||||
$ref = new ReflectionClass($class);
|
||||
if ($ref->isInstantiable()) $concrete[$class] = singleton($class);
|
||||
if (!$ref->isInstantiable()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$candidates[] = $class;
|
||||
}
|
||||
|
||||
if($hidden) {
|
||||
$candidates = array_diff($candidates, $hidden);
|
||||
}
|
||||
|
||||
// Create all indexes
|
||||
$concrete = array();
|
||||
foreach($candidates as $class) {
|
||||
$concrete[$class] = singleton($class);
|
||||
}
|
||||
|
||||
self::$all_indexes = $concrete;
|
||||
@ -60,6 +96,8 @@ class FullTextSearch {
|
||||
* From then on, fulltext search system will only see those indexes passed in this most recent call.
|
||||
*
|
||||
* Passing in no arguments resets back to automatic index list
|
||||
*
|
||||
* Alternatively you can use `FullTextSearch.indexes` to configure a list of indexes via config.
|
||||
*/
|
||||
static function force_index_list() {
|
||||
$indexes = func_get_args();
|
||||
|
@ -29,7 +29,16 @@
|
||||
*/
|
||||
abstract class SearchIndex extends ViewableData {
|
||||
|
||||
function __construct() {
|
||||
/**
|
||||
* Allows this index to hide a parent index. Specifies the name of a parent index to disable
|
||||
*
|
||||
* @var string
|
||||
* @config
|
||||
*/
|
||||
private static $hide_ancestor;
|
||||
|
||||
public function __construct() {
|
||||
parent::__construct();
|
||||
$this->init();
|
||||
|
||||
foreach ($this->getClasses() as $class => $options) {
|
||||
@ -39,7 +48,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
$this->buildDependancyList();
|
||||
}
|
||||
|
||||
function __toString() {
|
||||
public function __toString() {
|
||||
return 'Search Index ' . get_class($this);
|
||||
}
|
||||
|
||||
@ -47,7 +56,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
* Examines the classes this index is built on to try and find defined fields in the class hierarchy for those classes.
|
||||
* Looks for db and viewable-data fields, although can't nessecarily find type for viewable-data fields.
|
||||
*/
|
||||
function fieldData($field, $forceType = null, $extraOptions = array()) {
|
||||
public function fieldData($field, $forceType = null, $extraOptions = array()) {
|
||||
$fullfield = str_replace(".", "_", $field);
|
||||
$sources = $this->getClasses();
|
||||
|
||||
@ -287,7 +296,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
|
||||
public $dependancyList = array();
|
||||
|
||||
function buildDependancyList() {
|
||||
public function buildDependancyList() {
|
||||
$this->dependancyList = array_keys($this->getClasses());
|
||||
|
||||
foreach ($this->getFieldsIterator() as $name => $field) {
|
||||
@ -302,7 +311,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
* Returns an array where each member is all the fields and the classes that are at the end of some
|
||||
* specific lookup chain from one of the base classes
|
||||
*/
|
||||
function getDerivedFields() {
|
||||
public function getDerivedFields() {
|
||||
if ($this->derivedFields === null) {
|
||||
$this->derivedFields = array();
|
||||
|
||||
@ -341,7 +350,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
* @param Array $state - The variant state of the object
|
||||
* @return string - The document ID as a string
|
||||
*/
|
||||
function getDocumentIDForState($base, $id, $state) {
|
||||
public function getDocumentIDForState($base, $id, $state) {
|
||||
ksort($state);
|
||||
$parts = array('id' => $id, 'base' => $base, 'state' => json_encode($state));
|
||||
return implode('-', array_values($parts));
|
||||
@ -355,7 +364,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
* @param Boolean $includesubs - TODO: Probably going away
|
||||
* @return string - The document ID as a string
|
||||
*/
|
||||
function getDocumentID($object, $base, $includesubs) {
|
||||
public function getDocumentID($object, $base, $includesubs) {
|
||||
return $this->getDocumentIDForState($base, $object->ID, SearchVariant::current_state($base, $includesubs));
|
||||
}
|
||||
|
||||
@ -433,7 +442,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
* @param $fields
|
||||
* @return array
|
||||
*/
|
||||
function getDirtyIDs($class, $id, $statefulids, $fields) {
|
||||
public function getDirtyIDs($class, $id, $statefulids, $fields) {
|
||||
$dirty = array();
|
||||
|
||||
// First, if this object is directly contained in the index, add it
|
||||
@ -499,10 +508,10 @@ abstract class SearchIndex extends ViewableData {
|
||||
|
||||
/** !! These should be implemented by the full text search engine */
|
||||
|
||||
abstract function add($object) ;
|
||||
abstract function delete($base, $id, $state) ;
|
||||
abstract public function add($object) ;
|
||||
abstract public function delete($base, $id, $state) ;
|
||||
|
||||
abstract function commit();
|
||||
abstract public function commit();
|
||||
|
||||
/** !! These should be implemented by the specific index */
|
||||
|
||||
@ -511,7 +520,7 @@ abstract class SearchIndex extends ViewableData {
|
||||
* Used instead of overriding __construct as we have specific execution order - code that has
|
||||
* to be run before _and/or_ after this.
|
||||
*/
|
||||
abstract function init();
|
||||
abstract public function init();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -519,11 +528,11 @@ abstract class SearchIndex extends ViewableData {
|
||||
*/
|
||||
abstract class SearchIndex_Null extends SearchIndex {
|
||||
|
||||
function add($object) { }
|
||||
public function add($object) { }
|
||||
|
||||
function delete($base, $id, $state) { }
|
||||
public function delete($base, $id, $state) { }
|
||||
|
||||
function commit() { }
|
||||
public function commit() { }
|
||||
|
||||
}
|
||||
|
||||
@ -534,13 +543,15 @@ abstract class SearchIndex_Recording extends SearchIndex {
|
||||
|
||||
public $added = array();
|
||||
public $deleted = array();
|
||||
public $committed = false;
|
||||
|
||||
function reset() {
|
||||
public function reset() {
|
||||
$this->added = array();
|
||||
$this->deleted = array();
|
||||
$this->committed = false;
|
||||
}
|
||||
|
||||
function add($object) {
|
||||
public function add($object) {
|
||||
$res = array();
|
||||
|
||||
$res['ID'] = $object->ID;
|
||||
@ -553,7 +564,7 @@ abstract class SearchIndex_Recording extends SearchIndex {
|
||||
$this->added[] = $res;
|
||||
}
|
||||
|
||||
function getAdded($fields = array()) {
|
||||
public function getAdded($fields = array()) {
|
||||
$res = array();
|
||||
|
||||
foreach ($this->added as $added) {
|
||||
@ -567,14 +578,25 @@ abstract class SearchIndex_Recording extends SearchIndex {
|
||||
return $res;
|
||||
}
|
||||
|
||||
function delete($base, $id, $state) {
|
||||
public function delete($base, $id, $state) {
|
||||
$this->deleted[] = array('base' => $base, 'id' => $id, 'state' => $state);
|
||||
}
|
||||
|
||||
function commit() { }
|
||||
public function commit() {
|
||||
$this->committed = true;
|
||||
}
|
||||
|
||||
function getIndexName() {
|
||||
public function getIndexName() {
|
||||
return get_class($this);
|
||||
}
|
||||
|
||||
public function getIsCommitted() {
|
||||
return $this->committed;
|
||||
}
|
||||
|
||||
public function getService() {
|
||||
// Causes commits to the service to be redirected back to the same object
|
||||
return $this;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -52,19 +52,6 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor {
|
||||
$this->setBatch(0);
|
||||
}
|
||||
|
||||
protected function commitIndex($index) {
|
||||
$name = $index->getIndexName();
|
||||
|
||||
// If this is a resurrected batch then it's not necessary to commit the index
|
||||
// twice, assuming it has successfully been comitted before
|
||||
if(isset($this->completedIndexes[$name])) return true;
|
||||
|
||||
// Commit index and mark as completed
|
||||
$result = parent::commitIndex($index);
|
||||
if($result) $this->completedIndexes[$name] = $name;
|
||||
return $result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the current batch index
|
||||
*
|
||||
@ -72,7 +59,6 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor {
|
||||
*/
|
||||
protected function setBatch($batch) {
|
||||
$this->currentBatch = $batch;
|
||||
$this->completedIndexes = array();
|
||||
}
|
||||
|
||||
protected function getSource() {
|
||||
@ -93,12 +79,12 @@ abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor {
|
||||
// Don't re-process completed queue
|
||||
if($this->currentBatch >= count($this->batches)) return true;
|
||||
|
||||
// Process batch
|
||||
$result = parent::process();
|
||||
// Send current patch to indexes
|
||||
$this->prepareIndexes();
|
||||
|
||||
// Advance to next batch if successful
|
||||
if($result) $this->setBatch($this->currentBatch + 1);
|
||||
return $result;
|
||||
$this->setBatch($this->currentBatch + 1);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
249
code/search/processors/SearchUpdateCommitJobProcessor.php
Normal file
249
code/search/processors/SearchUpdateCommitJobProcessor.php
Normal file
@ -0,0 +1,249 @@
|
||||
<?php
|
||||
|
||||
if(!interface_exists('QueuedJob')) return;
|
||||
|
||||
class SearchUpdateCommitJobProcessor implements QueuedJob {
|
||||
|
||||
/**
|
||||
* The QueuedJob queue to use when processing commits
|
||||
*
|
||||
* @config
|
||||
* @var int
|
||||
*/
|
||||
private static $commit_queue = 2; // QueuedJob::QUEUED;
|
||||
|
||||
/**
|
||||
* List of indexes to commit
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $indexes = array();
|
||||
|
||||
/**
|
||||
* True if this job is skipped to be be re-scheduled in the future
|
||||
*
|
||||
* @var boolean
|
||||
*/
|
||||
protected $skipped = false;
|
||||
|
||||
/**
|
||||
* List of completed indexes
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $completed = array();
|
||||
|
||||
/**
|
||||
* List of messages
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
protected $messages = array();
|
||||
|
||||
/**
|
||||
* List of dirty indexes to be committed
|
||||
*
|
||||
* @var array
|
||||
*/
|
||||
public static $dirty_indexes = true;
|
||||
|
||||
/**
|
||||
* If solrindex::commit has already been performed, but additional commits are necessary,
|
||||
* how long do we wait before attempting to touch the index again?
|
||||
*
|
||||
* {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers}
|
||||
*
|
||||
* @var int
|
||||
* @config
|
||||
*/
|
||||
private static $cooldown = 300;
|
||||
|
||||
/**
|
||||
* True if any commits have been executed this request. If so, any attempts to run subsequent commits
|
||||
* should be delayed until next queuedjob to prevent solr reaching maxWarmingSearchers
|
||||
*
|
||||
* {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers}
|
||||
*
|
||||
* @var boolean
|
||||
*/
|
||||
public static $has_run = false;
|
||||
|
||||
/**
|
||||
* This method is invoked once indexes with dirty ids have been updapted and a commit is necessary
|
||||
*
|
||||
* @param boolean $dirty Marks all indexes as dirty by default. Set to false if there are known comitted and
|
||||
* clean indexes
|
||||
* @param string $startAfter Start date
|
||||
* @return int The ID of the next queuedjob to run. This could be a new one or an existing one.
|
||||
*/
|
||||
public static function queue($dirty = true, $startAfter = null) {
|
||||
$commit = Injector::inst()->create(__CLASS__);
|
||||
$id = singleton('QueuedJobService')->queueJob($commit, $startAfter);
|
||||
|
||||
if($dirty) {
|
||||
$indexes = FullTextSearch::get_indexes();
|
||||
static::$dirty_indexes = array_keys($indexes);
|
||||
}
|
||||
return $id;
|
||||
}
|
||||
|
||||
public function getJobType() {
|
||||
return Config::inst()->get(__CLASS__, 'commit_queue');
|
||||
}
|
||||
|
||||
public function getSignature() {
|
||||
// There is only ever one commit job on the queue so the signature is consistent
|
||||
// See QueuedJobService::queueJob() for the code that prevents duplication
|
||||
return __CLASS__;
|
||||
}
|
||||
|
||||
public function getTitle() {
|
||||
return "FullTextSearch Commit Job";
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of index names we should process
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function getAllIndexes() {
|
||||
if(empty($this->indexes)) {
|
||||
$indexes = FullTextSearch::get_indexes();
|
||||
$this->indexes = array_keys($indexes);
|
||||
}
|
||||
return $this->indexes;
|
||||
}
|
||||
|
||||
public function jobFinished() {
|
||||
// If we've indexed exactly as many as we would like, we are done
|
||||
return $this->skipped
|
||||
|| (count($this->getAllIndexes()) <= count($this->completed));
|
||||
}
|
||||
|
||||
public function prepareForRestart() {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
public function afterComplete() {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort this job, potentially rescheduling a replacement if there is still work to do
|
||||
*/
|
||||
protected function discardJob() {
|
||||
$this->skipped = true;
|
||||
|
||||
// If we do not have dirty records, then assume that these dirty records were committed
|
||||
// already this request (but probably another job), so we don't need to commit anything else.
|
||||
// This could occur if we completed multiple searchupdate jobs in a prior request, and
|
||||
// we only need one commit job to commit all of them in the current request.
|
||||
if(empty(static::$dirty_indexes)) {
|
||||
$this->addMessage("Indexing already completed this request: Discarding this job");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// If any commit has run, but some (or all) indexes are un-comitted, we must re-schedule this task.
|
||||
// This could occur if we completed a searchupdate job in a prior request, as well as in
|
||||
// the current request
|
||||
$cooldown = Config::inst()->get(__CLASS__, 'cooldown');
|
||||
$now = new DateTime(SS_Datetime::now()->getValue());
|
||||
$now->add(new DateInterval('PT'.$cooldown.'S'));
|
||||
$runat = $now->Format('Y-m-d H:i:s');
|
||||
|
||||
$this->addMessage("Indexing already run this request, but incomplete. Re-scheduling for {$runat}");
|
||||
|
||||
// Queue after the given cooldown
|
||||
static::queue(false, $runat);
|
||||
}
|
||||
|
||||
public function process() {
|
||||
// If we have already run an instance of SearchUpdateCommitJobProcessor this request, immediately
|
||||
// quit this job to prevent hitting warming search limits in Solr
|
||||
if(static::$has_run) {
|
||||
$this->discardJob();
|
||||
return true;
|
||||
}
|
||||
|
||||
// To prevent other commit jobs from running this request
|
||||
static::$has_run = true;
|
||||
|
||||
// Run all incompleted indexes
|
||||
$indexNames = $this->getAllIndexes();
|
||||
foreach ($indexNames as $name) {
|
||||
$index = singleton($name);
|
||||
$this->commitIndex($index);
|
||||
}
|
||||
|
||||
$this->addMessage("All indexes committed");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Commits a specific index
|
||||
*
|
||||
* @param SolrIndex $index
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function commitIndex($index) {
|
||||
// Skip index if this is already complete
|
||||
$name = get_class($index);
|
||||
if(in_array($name, $this->completed)) {
|
||||
$this->addMessage("Skipping already comitted index {$name}");
|
||||
return;
|
||||
}
|
||||
|
||||
// Bypass SolrIndex::commit exception handling so that queuedjobs can handle the error
|
||||
$this->addMessage("Committing index {$name}");
|
||||
$index->getService()->commit(false, false, false);
|
||||
$this->addMessage("Committing index {$name} was successful");
|
||||
|
||||
// If this index is currently marked as dirty, it's now clean
|
||||
if(in_array($name, static::$dirty_indexes)) {
|
||||
static::$dirty_indexes = array_diff(static::$dirty_indexes, array($name));
|
||||
}
|
||||
|
||||
// Mark complete
|
||||
$this->completed[] = $name;
|
||||
}
|
||||
|
||||
public function setup() {
|
||||
// NOOP
|
||||
}
|
||||
|
||||
public function getJobData() {
|
||||
$data = new stdClass();
|
||||
$data->totalSteps = count($this->getAllIndexes());
|
||||
$data->currentStep = count($this->completed);
|
||||
$data->isComplete = $this->jobFinished();
|
||||
$data->messages = $this->messages;
|
||||
|
||||
$data->jobData = new stdClass();
|
||||
$data->jobData->skipped = $this->skipped;
|
||||
$data->jobData->completed = $this->completed;
|
||||
$data->jobData->indexes = $this->getAllIndexes();
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
||||
public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) {
|
||||
$this->isComplete = $isComplete;
|
||||
$this->messages = $messages;
|
||||
|
||||
$this->skipped = $jobData->skipped;
|
||||
$this->completed = $jobData->completed;
|
||||
$this->indexes = $jobData->indexes;
|
||||
}
|
||||
|
||||
public function addMessage($message, $severity='INFO') {
|
||||
$severity = strtoupper($severity);
|
||||
$this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message";
|
||||
}
|
||||
|
||||
public function getMessages() {
|
||||
return $this->messages;
|
||||
}
|
||||
|
||||
}
|
@ -7,7 +7,7 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
|
||||
/**
|
||||
* The QueuedJob queue to use when processing updates
|
||||
* @config
|
||||
* @var string
|
||||
* @var int
|
||||
*/
|
||||
private static $reindex_queue = 2; // QueuedJob::QUEUED;
|
||||
|
||||
@ -43,7 +43,9 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
|
||||
}
|
||||
|
||||
public function afterComplete() {
|
||||
// NOP
|
||||
// Once indexing is complete, commit later in order to avoid solr limits
|
||||
// see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers
|
||||
SearchUpdateCommitJobProcessor::queue();
|
||||
}
|
||||
|
||||
public function getJobData() {
|
||||
@ -56,7 +58,6 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
|
||||
$data->jobData = new stdClass();
|
||||
$data->jobData->batches = $this->batches;
|
||||
$data->jobData->currentBatch = $this->currentBatch;
|
||||
$data->jobData->completedIndexes = $this->completedIndexes;
|
||||
|
||||
return $data;
|
||||
}
|
||||
@ -67,11 +68,20 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implem
|
||||
|
||||
$this->batches = $jobData->batches;
|
||||
$this->currentBatch = $jobData->currentBatch;
|
||||
$this->completedIndexes = $jobData->completedIndexes;
|
||||
}
|
||||
|
||||
public function addMessage($message, $severity='INFO') {
|
||||
$severity = strtoupper($severity);
|
||||
$this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message";
|
||||
}
|
||||
|
||||
public function process() {
|
||||
$result = parent::process();
|
||||
|
||||
if($this->jobFinished()) {
|
||||
$this->addMessage("All batched updates complete. Queuing commit job");
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
@ -121,6 +121,32 @@ As you can only search one index at a time, all searchable classes need to be in
|
||||
}
|
||||
}
|
||||
|
||||
## Using Multiple Indexes
|
||||
|
||||
Multiple indexes can be created and searched independently, but if you wish to override an existing
|
||||
index with another, you can use the `$hide_ancestor` config.
|
||||
|
||||
:::php
|
||||
class MyReplacementIndex extends MyIndex {
|
||||
private static $hide_ancestor = 'MyIndex';
|
||||
|
||||
public function init() {
|
||||
parent::init();
|
||||
$this->addClass('File');
|
||||
$this->addFulltextField('Title');
|
||||
}
|
||||
}
|
||||
|
||||
You can also filter all indexes globally to a set of pre-defined classes if you wish to
|
||||
prevent any unknown indexes from being automatically included.
|
||||
|
||||
:::yaml
|
||||
FullTextSearch:
|
||||
indexes:
|
||||
- MyReplacementIndex
|
||||
- CoreSearchIndex
|
||||
|
||||
|
||||
## Indexing Relationships
|
||||
|
||||
TODO
|
||||
|
@ -14,6 +14,22 @@ class BatchedProcessorTest_Index extends SearchIndex_Recording implements TestOn
|
||||
}
|
||||
}
|
||||
|
||||
class BatchedProcessor_QueuedJobService {
|
||||
protected $jobs = array();
|
||||
|
||||
public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $queueName = null) {
|
||||
$this->jobs[] = array(
|
||||
'job' => $job,
|
||||
'startAfter' => $startAfter
|
||||
);
|
||||
return $job;
|
||||
}
|
||||
|
||||
public function getJobs() {
|
||||
return $this->jobs;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests {@see SearchUpdateQueuedJobProcessor}
|
||||
*/
|
||||
@ -25,6 +41,13 @@ class BatchedProcessorTest extends SapphireTest {
|
||||
'BatchedProcessorTest_Object'
|
||||
);
|
||||
|
||||
protected $illegalExtensions = array(
|
||||
'SiteTree' => array(
|
||||
'SiteTreeSubsites',
|
||||
'Translatable'
|
||||
)
|
||||
);
|
||||
|
||||
public function setUp() {
|
||||
parent::setUp();
|
||||
Config::nest();
|
||||
@ -33,11 +56,22 @@ class BatchedProcessorTest extends SapphireTest {
|
||||
$this->markTestSkipped("These tests need the QueuedJobs module installed to run");
|
||||
$this->skipTest = true;
|
||||
}
|
||||
|
||||
|
||||
SS_Datetime::set_mock_now('2015-05-07 06:00:00');
|
||||
|
||||
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_size', 5);
|
||||
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 0);
|
||||
Config::inst()->update('SearchUpdateCommitJobProcessor', 'cooldown', 600);
|
||||
|
||||
Versioned::reading_stage("Stage");
|
||||
|
||||
Injector::inst()->registerService(new BatchedProcessor_QueuedJobService(), 'QueuedJobService');
|
||||
|
||||
FullTextSearch::force_index_list('BatchedProcessorTest_Index');
|
||||
|
||||
SearchUpdateCommitJobProcessor::$dirty_indexes = array();
|
||||
SearchUpdateCommitJobProcessor::$has_run = false;
|
||||
|
||||
$this->oldProcessor = SearchUpdater::$processor;
|
||||
SearchUpdater::$processor = new SearchUpdateQueuedJobProcessor();
|
||||
@ -47,9 +81,14 @@ class BatchedProcessorTest extends SapphireTest {
|
||||
|
||||
SearchUpdater::$processor = $this->oldProcessor;
|
||||
Config::unnest();
|
||||
Injector::inst()->unregisterNamedObject('QueuedJobService');
|
||||
FullTextSearch::force_index_list();
|
||||
parent::tearDown();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return SearchUpdateQueuedJobProcessor
|
||||
*/
|
||||
protected function generateDirtyIds() {
|
||||
$processor = SearchUpdater::$processor;
|
||||
for($id = 1; $id <= 42; $id++) {
|
||||
@ -100,7 +139,60 @@ class BatchedProcessorTest extends SapphireTest {
|
||||
$this->assertEquals(9, $data->currentStep);
|
||||
$this->assertEquals(42, count($index->getAdded()));
|
||||
$this->assertTrue($data->isComplete);
|
||||
|
||||
// Check any additional queued jobs
|
||||
$processor->afterComplete();
|
||||
$service = singleton('QueuedJobService');
|
||||
$jobs = $service->getJobs();
|
||||
$this->assertEquals(1, count($jobs));
|
||||
$this->assertInstanceOf('SearchUpdateCommitJobProcessor', $jobs[0]['job']);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test creation of multiple commit jobs
|
||||
*/
|
||||
public function testMultipleCommits() {
|
||||
$index = singleton('BatchedProcessorTest_Index');
|
||||
$index->reset();
|
||||
|
||||
// Test that running a commit immediately after submitting to the indexes
|
||||
// correctly commits
|
||||
$first = SearchUpdateCommitJobProcessor::queue();
|
||||
$second = SearchUpdateCommitJobProcessor::queue();
|
||||
|
||||
$this->assertFalse($index->getIsCommitted());
|
||||
|
||||
// First process will cause the commit
|
||||
$this->assertFalse($first->jobFinished());
|
||||
$first->process();
|
||||
$allMessages = $first->getMessages();
|
||||
$this->assertTrue($index->getIsCommitted());
|
||||
$this->assertTrue($first->jobFinished());
|
||||
$this->assertStringEndsWith('All indexes committed', $allMessages[2]);
|
||||
|
||||
// Executing the subsequent processor should not re-trigger a commit
|
||||
$index->reset();
|
||||
$this->assertFalse($second->jobFinished());
|
||||
$second->process();
|
||||
$allMessages = $second->getMessages();
|
||||
$this->assertFalse($index->getIsCommitted());
|
||||
$this->assertTrue($second->jobFinished());
|
||||
$this->assertStringEndsWith('Indexing already completed this request: Discarding this job', $allMessages[0]);
|
||||
|
||||
// Given that a third job is created, and the indexes are dirtied, attempting to run this job
|
||||
// should result in a delay
|
||||
$index->reset();
|
||||
$third = SearchUpdateCommitJobProcessor::queue();
|
||||
$this->assertFalse($third->jobFinished());
|
||||
$third->process();
|
||||
$this->assertTrue($third->jobFinished());
|
||||
$allMessages = $third->getMessages();
|
||||
$this->assertStringEndsWith(
|
||||
'Indexing already run this request, but incomplete. Re-scheduling for 2015-05-07 06:10:00',
|
||||
$allMessages[0]
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests that the batch_soft_cap setting is properly respected
|
||||
|
Loading…
Reference in New Issue
Block a user