From 9efe4baa1652d000358c07697d289afa524e2031 Mon Sep 17 00:00:00 2001 From: Damian Mooyman Date: Fri, 16 May 2014 17:42:08 +1200 Subject: [PATCH] API SearchUpdateQueuedJobProcessor now uses batching --- .travis.yml | 5 +- code/search/SearchIndex.php | 4 + .../SearchUpdateBatchedProcessor.php | 171 ++++++++++++++++++ .../processors/SearchUpdateProcessor.php | 94 ++++++++-- .../SearchUpdateQueuedJobProcessor.php | 40 ++-- tests/BatchedProcessorTest.php | 141 +++++++++++++++ 6 files changed, 410 insertions(+), 45 deletions(-) create mode 100644 code/search/processors/SearchUpdateBatchedProcessor.php create mode 100644 tests/BatchedProcessorTest.php diff --git a/.travis.yml b/.travis.yml index feb2700..42d4d1b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,13 +14,16 @@ matrix: env: DB=PGSQL CORE_RELEASE=3.1 - php: 5.3 env: DB=MYSQL CORE_RELEASE=3.1 SUBSITES=1 + - php: 5.3 + env: DB=MYSQL CORE_RELEASE=3.1 QUEUEDJOBS=1 before_script: - composer self-update - phpenv rehash - git clone git://github.com/silverstripe-labs/silverstripe-travis-support.git ~/travis-support - - "if [ \"$SUBSITES\" = \"\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss; fi" + - "if [ \"$SUBSITES\" = \"\" -a \"$QUEUEDJOBS\" = \"\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss; fi" - "if [ \"$SUBSITES\" = \"1\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss --require silverstripe/subsites; fi" + - "if [ \"$QUEUEDJOBS\" = \"1\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss --require silverstripe/queuedjobs; fi" - cd ~/builds/ss script: diff --git a/code/search/SearchIndex.php b/code/search/SearchIndex.php index 2a2527c..2e411b0 100644 --- a/code/search/SearchIndex.php +++ b/code/search/SearchIndex.php @@ -573,4 +573,8 @@ abstract class SearchIndex_Recording extends SearchIndex { function commit() { } + function getIndexName() { + return get_class($this); + } + } diff --git a/code/search/processors/SearchUpdateBatchedProcessor.php b/code/search/processors/SearchUpdateBatchedProcessor.php new file mode 100644 index 0000000..6b49c7c --- /dev/null +++ b/code/search/processors/SearchUpdateBatchedProcessor.php @@ -0,0 +1,171 @@ +batches = array(); + $this->setBatch(0); + } + + protected function commitIndex($index) { + $name = $index->getIndexName(); + + // If this is a resurrected batch then it's not necessary to commit the index + // twice, assuming it has successfully been comitted before + if(isset($this->completedIndexes[$name])) return true; + + // Commit index and mark as completed + $result = parent::commitIndex($index); + if($result) $this->completedIndexes[$name] = $name; + return $result; + } + + /** + * Set the current batch index + * + * @param int $batch Index of the batch + */ + protected function setBatch($batch) { + $this->currentBatch = $batch; + $this->completedIndexes = array(); + } + + protected function getSource() { + if(isset($this->batches[$this->currentBatch])) { + return $this->batches[$this->currentBatch]; + } + } + + /** + * Process the current queue + * + * @return boolean + */ + public function process() { + // Skip blank queues + if(empty($this->batches)) return true; + + // Don't re-process completed queue + if($this->currentBatch >= count($this->batches)) return true; + + // Process batch + $result = parent::process(); + + // Advance to next batch if successful + if($result) $this->setBatch($this->currentBatch + 1); + return $result; + } + + /** + * Segments batches acording to the specified rules + * + * @param array $source Source input + * @return array Batches + */ + protected function segmentBatches($source) { + // Measure batch_size + $batchSize = Config::inst()->get(get_class(), 'batch_size'); + if($batchSize === 0) return array($source); + $softCap = Config::inst()->get(get_class(), 'batch_soft_cap'); + + // Clear batches + $batches = array(); + $current = array(); + $currentSize = 0; + + // Build batches from data + foreach ($source as $base => $statefulids) { + if (!$statefulids) continue; + + foreach ($statefulids as $stateKey => $statefulid) { + $state = $statefulid['state']; + $ids = $statefulid['ids']; + if(!$ids) continue; + + // Extract items from $ids until empty + while($ids) { + // Estimate maximum number of items to take for this iteration, allowing for the soft cap + $take = $batchSize - $currentSize; + if(count($ids) <= $take + $softCap) $take += $softCap; + $items = array_slice($ids, 0, $take, true); + $ids = array_slice($ids, count($items), null, true); + + // Update batch + $currentSize += count($items); + $merge = array( + $base => array( + $stateKey => array( + 'state' => $state, + 'ids' => $items + ) + ) + ); + $current = $current ? array_merge_recursive($current, $merge) : $merge; + if($currentSize >= $batchSize) { + $batches[] = $current; + $current = array(); + $currentSize = 0; + } + } + } + } + // Add incomplete batch + if($currentSize) $batches[] = $current; + + return $batches; + } + + public function batchData() { + $this->batches = $this->segmentBatches($this->dirty); + $this->setBatch(0); + } + + public function triggerProcessing() { + $this->batchData(); + } +} diff --git a/code/search/processors/SearchUpdateProcessor.php b/code/search/processors/SearchUpdateProcessor.php index ced8725..71c40a3 100644 --- a/code/search/processors/SearchUpdateProcessor.php +++ b/code/search/processors/SearchUpdateProcessor.php @@ -1,9 +1,33 @@ array( + * '$State Key' => array( + * 'state' => array( + * 'key1' => 'value', + * 'key2' => 'value' + * ), + * 'ids' => array( + * '*id*' => array( + * '*Index Name 1*', + * '*Index Name 2*' + * ) + * ) + * ) + * ) + * ) + * + * @var array + */ + protected $dirty; + + public function __construct() { $this->dirty = array(); - $this->dirtyindexes = array(); } public function addDirtyIDs($class, $statefulids, $index) { @@ -28,12 +52,18 @@ abstract class SearchUpdateProcessor { $this->dirty[$base] = $forclass; } - - public function process() { - $indexes = FullTextSearch::get_indexes(); + + /** + * Generates the list of indexes to process for the dirty items + * + * @return array + */ + protected function prepareIndexes() { $originalState = SearchVariant::current_state(); - - foreach ($this->dirty as $base => $statefulids) { + $dirtyIndexes = array(); + $dirty = $this->getSource(); + $indexes = FullTextSearch::get_indexes(); + foreach ($dirty as $base => $statefulids) { if (!$statefulids) continue; foreach ($statefulids as $statefulid) { @@ -42,38 +72,66 @@ abstract class SearchUpdateProcessor { SearchVariant::activate_state($state); - $objs = DataObject::get($base, '"'.$base.'"."ID" IN ('.implode(',', array_keys($ids)).')'); - if ($objs) foreach ($objs as $obj) { + // Ensure that indexes for all new / updated objects are included + $objs = DataObject::get($base)->byIDs(array_keys($ids)); + foreach ($objs as $obj) { foreach ($ids[$obj->ID] as $index) { if (!$indexes[$index]->variantStateExcluded($state)) { $indexes[$index]->add($obj); - $this->dirtyindexes[$index] = $index; + $dirtyIndexes[$index] = $indexes[$index]; } } unset($ids[$obj->ID]); } + // Generate list of records that do not exist and should be removed foreach ($ids as $id => $fromindexes) { foreach ($fromindexes as $index) { if (!$indexes[$index]->variantStateExcluded($state)) { $indexes[$index]->delete($base, $id, $state); - $this->dirtyindexes[$index] = $index; + $dirtyIndexes[$index] = $indexes[$index]; } } } } } - + SearchVariant::activate_state($originalState); + return $dirtyIndexes; + } + + /** + * Commits the specified index to the Solr service + * + * @param SolrIndex $index Index object + * @return bool Flag indicating success + */ + protected function commitIndex($index) { + return $index->commit() !== false; + } + + /** + * Gets the record data source to process + * + * @return array + */ + protected function getSource() { + return $this->dirty; + } - // Then commit all indexes - foreach ($this->dirtyindexes as $index) { - if ($indexes[$index]->commit() === false) return false; + /** + * Process all indexes, returning true if successful + * + * @return bool Flag indicating success + */ + public function process() { + // Generate and commit all instances + $indexes = $this->prepareIndexes(); + foreach ($indexes as $index) { + if(!$this->commitIndex($index)) return false; } + return true; } abstract public function triggerProcessing(); } - - - diff --git a/code/search/processors/SearchUpdateQueuedJobProcessor.php b/code/search/processors/SearchUpdateQueuedJobProcessor.php index 909077e..d8d37f8 100644 --- a/code/search/processors/SearchUpdateQueuedJobProcessor.php +++ b/code/search/processors/SearchUpdateQueuedJobProcessor.php @@ -1,7 +1,8 @@ queueJob($this); } @@ -32,11 +31,11 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateProcessor implements Qu } public function jobFinished() { - return $this->isComplete; + return $this->currentBatch >= count($this->batches); } public function setup() { - $this->totalSteps = count(array_keys($this->dirty)); + // NOP } public function prepareForRestart() { @@ -47,39 +46,28 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateProcessor implements Qu // NOP } - public function process() { - if (parent::process() === false) { - $this->currentStep += 1; - $this->totalSteps += 1; - } - else { - $this->currentStep = $this->totalSteps; - $this->isComplete = true; - } - } - public function getJobData() { $data = new stdClass(); - $data->totalSteps = $this->totalSteps; - $data->currentStep = $this->currentStep; - $data->isComplete = $this->isComplete; + $data->totalSteps = count($this->batches); + $data->currentStep = $this->currentBatch; + $data->isComplete = $this->jobFinished(); $data->messages = $this->messages; $data->jobData = new stdClass(); - $data->jobData->dirty = $this->dirty; - $data->jobData->dirtyindexes = $this->dirtyindexes; + $data->jobData->batches = $this->batches; + $data->jobData->currentBatch = $this->currentBatch; + $data->jobData->completedIndexes = $this->completedIndexes; return $data; } public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) { - $this->totalSteps = $totalSteps; - $this->currentStep = $currentStep; $this->isComplete = $isComplete; $this->messages = $messages; - $this->dirty = $jobData->dirty; - $this->dirtyindexes = $jobData->dirtyindexes; + $this->batches = $jobData->batches; + $this->currentBatch = $jobData->currentBatch; + $this->completedIndexes = $jobData->completedIndexes; } public function addMessage($message, $severity='INFO') { diff --git a/tests/BatchedProcessorTest.php b/tests/BatchedProcessorTest.php new file mode 100644 index 0000000..e8f5d09 --- /dev/null +++ b/tests/BatchedProcessorTest.php @@ -0,0 +1,141 @@ + 'Varchar' + ); +} + +class BatchedProcessorTest_Index extends SearchIndex_Recording implements TestOnly { + function init() { + $this->addClass('BatchedProcessorTest_Object'); + $this->addFilterField('TestText'); + } +} + +/** + * Tests {@see SearchUpdateQueuedJobProcessor} + */ +class BatchedProcessorTest extends SapphireTest { + + protected $oldProcessor; + + protected $extraDataObjects = array( + 'BatchedProcessorTest_Object' + ); + + public function setUp() { + parent::setUp(); + Config::nest(); + + if (!interface_exists('QueuedJob')) { + $this->markTestSkipped("These tests need the QueuedJobs module installed to run"); + $this->skipTest = true; + } + + Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_size', 5); + Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 0); + + Versioned::reading_stage("Stage"); + + $this->oldProcessor = SearchUpdater::$processor; + SearchUpdater::$processor = new SearchUpdateQueuedJobProcessor(); + } + + public function tearDown() { + + SearchUpdater::$processor = $this->oldProcessor; + Config::unnest(); + parent::tearDown(); + } + + protected function generateDirtyIds() { + $processor = SearchUpdater::$processor; + for($id = 1; $id <= 42; $id++) { + // Save to db + $object = new BatchedProcessorTest_Object(); + $object->TestText = 'Object ' . $id; + $object->write(); + // Add to index manually + $processor->addDirtyIDs( + 'BatchedProcessorTest_Object', + array(array( + 'id' => $id, + 'state' => array('SearchVariantVersioned' => 'Stage') + )), + 'BatchedProcessorTest_Index' + ); + } + $processor->batchData(); + return $processor; + } + + /** + * Tests that large jobs are broken up into a suitable number of batches + */ + public function testBatching() { + $index = singleton('BatchedProcessorTest_Index'); + $index->reset(); + $processor = $this->generateDirtyIds(); + + // Check initial state + $data = $processor->getJobData(); + $this->assertEquals(9, $data->totalSteps); + $this->assertEquals(0, $data->currentStep); + $this->assertEmpty($data->isComplete); + $this->assertEquals(0, count($index->getAdded())); + + // Advance state + for($pass = 1; $pass <= 8; $pass++) { + $processor->process(); + $data = $processor->getJobData(); + $this->assertEquals($pass, $data->currentStep); + $this->assertEquals($pass * 5, count($index->getAdded())); + } + + // Last run should have two hanging items + $processor->process(); + $data = $processor->getJobData(); + $this->assertEquals(9, $data->currentStep); + $this->assertEquals(42, count($index->getAdded())); + $this->assertTrue($data->isComplete); + } + + /** + * Tests that the batch_soft_cap setting is properly respected + */ + public function testSoftCap() { + $index = singleton('BatchedProcessorTest_Index'); + $index->reset(); + $processor = $this->generateDirtyIds(); + + // Test that increasing the soft cap to 2 will reduce the number of batches + Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 2); + $processor->batchData(); + $data = $processor->getJobData(); + //Debug::dump($data);die; + $this->assertEquals(8, $data->totalSteps); + + // A soft cap of 1 should not fit in the hanging two items + Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 1); + $processor->batchData(); + $data = $processor->getJobData(); + $this->assertEquals(9, $data->totalSteps); + + // Extra large soft cap should fit both items + Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 4); + $processor->batchData(); + $data = $processor->getJobData(); + $this->assertEquals(8, $data->totalSteps); + + // Process all data and ensure that all are processed adequately + for($pass = 1; $pass <= 8; $pass++) { + $processor->process(); + } + $data = $processor->getJobData(); + $this->assertEquals(8, $data->currentStep); + $this->assertEquals(42, count($index->getAdded())); + $this->assertTrue($data->isComplete); + } +}