2015-05-07 15:16:27 +12:00
|
|
|
<?php
|
|
|
|
|
2015-11-21 19:19:20 +13:00
|
|
|
if (!interface_exists('QueuedJob')) {
|
|
|
|
return;
|
|
|
|
}
|
2015-05-07 15:16:27 +12:00
|
|
|
|
2015-11-21 19:19:20 +13:00
|
|
|
class SearchUpdateCommitJobProcessor implements QueuedJob
|
|
|
|
{
|
|
|
|
/**
|
|
|
|
* The QueuedJob queue to use when processing commits
|
|
|
|
*
|
|
|
|
* @config
|
|
|
|
* @var int
|
|
|
|
*/
|
|
|
|
private static $commit_queue = 2; // QueuedJob::QUEUED;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* List of indexes to commit
|
|
|
|
*
|
|
|
|
* @var array
|
|
|
|
*/
|
|
|
|
protected $indexes = array();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* True if this job is skipped to be be re-scheduled in the future
|
|
|
|
*
|
|
|
|
* @var boolean
|
|
|
|
*/
|
|
|
|
protected $skipped = false;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* List of completed indexes
|
|
|
|
*
|
|
|
|
* @var array
|
|
|
|
*/
|
|
|
|
protected $completed = array();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* List of messages
|
|
|
|
*
|
|
|
|
* @var array
|
|
|
|
*/
|
|
|
|
protected $messages = array();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* List of dirty indexes to be committed
|
|
|
|
*
|
|
|
|
* @var array
|
|
|
|
*/
|
|
|
|
public static $dirty_indexes = true;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* If solrindex::commit has already been performed, but additional commits are necessary,
|
|
|
|
* how long do we wait before attempting to touch the index again?
|
|
|
|
*
|
|
|
|
* {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers}
|
|
|
|
*
|
|
|
|
* @var int
|
|
|
|
* @config
|
|
|
|
*/
|
|
|
|
private static $cooldown = 300;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* True if any commits have been executed this request. If so, any attempts to run subsequent commits
|
|
|
|
* should be delayed until next queuedjob to prevent solr reaching maxWarmingSearchers
|
|
|
|
*
|
|
|
|
* {@see http://stackoverflow.com/questions/7512945/how-to-fix-exceeded-limit-of-maxwarmingsearchers}
|
|
|
|
*
|
|
|
|
* @var boolean
|
|
|
|
*/
|
|
|
|
public static $has_run = false;
|
|
|
|
|
|
|
|
/**
|
|
|
|
* This method is invoked once indexes with dirty ids have been updapted and a commit is necessary
|
|
|
|
*
|
|
|
|
* @param boolean $dirty Marks all indexes as dirty by default. Set to false if there are known comitted and
|
|
|
|
* clean indexes
|
|
|
|
* @param string $startAfter Start date
|
|
|
|
* @return int The ID of the next queuedjob to run. This could be a new one or an existing one.
|
|
|
|
*/
|
|
|
|
public static function queue($dirty = true, $startAfter = null)
|
|
|
|
{
|
|
|
|
$commit = Injector::inst()->create(__CLASS__);
|
|
|
|
$id = singleton('QueuedJobService')->queueJob($commit, $startAfter);
|
|
|
|
|
|
|
|
if ($dirty) {
|
|
|
|
$indexes = FullTextSearch::get_indexes();
|
|
|
|
static::$dirty_indexes = array_keys($indexes);
|
|
|
|
}
|
|
|
|
return $id;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getJobType()
|
|
|
|
{
|
|
|
|
return Config::inst()->get(__CLASS__, 'commit_queue');
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getSignature()
|
|
|
|
{
|
|
|
|
// There is only ever one commit job on the queue so the signature is consistent
|
|
|
|
// See QueuedJobService::queueJob() for the code that prevents duplication
|
|
|
|
return __CLASS__;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getTitle()
|
|
|
|
{
|
|
|
|
return "FullTextSearch Commit Job";
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Get the list of index names we should process
|
|
|
|
*
|
|
|
|
* @return array
|
|
|
|
*/
|
|
|
|
public function getAllIndexes()
|
|
|
|
{
|
|
|
|
if (empty($this->indexes)) {
|
|
|
|
$indexes = FullTextSearch::get_indexes();
|
|
|
|
$this->indexes = array_keys($indexes);
|
|
|
|
}
|
|
|
|
return $this->indexes;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function jobFinished()
|
|
|
|
{
|
|
|
|
// If we've indexed exactly as many as we would like, we are done
|
|
|
|
return $this->skipped
|
|
|
|
|| (count($this->getAllIndexes()) <= count($this->completed));
|
|
|
|
}
|
|
|
|
|
|
|
|
public function prepareForRestart()
|
|
|
|
{
|
|
|
|
// NOOP
|
|
|
|
}
|
|
|
|
|
|
|
|
public function afterComplete()
|
|
|
|
{
|
|
|
|
// NOOP
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Abort this job, potentially rescheduling a replacement if there is still work to do
|
|
|
|
*/
|
|
|
|
protected function discardJob()
|
|
|
|
{
|
|
|
|
$this->skipped = true;
|
|
|
|
|
|
|
|
// If we do not have dirty records, then assume that these dirty records were committed
|
|
|
|
// already this request (but probably another job), so we don't need to commit anything else.
|
|
|
|
// This could occur if we completed multiple searchupdate jobs in a prior request, and
|
|
|
|
// we only need one commit job to commit all of them in the current request.
|
|
|
|
if (empty(static::$dirty_indexes)) {
|
|
|
|
$this->addMessage("Indexing already completed this request: Discarding this job");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// If any commit has run, but some (or all) indexes are un-comitted, we must re-schedule this task.
|
|
|
|
// This could occur if we completed a searchupdate job in a prior request, as well as in
|
|
|
|
// the current request
|
|
|
|
$cooldown = Config::inst()->get(__CLASS__, 'cooldown');
|
|
|
|
$now = new DateTime(SS_Datetime::now()->getValue());
|
|
|
|
$now->add(new DateInterval('PT'.$cooldown.'S'));
|
|
|
|
$runat = $now->Format('Y-m-d H:i:s');
|
|
|
|
|
|
|
|
$this->addMessage("Indexing already run this request, but incomplete. Re-scheduling for {$runat}");
|
|
|
|
|
|
|
|
// Queue after the given cooldown
|
|
|
|
static::queue(false, $runat);
|
|
|
|
}
|
|
|
|
|
|
|
|
public function process()
|
|
|
|
{
|
|
|
|
// If we have already run an instance of SearchUpdateCommitJobProcessor this request, immediately
|
|
|
|
// quit this job to prevent hitting warming search limits in Solr
|
|
|
|
if (static::$has_run) {
|
|
|
|
$this->discardJob();
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
// To prevent other commit jobs from running this request
|
|
|
|
static::$has_run = true;
|
|
|
|
|
|
|
|
// Run all incompleted indexes
|
|
|
|
$indexNames = $this->getAllIndexes();
|
|
|
|
foreach ($indexNames as $name) {
|
|
|
|
$index = singleton($name);
|
|
|
|
$this->commitIndex($index);
|
|
|
|
}
|
|
|
|
|
|
|
|
$this->addMessage("All indexes committed");
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Commits a specific index
|
|
|
|
*
|
|
|
|
* @param SolrIndex $index
|
|
|
|
* @throws Exception
|
|
|
|
*/
|
|
|
|
protected function commitIndex($index)
|
|
|
|
{
|
|
|
|
// Skip index if this is already complete
|
|
|
|
$name = get_class($index);
|
|
|
|
if (in_array($name, $this->completed)) {
|
|
|
|
$this->addMessage("Skipping already comitted index {$name}");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// Bypass SolrIndex::commit exception handling so that queuedjobs can handle the error
|
|
|
|
$this->addMessage("Committing index {$name}");
|
|
|
|
$index->getService()->commit(false, false, false);
|
|
|
|
$this->addMessage("Committing index {$name} was successful");
|
|
|
|
|
|
|
|
// If this index is currently marked as dirty, it's now clean
|
|
|
|
if (in_array($name, static::$dirty_indexes)) {
|
|
|
|
static::$dirty_indexes = array_diff(static::$dirty_indexes, array($name));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Mark complete
|
|
|
|
$this->completed[] = $name;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setup()
|
|
|
|
{
|
|
|
|
// NOOP
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getJobData()
|
|
|
|
{
|
|
|
|
$data = new stdClass();
|
|
|
|
$data->totalSteps = count($this->getAllIndexes());
|
|
|
|
$data->currentStep = count($this->completed);
|
|
|
|
$data->isComplete = $this->jobFinished();
|
|
|
|
$data->messages = $this->messages;
|
|
|
|
|
|
|
|
$data->jobData = new stdClass();
|
|
|
|
$data->jobData->skipped = $this->skipped;
|
|
|
|
$data->jobData->completed = $this->completed;
|
|
|
|
$data->jobData->indexes = $this->getAllIndexes();
|
|
|
|
|
|
|
|
return $data;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages)
|
|
|
|
{
|
|
|
|
$this->isComplete = $isComplete;
|
|
|
|
$this->messages = $messages;
|
|
|
|
|
|
|
|
$this->skipped = $jobData->skipped;
|
|
|
|
$this->completed = $jobData->completed;
|
|
|
|
$this->indexes = $jobData->indexes;
|
|
|
|
}
|
|
|
|
|
|
|
|
public function addMessage($message, $severity='INFO')
|
|
|
|
{
|
|
|
|
$severity = strtoupper($severity);
|
|
|
|
$this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message";
|
|
|
|
}
|
|
|
|
|
|
|
|
public function getMessages()
|
|
|
|
{
|
|
|
|
return $this->messages;
|
|
|
|
}
|
2015-05-07 15:16:27 +12:00
|
|
|
}
|