API SearchUpdateQueuedJobProcessor now uses batching

This commit is contained in:
Damian Mooyman 2014-05-16 17:42:08 +12:00
parent 05c500b020
commit 9efe4baa16
6 changed files with 410 additions and 45 deletions

View File

@ -14,13 +14,16 @@ matrix:
env: DB=PGSQL CORE_RELEASE=3.1
- php: 5.3
env: DB=MYSQL CORE_RELEASE=3.1 SUBSITES=1
- php: 5.3
env: DB=MYSQL CORE_RELEASE=3.1 QUEUEDJOBS=1
before_script:
- composer self-update
- phpenv rehash
- git clone git://github.com/silverstripe-labs/silverstripe-travis-support.git ~/travis-support
- "if [ \"$SUBSITES\" = \"\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss; fi"
- "if [ \"$SUBSITES\" = \"\" -a \"$QUEUEDJOBS\" = \"\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss; fi"
- "if [ \"$SUBSITES\" = \"1\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss --require silverstripe/subsites; fi"
- "if [ \"$QUEUEDJOBS\" = \"1\" ]; then php ~/travis-support/travis_setup.php --source `pwd` --target ~/builds/ss --require silverstripe/queuedjobs; fi"
- cd ~/builds/ss
script:

View File

@ -573,4 +573,8 @@ abstract class SearchIndex_Recording extends SearchIndex {
function commit() { }
function getIndexName() {
return get_class($this);
}
}

View File

@ -0,0 +1,171 @@
<?php
/**
* Provides batching of search updates
*/
abstract class SearchUpdateBatchedProcessor extends SearchUpdateProcessor {
/**
* List of batches to be processed
*
* @var array
*/
protected $batches;
/**
* Pointer to index of $batches assigned to $current.
* Set to 0 (first index) if not started, or count + 1 if completed.
*
* @var int
*/
protected $currentBatch;
/**
* List of indexes successfully comitted in the current batch
*
* @var array
*/
protected $completedIndexes;
/**
* Maximum number of record-states to process in one batch.
* Set to zero to process all records in a single batch
*
* @config
* @var int
*/
private static $batch_size = 100;
/**
* Up to this number of additional ids can be added to any batch in order to reduce the number
* of batches
*
* @config
* @var int
*/
private static $batch_soft_cap = 10;
public function __construct() {
parent::__construct();
$this->batches = array();
$this->setBatch(0);
}
protected function commitIndex($index) {
$name = $index->getIndexName();
// If this is a resurrected batch then it's not necessary to commit the index
// twice, assuming it has successfully been comitted before
if(isset($this->completedIndexes[$name])) return true;
// Commit index and mark as completed
$result = parent::commitIndex($index);
if($result) $this->completedIndexes[$name] = $name;
return $result;
}
/**
* Set the current batch index
*
* @param int $batch Index of the batch
*/
protected function setBatch($batch) {
$this->currentBatch = $batch;
$this->completedIndexes = array();
}
protected function getSource() {
if(isset($this->batches[$this->currentBatch])) {
return $this->batches[$this->currentBatch];
}
}
/**
* Process the current queue
*
* @return boolean
*/
public function process() {
// Skip blank queues
if(empty($this->batches)) return true;
// Don't re-process completed queue
if($this->currentBatch >= count($this->batches)) return true;
// Process batch
$result = parent::process();
// Advance to next batch if successful
if($result) $this->setBatch($this->currentBatch + 1);
return $result;
}
/**
* Segments batches acording to the specified rules
*
* @param array $source Source input
* @return array Batches
*/
protected function segmentBatches($source) {
// Measure batch_size
$batchSize = Config::inst()->get(get_class(), 'batch_size');
if($batchSize === 0) return array($source);
$softCap = Config::inst()->get(get_class(), 'batch_soft_cap');
// Clear batches
$batches = array();
$current = array();
$currentSize = 0;
// Build batches from data
foreach ($source as $base => $statefulids) {
if (!$statefulids) continue;
foreach ($statefulids as $stateKey => $statefulid) {
$state = $statefulid['state'];
$ids = $statefulid['ids'];
if(!$ids) continue;
// Extract items from $ids until empty
while($ids) {
// Estimate maximum number of items to take for this iteration, allowing for the soft cap
$take = $batchSize - $currentSize;
if(count($ids) <= $take + $softCap) $take += $softCap;
$items = array_slice($ids, 0, $take, true);
$ids = array_slice($ids, count($items), null, true);
// Update batch
$currentSize += count($items);
$merge = array(
$base => array(
$stateKey => array(
'state' => $state,
'ids' => $items
)
)
);
$current = $current ? array_merge_recursive($current, $merge) : $merge;
if($currentSize >= $batchSize) {
$batches[] = $current;
$current = array();
$currentSize = 0;
}
}
}
}
// Add incomplete batch
if($currentSize) $batches[] = $current;
return $batches;
}
public function batchData() {
$this->batches = $this->segmentBatches($this->dirty);
$this->setBatch(0);
}
public function triggerProcessing() {
$this->batchData();
}
}

View File

@ -1,9 +1,33 @@
<?php
abstract class SearchUpdateProcessor {
function __construct() {
/**
* List of dirty records to process in format
*
* array(
* '$BaseClass' => array(
* '$State Key' => array(
* 'state' => array(
* 'key1' => 'value',
* 'key2' => 'value'
* ),
* 'ids' => array(
* '*id*' => array(
* '*Index Name 1*',
* '*Index Name 2*'
* )
* )
* )
* )
* )
*
* @var array
*/
protected $dirty;
public function __construct() {
$this->dirty = array();
$this->dirtyindexes = array();
}
public function addDirtyIDs($class, $statefulids, $index) {
@ -28,12 +52,18 @@ abstract class SearchUpdateProcessor {
$this->dirty[$base] = $forclass;
}
public function process() {
$indexes = FullTextSearch::get_indexes();
/**
* Generates the list of indexes to process for the dirty items
*
* @return array
*/
protected function prepareIndexes() {
$originalState = SearchVariant::current_state();
foreach ($this->dirty as $base => $statefulids) {
$dirtyIndexes = array();
$dirty = $this->getSource();
$indexes = FullTextSearch::get_indexes();
foreach ($dirty as $base => $statefulids) {
if (!$statefulids) continue;
foreach ($statefulids as $statefulid) {
@ -42,38 +72,66 @@ abstract class SearchUpdateProcessor {
SearchVariant::activate_state($state);
$objs = DataObject::get($base, '"'.$base.'"."ID" IN ('.implode(',', array_keys($ids)).')');
if ($objs) foreach ($objs as $obj) {
// Ensure that indexes for all new / updated objects are included
$objs = DataObject::get($base)->byIDs(array_keys($ids));
foreach ($objs as $obj) {
foreach ($ids[$obj->ID] as $index) {
if (!$indexes[$index]->variantStateExcluded($state)) {
$indexes[$index]->add($obj);
$this->dirtyindexes[$index] = $index;
$dirtyIndexes[$index] = $indexes[$index];
}
}
unset($ids[$obj->ID]);
}
// Generate list of records that do not exist and should be removed
foreach ($ids as $id => $fromindexes) {
foreach ($fromindexes as $index) {
if (!$indexes[$index]->variantStateExcluded($state)) {
$indexes[$index]->delete($base, $id, $state);
$this->dirtyindexes[$index] = $index;
$dirtyIndexes[$index] = $indexes[$index];
}
}
}
}
}
SearchVariant::activate_state($originalState);
return $dirtyIndexes;
}
/**
* Commits the specified index to the Solr service
*
* @param SolrIndex $index Index object
* @return bool Flag indicating success
*/
protected function commitIndex($index) {
return $index->commit() !== false;
}
/**
* Gets the record data source to process
*
* @return array
*/
protected function getSource() {
return $this->dirty;
}
// Then commit all indexes
foreach ($this->dirtyindexes as $index) {
if ($indexes[$index]->commit() === false) return false;
/**
* Process all indexes, returning true if successful
*
* @return bool Flag indicating success
*/
public function process() {
// Generate and commit all instances
$indexes = $this->prepareIndexes();
foreach ($indexes as $index) {
if(!$this->commitIndex($index)) return false;
}
return true;
}
abstract public function triggerProcessing();
}

View File

@ -1,7 +1,8 @@
<?php
if(!interface_exists('QueuedJob')) return;
class SearchUpdateQueuedJobProcessor extends SearchUpdateProcessor implements QueuedJob {
class SearchUpdateQueuedJobProcessor extends SearchUpdateBatchedProcessor implements QueuedJob {
/**
* The QueuedJob queue to use when processing updates
@ -11,11 +12,9 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateProcessor implements Qu
private static $reindex_queue = 2; // QueuedJob::QUEUED;
protected $messages = array();
protected $totalSteps = 0;
protected $currentStep = 0;
protected $isComplete = false;
public function triggerProcessing() {
parent::triggerProcessing();
singleton('QueuedJobService')->queueJob($this);
}
@ -32,11 +31,11 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateProcessor implements Qu
}
public function jobFinished() {
return $this->isComplete;
return $this->currentBatch >= count($this->batches);
}
public function setup() {
$this->totalSteps = count(array_keys($this->dirty));
// NOP
}
public function prepareForRestart() {
@ -47,39 +46,28 @@ class SearchUpdateQueuedJobProcessor extends SearchUpdateProcessor implements Qu
// NOP
}
public function process() {
if (parent::process() === false) {
$this->currentStep += 1;
$this->totalSteps += 1;
}
else {
$this->currentStep = $this->totalSteps;
$this->isComplete = true;
}
}
public function getJobData() {
$data = new stdClass();
$data->totalSteps = $this->totalSteps;
$data->currentStep = $this->currentStep;
$data->isComplete = $this->isComplete;
$data->totalSteps = count($this->batches);
$data->currentStep = $this->currentBatch;
$data->isComplete = $this->jobFinished();
$data->messages = $this->messages;
$data->jobData = new stdClass();
$data->jobData->dirty = $this->dirty;
$data->jobData->dirtyindexes = $this->dirtyindexes;
$data->jobData->batches = $this->batches;
$data->jobData->currentBatch = $this->currentBatch;
$data->jobData->completedIndexes = $this->completedIndexes;
return $data;
}
public function setJobData($totalSteps, $currentStep, $isComplete, $jobData, $messages) {
$this->totalSteps = $totalSteps;
$this->currentStep = $currentStep;
$this->isComplete = $isComplete;
$this->messages = $messages;
$this->dirty = $jobData->dirty;
$this->dirtyindexes = $jobData->dirtyindexes;
$this->batches = $jobData->batches;
$this->currentBatch = $jobData->currentBatch;
$this->completedIndexes = $jobData->completedIndexes;
}
public function addMessage($message, $severity='INFO') {

View File

@ -0,0 +1,141 @@
<?php
class BatchedProcessorTest_Object extends SiteTree implements TestOnly {
private static $db = array(
'TestText' => 'Varchar'
);
}
class BatchedProcessorTest_Index extends SearchIndex_Recording implements TestOnly {
function init() {
$this->addClass('BatchedProcessorTest_Object');
$this->addFilterField('TestText');
}
}
/**
* Tests {@see SearchUpdateQueuedJobProcessor}
*/
class BatchedProcessorTest extends SapphireTest {
protected $oldProcessor;
protected $extraDataObjects = array(
'BatchedProcessorTest_Object'
);
public function setUp() {
parent::setUp();
Config::nest();
if (!interface_exists('QueuedJob')) {
$this->markTestSkipped("These tests need the QueuedJobs module installed to run");
$this->skipTest = true;
}
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_size', 5);
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 0);
Versioned::reading_stage("Stage");
$this->oldProcessor = SearchUpdater::$processor;
SearchUpdater::$processor = new SearchUpdateQueuedJobProcessor();
}
public function tearDown() {
SearchUpdater::$processor = $this->oldProcessor;
Config::unnest();
parent::tearDown();
}
protected function generateDirtyIds() {
$processor = SearchUpdater::$processor;
for($id = 1; $id <= 42; $id++) {
// Save to db
$object = new BatchedProcessorTest_Object();
$object->TestText = 'Object ' . $id;
$object->write();
// Add to index manually
$processor->addDirtyIDs(
'BatchedProcessorTest_Object',
array(array(
'id' => $id,
'state' => array('SearchVariantVersioned' => 'Stage')
)),
'BatchedProcessorTest_Index'
);
}
$processor->batchData();
return $processor;
}
/**
* Tests that large jobs are broken up into a suitable number of batches
*/
public function testBatching() {
$index = singleton('BatchedProcessorTest_Index');
$index->reset();
$processor = $this->generateDirtyIds();
// Check initial state
$data = $processor->getJobData();
$this->assertEquals(9, $data->totalSteps);
$this->assertEquals(0, $data->currentStep);
$this->assertEmpty($data->isComplete);
$this->assertEquals(0, count($index->getAdded()));
// Advance state
for($pass = 1; $pass <= 8; $pass++) {
$processor->process();
$data = $processor->getJobData();
$this->assertEquals($pass, $data->currentStep);
$this->assertEquals($pass * 5, count($index->getAdded()));
}
// Last run should have two hanging items
$processor->process();
$data = $processor->getJobData();
$this->assertEquals(9, $data->currentStep);
$this->assertEquals(42, count($index->getAdded()));
$this->assertTrue($data->isComplete);
}
/**
* Tests that the batch_soft_cap setting is properly respected
*/
public function testSoftCap() {
$index = singleton('BatchedProcessorTest_Index');
$index->reset();
$processor = $this->generateDirtyIds();
// Test that increasing the soft cap to 2 will reduce the number of batches
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 2);
$processor->batchData();
$data = $processor->getJobData();
//Debug::dump($data);die;
$this->assertEquals(8, $data->totalSteps);
// A soft cap of 1 should not fit in the hanging two items
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 1);
$processor->batchData();
$data = $processor->getJobData();
$this->assertEquals(9, $data->totalSteps);
// Extra large soft cap should fit both items
Config::inst()->update('SearchUpdateBatchedProcessor', 'batch_soft_cap', 4);
$processor->batchData();
$data = $processor->getJobData();
$this->assertEquals(8, $data->totalSteps);
// Process all data and ensure that all are processed adequately
for($pass = 1; $pass <= 8; $pass++) {
$processor->process();
}
$data = $processor->getJobData();
$this->assertEquals(8, $data->currentStep);
$this->assertEquals(42, count($index->getAdded()));
$this->assertTrue($data->isComplete);
}
}