batches = array(); $this->setBatch(0); } protected function commitIndex($index) { $name = $index->getIndexName(); // If this is a resurrected batch then it's not necessary to commit the index // twice, assuming it has successfully been comitted before if(isset($this->completedIndexes[$name])) return true; // Commit index and mark as completed $result = parent::commitIndex($index); if($result) $this->completedIndexes[$name] = $name; return $result; } /** * Set the current batch index * * @param int $batch Index of the batch */ protected function setBatch($batch) { $this->currentBatch = $batch; $this->completedIndexes = array(); } protected function getSource() { if(isset($this->batches[$this->currentBatch])) { return $this->batches[$this->currentBatch]; } } /** * Process the current queue * * @return boolean */ public function process() { // Skip blank queues if(empty($this->batches)) return true; // Don't re-process completed queue if($this->currentBatch >= count($this->batches)) return true; // Process batch $result = parent::process(); // Advance to next batch if successful if($result) $this->setBatch($this->currentBatch + 1); return $result; } /** * Segments batches acording to the specified rules * * @param array $source Source input * @return array Batches */ protected function segmentBatches($source) { // Measure batch_size $batchSize = Config::inst()->get(get_class(), 'batch_size'); if($batchSize === 0) return array($source); $softCap = Config::inst()->get(get_class(), 'batch_soft_cap'); // Clear batches $batches = array(); $current = array(); $currentSize = 0; // Build batches from data foreach ($source as $base => $statefulids) { if (!$statefulids) continue; foreach ($statefulids as $stateKey => $statefulid) { $state = $statefulid['state']; $ids = $statefulid['ids']; if(!$ids) continue; // Extract items from $ids until empty while($ids) { // Estimate maximum number of items to take for this iteration, allowing for the soft cap $take = $batchSize - $currentSize; if(count($ids) <= $take + $softCap) $take += $softCap; $items = array_slice($ids, 0, $take, true); $ids = array_slice($ids, count($items), null, true); // Update batch $currentSize += count($items); $merge = array( $base => array( $stateKey => array( 'state' => $state, 'ids' => $items ) ) ); $current = $current ? array_merge_recursive($current, $merge) : $merge; if($currentSize >= $batchSize) { $batches[] = $current; $current = array(); $currentSize = 0; } } } } // Add incomplete batch if($currentSize) $batches[] = $current; return $batches; } public function batchData() { $this->batches = $this->segmentBatches($this->dirty); $this->setBatch(0); } public function triggerProcessing() { $this->batchData(); } }