server = $server; $this->size = (int) $size; for($i = 0; $i < $this->size; ++$i){ $this->workerUsage[$i] = 0; $this->workers[$i] = new AsyncWorker($server->getLoader()); $this->workers[$i]->start(); } }

    /**
     * Registers a task with the pool and stacks it on a worker.
     *
     * Nothing happens when a task with the same id is already tracked or
     * when the task reports itself as garbage. Otherwise the task is handed
     * to the worker with the lowest recorded usage; the scan starts from a
     * randomly chosen worker, which is kept unless another worker has a
     * strictly lower usage count.
     *
     * @param AsyncTask $task task to hand to a worker
     */
    public function submitTask(AsyncTask $task){
        $taskId = $task->getTaskId();
        if(isset($this->tasks[$taskId]) or $task->isGarbage()){
            return;
        }

        $this->tasks[$taskId] = $task;

        // Random starting candidate; only a strictly less loaded worker replaces it.
        $chosenWorker = mt_rand(0, $this->size - 1);
        $lowestUsage = $this->workerUsage[$chosenWorker];
        for($index = 0; $index < $this->size; ++$index){
            $usage = $this->workerUsage[$index];
            if($usage >= $lowestUsage){
                continue;
            }
            $chosenWorker = $index;
            $lowestUsage = $usage;
        }

        $this->workers[$chosenWorker]->stack($task);
        ++$this->workerUsage[$chosenWorker];
        $this->taskWorkers[$taskId] = $chosenWorker;
    }

    /**
     * Drops a single task from the pool's bookkeeping.
     *
     * If the task was assigned to a worker it is unstacked from that worker
     * and the worker's usage counter is decremented. The task is then
     * removed from both the task map and the task-to-worker map.
     *
     * @param AsyncTask $task task to forget
     */
    private function removeTask(AsyncTask $task){
        $taskId = $task->getTaskId();

        if(isset($this->taskWorkers[$taskId])){
            $workerIndex = $this->taskWorkers[$taskId];
            $this->workers[$workerIndex]->unstack($task);
            --$this->workerUsage[$workerIndex];
        }

        unset($this->tasks[$taskId], $this->taskWorkers[$taskId]);
    }

    /**
     * Removes every tracked task, then resets all usage counters to zero
     * and clears both lookup maps.
     */
    public function removeTasks(){
        foreach($this->tasks as $trackedTask){
            $this->removeTask($trackedTask);
        }

        for($index = 0; $index < $this->size; ++$index){
            $this->workerUsage[$index] = 0;
        }

        $this->taskWorkers = [];
        $this->tasks = [];
    }

    /**
     * Walks the tracked tasks and retires the ones that are done.
     *
     * A task reporting itself as garbage gets its onCompletion() callback
     * invoked with the server and is then removed. A task reporting itself
     * as terminated is removed and its termination info is written to the
     * server logger at critical level.
     */
    public function collectTasks(){
        foreach($this->tasks as $trackedTask){
            if($trackedTask->isGarbage()){
                $trackedTask->onCompletion($this->server);
                $this->removeTask($trackedTask);
                continue;
            }

            if(!$trackedTask->isTerminated()){
                continue;
            }

            // Read the termination details before the task is dropped from the pool.
            $details = $trackedTask->getTerminationInfo();
            $this->removeTask($trackedTask);

            $logger = $this->server->getLogger();
            $shortName = (new \ReflectionClass($trackedTask))->getShortName();
            $logger->critical("Could not execute asynchronous task " . $shortName . ": " . $details["message"]);
            $logger->critical("On " . $details["scope"] . ", line " . $details["line"] . ", " . $details["function"] . "()");
        }
    }
}