diff --git a/_config/processor.yml b/_config/processor.yml new file mode 100644 index 0000000..39ae8f6 --- /dev/null +++ b/_config/processor.yml @@ -0,0 +1,22 @@ +--- +Name: defaultprocessor +--- +Injector: + SearchUpdateProcessor: + class: SearchUpdateImmediateProcessor +--- +Name: messagequeueprocessor +Only: + ModuleExists: messagequeue +--- +Injector: + SearchUpdateProcessor: + class: SearchUpdateMessageQueueProcessor +--- +Name: queuedjobprocessor +Only: + ModuleExists: queuedjobs +--- +Injector: + SearchUpdateProcessor: + class: SearchUpdateQueuedJobProcessor diff --git a/code/search/SearchUpdater.php b/code/search/SearchUpdater.php index 4a0026c..7847170 100644 --- a/code/search/SearchUpdater.php +++ b/code/search/SearchUpdater.php @@ -10,36 +10,10 @@ * * Pretty closely tied to the field structure of SearchIndex. * - * TODO: The way we bind in is awful hacky. The config stuff in 3 will hopefully allow us to force ourselves as the very last - * augmentManipulation. + * TODO: The way we bind in is awful hacky. */ class SearchUpdater extends Object { - const AUTO = 0; - const DEFERRED = 1; - const IMMEDIATE = 2; - const DISABLED = 3; - - /** - * How to schedule index updates at the end of the request. - * - * AUTO = IMMEDIATE if not _many_ dirty records, DEFERRED if _many_ where many is self::$auto_threshold - * DEFERRED = Use messagequeue to trigger updating indexes sometime soonish - * IMMEDIATE = Update indexes at end of request - * DISABLE = Dont update indexes - * - * If messagequeue module not installed, AUTO => IMMEDIATE and DEFERRED => DISABLED - */ - static $update_method = SearchUpdater::AUTO; - - // How many items can be dirty before we defer updates - static $auto_threshold = 6; - - // The indexing message queue - static $reindex_queue = "search_indexing"; - - static function set_reindexing_queue($queue) { self::$reindex_queue = $queue; } - /** * Replace the database object with a subclass that captures all manipulations and passes them to us */ @@ -75,35 +49,10 @@ class SearchUpdater extends Object { DB::setConn($captured); } - static $dirty = array(); static $dirtycount = 0; - - static function add_dirty_ids($class, $statefulids, $index) { - $base = ClassInfo::baseDataClass($class); - $forclass = isset(self::$dirty[$base]) ? self::$dirty[$base] : array(); - - foreach ($statefulids as $statefulid) { - $id = $statefulid['id']; - $state = $statefulid['state']; $statekey = serialize($state); - - if (!isset($forclass[$statekey])) { - $forclass[$statekey] = array('state' => $state, 'ids' => array($id => array($index))); - self::$dirtycount += 1; - } - else if (!isset($forclass[$statekey]['ids'][$id])) { - $forclass[$statekey]['ids'][$id] = array($index); - self::$dirtycount += 1; - } - else if (array_search($index, $forclass[$statekey]['ids'][$id]) === false) { - $forclass[$statekey]['ids'][$id][] = $index; - // dirty count stays the same - } - } - - self::$dirty[$base] = $forclass; - } - static $registered = false; - + /** @var SearchUpdateProcessor */ + static $processor = null; + /** * Called by the SearchManiplateCapture database adapter with every manipulation made against the database. * @@ -167,7 +116,10 @@ class SearchUpdater extends Object { // Then add then then to the global list to deal with later foreach ($dirtyids as $dirtyclass => $ids) { - if ($ids) self::add_dirty_ids($dirtyclass, $ids, $index); + if ($ids) { + if (!self::$processor) self::$processor = Injector::inst()->create('SearchUpdateProcessor'); + self::$processor->addDirtyIDs($dirtyclass, $ids, $index); + } } } } @@ -177,8 +129,9 @@ class SearchUpdater extends Object { // Don't do it if we're testing - there's no database connection outside the test methods, so we'd // just get errors + $runningTests = class_exists('SapphireTest',false) && SapphireTest::is_running_test(); - if (self::$dirty && !self::$registered && !(class_exists('SapphireTest',false) && SapphireTest::is_running_test())) { + if (self::$processor && !self::$registered && !$runningTests) { register_shutdown_function(array("SearchUpdater", "flush_dirty_indexes")); self::$registered = true; } @@ -188,7 +141,7 @@ class SearchUpdater extends Object { * Throw away the recorded dirty IDs without doing anything with them. */ static function clear_dirty_indexes() { - self::$dirty = array(); self::$dirtycount = 0; + self::$processor = null; } /** @@ -197,82 +150,9 @@ class SearchUpdater extends Object { * just throw the dirty IDs away. */ static function flush_dirty_indexes() { - if (!self::$dirty) return; - - $method = self::$update_method; - - if (class_exists("MessageQueue")) { - if ($method == self::AUTO) $method = self::$dirtycount < self::$auto_threshold ? self::IMMEDIATE : self::DEFERRED; - } - else { - if ($method == self::AUTO) $method = self::IMMEDIATE; - elseif ($method == self::DEFERRED) $method = self::DISABLED; - } - - switch ($method) { - case self::IMMEDIATE: - self::process_dirty_indexes(self::$dirty); - break; - case self::DEFERRED: - MessageQueue::send( - self::$reindex_queue, - new MethodInvocationMessage("SearchUpdater", "process_dirty_indexes", self::$dirty) - ); - break; - case self::DISABLED: - // NOP - break; - } - - self::clear_dirty_indexes(); - } - - /** - * Internal function. Process the passed list of dirty ids. Split from flush_dirty_indexes so it can be called both - * directly and via messagequeue message. - */ - static function process_dirty_indexes($dirty) { - $indexes = FullTextSearch::get_indexes(); - $dirtyindexes = array(); - - $originalState = SearchVariant::current_state(); - - foreach ($dirty as $base => $statefulids) { - if (!$statefulids) continue; - - foreach ($statefulids as $statefulid) { - $state = $statefulid['state']; - $ids = $statefulid['ids']; - - SearchVariant::activate_state($state); - - $objs = DataObject::get($base, '"'.$base.'"."ID" IN ('.implode(',', array_keys($ids)).')'); - if ($objs) foreach ($objs as $obj) { - foreach ($ids[$obj->ID] as $index) { - if (!$indexes[$index]->variantStateExcluded($state)) { - $indexes[$index]->add($obj); - $dirtyindexes[$index] = $index; - } - } - unset($ids[$obj->ID]); - } - - foreach ($ids as $id => $fromindexes) { - foreach ($fromindexes as $index) { - if (!$indexes[$index]->variantStateExcluded($state)) { - $indexes[$index]->delete($base, $id, $state); - $dirtyindexes[$index] = $index; - } - } - } - } - } - - foreach ($dirtyindexes as $index) { - $indexes[$index]->commit(); - } - - SearchVariant::activate_state($originalState); + if (!self::$processor) return; + self::$processor->triggerProcessing(); + self::$processor = null; } } diff --git a/code/search/processors/SearchUpdateImmediateProcessor.php b/code/search/processors/SearchUpdateImmediateProcessor.php new file mode 100644 index 0000000..2f80666 --- /dev/null +++ b/code/search/processors/SearchUpdateImmediateProcessor.php @@ -0,0 +1,7 @@ +process(); + } +} diff --git a/code/search/processors/SearchUpdateMessageQueueProcessor.php b/code/search/processors/SearchUpdateMessageQueueProcessor.php new file mode 100644 index 0000000..0538826 --- /dev/null +++ b/code/search/processors/SearchUpdateMessageQueueProcessor.php @@ -0,0 +1,17 @@ +get('SearchMessageQueueUpdater', 'reindex_queue'), + new MethodInvocationMessage($this, "process") + ); + } +} diff --git a/code/search/processors/SearchUpdateProcessor.php b/code/search/processors/SearchUpdateProcessor.php new file mode 100644 index 0000000..ced8725 --- /dev/null +++ b/code/search/processors/SearchUpdateProcessor.php @@ -0,0 +1,79 @@ +dirty = array(); + $this->dirtyindexes = array(); + } + + public function addDirtyIDs($class, $statefulids, $index) { + $base = ClassInfo::baseDataClass($class); + $forclass = isset($this->dirty[$base]) ? $this->dirty[$base] : array(); + + foreach ($statefulids as $statefulid) { + $id = $statefulid['id']; + $state = $statefulid['state']; $statekey = serialize($state); + + if (!isset($forclass[$statekey])) { + $forclass[$statekey] = array('state' => $state, 'ids' => array($id => array($index))); + } + else if (!isset($forclass[$statekey]['ids'][$id])) { + $forclass[$statekey]['ids'][$id] = array($index); + } + else if (array_search($index, $forclass[$statekey]['ids'][$id]) === false) { + $forclass[$statekey]['ids'][$id][] = $index; + // dirty count stays the same + } + } + + $this->dirty[$base] = $forclass; + } + + public function process() { + $indexes = FullTextSearch::get_indexes(); + $originalState = SearchVariant::current_state(); + + foreach ($this->dirty as $base => $statefulids) { + if (!$statefulids) continue; + + foreach ($statefulids as $statefulid) { + $state = $statefulid['state']; + $ids = $statefulid['ids']; + + SearchVariant::activate_state($state); + + $objs = DataObject::get($base, '"'.$base.'"."ID" IN ('.implode(',', array_keys($ids)).')'); + if ($objs) foreach ($objs as $obj) { + foreach ($ids[$obj->ID] as $index) { + if (!$indexes[$index]->variantStateExcluded($state)) { + $indexes[$index]->add($obj); + $this->dirtyindexes[$index] = $index; + } + } + unset($ids[$obj->ID]); + } + + foreach ($ids as $id => $fromindexes) { + foreach ($fromindexes as $index) { + if (!$indexes[$index]->variantStateExcluded($state)) { + $indexes[$index]->delete($base, $id, $state); + $this->dirtyindexes[$index] = $index; + } + } + } + } + } + + SearchVariant::activate_state($originalState); + + // Then commit all indexes + foreach ($this->dirtyindexes as $index) { + if ($indexes[$index]->commit() === false) return false; + } + } + + abstract public function triggerProcessing(); +} + + + diff --git a/code/search/processors/SearchUpdateQueuedJobProcessor.php b/code/search/processors/SearchUpdateQueuedJobProcessor.php new file mode 100644 index 0000000..909077e --- /dev/null +++ b/code/search/processors/SearchUpdateQueuedJobProcessor.php @@ -0,0 +1,89 @@ +queueJob($this); + } + + public function getTitle() { + return "FullTextSearch Update Job"; + } + + public function getSignature() { + return md5(get_class($this) . time() . mt_rand(0, 100000)); + } + + public function getJobType() { + return Config::inst()->get('SearchUpdateQueuedJobProcessor', 'reindex_queue'); + } + + public function jobFinished() { + return $this->isComplete; + } + + public function setup() { + $this->totalSteps = count(array_keys($this->dirty)); + } + + public function prepareForRestart() { + // NOP + } + + public function afterComplete() { + // NOP + } + + public function process() { + if (parent::process() === false) { + $this->currentStep += 1; + $this->totalSteps += 1; + } + else { + $this->currentStep = $this->totalSteps; + $this->isComplete = true; + } + } + + public function getJobData() { + $data = new stdClass(); + $data->totalSteps = $this->totalSteps; + $data->currentStep = $this->currentStep; + $data->isComplete = $this->isComplete; + $data->messages = $this->messages; + + $data->jobData = new stdClass(); + $data->jobData->dirty = $this->dirty; + $data->jobData->dirtyindexes = $this->dirtyindexes; + + return $data; + } + + public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) { + $this->totalSteps = $totalSteps; + $this->currentStep = $currentStep; + $this->isComplete = $isComplete; + $this->messages = $messages; + + $this->dirty = $jobData->dirty; + $this->dirtyindexes = $jobData->dirtyindexes; + } + + public function addMessage($message, $severity='INFO') { + $severity = strtoupper($severity); + $this->messages[] = '[' . date('Y-m-d H:i:s') . "][$severity] $message"; + } +} diff --git a/tests/SearchUpdaterTest.php b/tests/SearchUpdaterTest.php index ab3de04..fe70838 100644 --- a/tests/SearchUpdaterTest.php +++ b/tests/SearchUpdaterTest.php @@ -60,10 +60,20 @@ class SearchUpdaterTest extends SapphireTest { SearchUpdater::bind_manipulation_capture(); + Config::nest(); + + Config::inst()->update('Injector', 'SearchUpdateProcessor', array( + 'class' => 'SearchUpdateImmediateProcessor' + )); + FullTextSearch::force_index_list(self::$index); SearchUpdater::clear_dirty_indexes(); } + function tearDown() { + Config::unnest(); + } + function testBasic() { $item = new SearchUpdaterTest_Container(); $item->write();