API Move SearchUpdate queue handling to DIed processor, add QueuedJob support

This commit is contained in:
Hamish Friedlander 2013-07-25 14:27:09 +12:00
parent d6cf16c391
commit dbd68dc300
7 changed files with 238 additions and 134 deletions

22
_config/processor.yml Normal file
View File

@ -0,0 +1,22 @@
---
Name: defaultprocessor
---
Injector:
SearchUpdateProcessor:
class: SearchUpdateImmediateProcessor
---
Name: messagequeueprocessor
Only:
ModuleExists: messagequeue
---
Injector:
SearchUpdateProcessor:
class: SearchUpdateMessageQueueProcessor
---
Name: queuedjobprocessor
Only:
ModuleExists: queuedjobs
---
Injector:
SearchUpdateProcessor:
class: SearchUpdateQueuedJobProcessor

View File

@ -10,36 +10,10 @@
*
* Pretty closely tied to the field structure of SearchIndex.
*
* TODO: The way we bind in is awful hacky. The config stuff in 3 will hopefully allow us to force ourselves as the very last
* augmentManipulation.
* TODO: The way we bind in is awful hacky.
*/
class SearchUpdater extends Object {
const AUTO = 0;
const DEFERRED = 1;
const IMMEDIATE = 2;
const DISABLED = 3;
/**
* How to schedule index updates at the end of the request.
*
* AUTO = IMMEDIATE if not _many_ dirty records, DEFERRED if _many_ where many is self::$auto_threshold
* DEFERRED = Use messagequeue to trigger updating indexes sometime soonish
* IMMEDIATE = Update indexes at end of request
* DISABLE = Dont update indexes
*
* If messagequeue module not installed, AUTO => IMMEDIATE and DEFERRED => DISABLED
*/
static $update_method = SearchUpdater::AUTO;
// How many items can be dirty before we defer updates
static $auto_threshold = 6;
// The indexing message queue
static $reindex_queue = "search_indexing";
static function set_reindexing_queue($queue) { self::$reindex_queue = $queue; }
/**
* Replace the database object with a subclass that captures all manipulations and passes them to us
*/
@ -75,35 +49,10 @@ class SearchUpdater extends Object {
DB::setConn($captured);
}
static $dirty = array(); static $dirtycount = 0;
static function add_dirty_ids($class, $statefulids, $index) {
$base = ClassInfo::baseDataClass($class);
$forclass = isset(self::$dirty[$base]) ? self::$dirty[$base] : array();
foreach ($statefulids as $statefulid) {
$id = $statefulid['id'];
$state = $statefulid['state']; $statekey = serialize($state);
if (!isset($forclass[$statekey])) {
$forclass[$statekey] = array('state' => $state, 'ids' => array($id => array($index)));
self::$dirtycount += 1;
}
else if (!isset($forclass[$statekey]['ids'][$id])) {
$forclass[$statekey]['ids'][$id] = array($index);
self::$dirtycount += 1;
}
else if (array_search($index, $forclass[$statekey]['ids'][$id]) === false) {
$forclass[$statekey]['ids'][$id][] = $index;
// dirty count stays the same
}
}
self::$dirty[$base] = $forclass;
}
static $registered = false;
/** @var SearchUpdateProcessor */
static $processor = null;
/**
* Called by the SearchManiplateCapture database adapter with every manipulation made against the database.
*
@ -167,7 +116,10 @@ class SearchUpdater extends Object {
// Then add then then to the global list to deal with later
foreach ($dirtyids as $dirtyclass => $ids) {
if ($ids) self::add_dirty_ids($dirtyclass, $ids, $index);
if ($ids) {
if (!self::$processor) self::$processor = Injector::inst()->create('SearchUpdateProcessor');
self::$processor->addDirtyIDs($dirtyclass, $ids, $index);
}
}
}
}
@ -177,8 +129,9 @@ class SearchUpdater extends Object {
// Don't do it if we're testing - there's no database connection outside the test methods, so we'd
// just get errors
$runningTests = class_exists('SapphireTest',false) && SapphireTest::is_running_test();
if (self::$dirty && !self::$registered && !(class_exists('SapphireTest',false) && SapphireTest::is_running_test())) {
if (self::$processor && !self::$registered && !$runningTests) {
register_shutdown_function(array("SearchUpdater", "flush_dirty_indexes"));
self::$registered = true;
}
@ -188,7 +141,7 @@ class SearchUpdater extends Object {
* Throw away the recorded dirty IDs without doing anything with them.
*/
static function clear_dirty_indexes() {
self::$dirty = array(); self::$dirtycount = 0;
self::$processor = null;
}
/**
@ -197,82 +150,9 @@ class SearchUpdater extends Object {
* just throw the dirty IDs away.
*/
static function flush_dirty_indexes() {
if (!self::$dirty) return;
$method = self::$update_method;
if (class_exists("MessageQueue")) {
if ($method == self::AUTO) $method = self::$dirtycount < self::$auto_threshold ? self::IMMEDIATE : self::DEFERRED;
}
else {
if ($method == self::AUTO) $method = self::IMMEDIATE;
elseif ($method == self::DEFERRED) $method = self::DISABLED;
}
switch ($method) {
case self::IMMEDIATE:
self::process_dirty_indexes(self::$dirty);
break;
case self::DEFERRED:
MessageQueue::send(
self::$reindex_queue,
new MethodInvocationMessage("SearchUpdater", "process_dirty_indexes", self::$dirty)
);
break;
case self::DISABLED:
// NOP
break;
}
self::clear_dirty_indexes();
}
/**
* Internal function. Process the passed list of dirty ids. Split from flush_dirty_indexes so it can be called both
* directly and via messagequeue message.
*/
static function process_dirty_indexes($dirty) {
$indexes = FullTextSearch::get_indexes();
$dirtyindexes = array();
$originalState = SearchVariant::current_state();
foreach ($dirty as $base => $statefulids) {
if (!$statefulids) continue;
foreach ($statefulids as $statefulid) {
$state = $statefulid['state'];
$ids = $statefulid['ids'];
SearchVariant::activate_state($state);
$objs = DataObject::get($base, '"'.$base.'"."ID" IN ('.implode(',', array_keys($ids)).')');
if ($objs) foreach ($objs as $obj) {
foreach ($ids[$obj->ID] as $index) {
if (!$indexes[$index]->variantStateExcluded($state)) {
$indexes[$index]->add($obj);
$dirtyindexes[$index] = $index;
}
}
unset($ids[$obj->ID]);
}
foreach ($ids as $id => $fromindexes) {
foreach ($fromindexes as $index) {
if (!$indexes[$index]->variantStateExcluded($state)) {
$indexes[$index]->delete($base, $id, $state);
$dirtyindexes[$index] = $index;
}
}
}
}
}
foreach ($dirtyindexes as $index) {
$indexes[$index]->commit();
}
SearchVariant::activate_state($originalState);
if (!self::$processor) return;
self::$processor->triggerProcessing();
self::$processor = null;
}
}

View File

@ -0,0 +1,7 @@
<?php
class SearchUpdateImmediateProcessor extends SearchUpdateProcessor {
public function triggerProcessing() {
$this->process();
}
}

View File

@ -0,0 +1,17 @@
<?php
class SearchUpdateMessageQueueProcessor extends SearchUpdateProcessor {
/**
* The MessageQueue to use when processing updates
* @config
* @var string
*/
private static $reindex_queue = "search_indexing";
public function triggerProcessing() {
MessageQueue::send(
Config::inst()->get('SearchMessageQueueUpdater', 'reindex_queue'),
new MethodInvocationMessage($this, "process")
);
}
}

View File

@ -0,0 +1,79 @@
<?php
abstract class SearchUpdateProcessor {
function __construct() {
$this->dirty = array();
$this->dirtyindexes = array();
}
public function addDirtyIDs($class, $statefulids, $index) {
$base = ClassInfo::baseDataClass($class);
$forclass = isset($this->dirty[$base]) ? $this->dirty[$base] : array();
foreach ($statefulids as $statefulid) {
$id = $statefulid['id'];
$state = $statefulid['state']; $statekey = serialize($state);
if (!isset($forclass[$statekey])) {
$forclass[$statekey] = array('state' => $state, 'ids' => array($id => array($index)));
}
else if (!isset($forclass[$statekey]['ids'][$id])) {
$forclass[$statekey]['ids'][$id] = array($index);
}
else if (array_search($index, $forclass[$statekey]['ids'][$id]) === false) {
$forclass[$statekey]['ids'][$id][] = $index;
// dirty count stays the same
}
}
$this->dirty[$base] = $forclass;
}
public function process() {
$indexes = FullTextSearch::get_indexes();
$originalState = SearchVariant::current_state();
foreach ($this->dirty as $base => $statefulids) {
if (!$statefulids) continue;
foreach ($statefulids as $statefulid) {
$state = $statefulid['state'];
$ids = $statefulid['ids'];
SearchVariant::activate_state($state);
$objs = DataObject::get($base, '"'.$base.'"."ID" IN ('.implode(',', array_keys($ids)).')');
if ($objs) foreach ($objs as $obj) {
foreach ($ids[$obj->ID] as $index) {
if (!$indexes[$index]->variantStateExcluded($state)) {
$indexes[$index]->add($obj);
$this->dirtyindexes[$index] = $index;
}
}
unset($ids[$obj->ID]);
}
foreach ($ids as $id => $fromindexes) {
foreach ($fromindexes as $index) {
if (!$indexes[$index]->variantStateExcluded($state)) {
$indexes[$index]->delete($base, $id, $state);
$this->dirtyindexes[$index] = $index;
}
}
}
}
}
SearchVariant::activate_state($originalState);
// Then commit all indexes
foreach ($this->dirtyindexes as $index) {
if ($indexes[$index]->commit() === false) return false;
}
}
abstract public function triggerProcessing();
}

View File

@ -0,0 +1,89 @@
<?php
class SearchUpdateQueuedJobProcessor extends SearchUpdateProcessor implements QueuedJob {
/**
* The QueuedJob queue to use when processing updates
* @config
* @var string
*/
private static $reindex_queue = 2; // QueuedJob::QUEUED;
protected $messages = array();
protected $totalSteps = 0;
protected $currentStep = 0;
protected $isComplete = false;
public function triggerProcessing() {
singleton('QueuedJobService')->queueJob($this);
}
public function getTitle() {
return "FullTextSearch Update Job";
}
public function getSignature() {
return md5(get_class($this) . time() . mt_rand(0, 100000));
}
public function getJobType() {
return Config::inst()->get('SearchUpdateQueuedJobProcessor', 'reindex_queue');
}
public function jobFinished() {
return $this->isComplete;
}
public function setup() {
$this->totalSteps = count(array_keys($this->dirty));
}
public function prepareForRestart() {
// NOP
}
public function afterComplete() {
// NOP
}
public function process() {
if (parent::process() === false) {
$this->currentStep += 1;
$this->totalSteps += 1;
}
else {
$this->currentStep = $this->totalSteps;
$this->isComplete = true;
}
}
public function getJobData() {
$data = new stdClass();
$data->totalSteps = $this->totalSteps;
$data->currentStep = $this->currentStep;
$data->isComplete = $this->isComplete;
$data->messages = $this->messages;
$data->jobData = new stdClass();
$data->jobData->dirty = $this->dirty;
$data->jobData->dirtyindexes = $this->dirtyindexes;
return $data;
}
public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) {
$this->totalSteps = $totalSteps;
$this->currentStep = $currentStep;
$this->isComplete = $isComplete;
$this->messages = $messages;
$this->dirty = $jobData->dirty;
$this->dirtyindexes = $jobData->dirtyindexes;
}
public function addMessage($message, $severity='INFO') {
$severity = strtoupper($severity);
$this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message";
}
}

View File

@ -60,10 +60,20 @@ class SearchUpdaterTest extends SapphireTest {
SearchUpdater::bind_manipulation_capture();
Config::nest();
Config::inst()->update('Injector', 'SearchUpdateProcessor', array(
'class' => 'SearchUpdateImmediateProcessor'
));
FullTextSearch::force_index_list(self::$index);
SearchUpdater::clear_dirty_indexes();
}
function tearDown() {
Config::unnest();
}
function testBasic() {
$item = new SearchUpdaterTest_Container();
$item->write();