A classe Pool

(PECL pthreads >= 2.0.0)

Introdução

Um Pool é um contêiner e controlador de um número ajustável de Workers.

O pooling fornece uma abstração de nível mais alto da funcionalidade do Worker, incluindo o gerenciamento de referências da maneira exigida pelos pthreads.

Resumo da classe

class Pool {
/* Propriedades */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* Métodos */
public __construct(int $size, string $class = ?, array $ctor = ?)
public collect(Callable $collector = ?): int
public resize(int $size): void
public shutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

Propriedades

size

número máximo de Workers que este pool pode usar

class

a classe do Worker

workers

referências a Workers

ctor

os argumentos para o construtor de novos Workers

last

deslocamento em workers do último Worker usado

Índice

adicione uma nota

Notas Enviadas por Usuários (em inglês) 4 notes

up
4
meadowsjared at gmail dot com
9 years ago
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.<?phpclass TestWork extends Threaded {    protected $complete;    //$pData is the data sent to your worker thread to do it's job.    public function __construct($pData){        //transfer all the variables to local variables        $this->complete = false;        $this->testData = $pData;    }    //This is where all of your work will be done.    public function run(){        usleep(2000000); //sleep 2 seconds to simulate a large job        $this->complete = true;    }    public function isGarbage() {        return $this->complete;    }}class ExamplePool extends Pool{    public $data = array();    public function process()    {        // Run this loop as long as we have        // jobs in the pool        while (count($this->work)) {            $this->collect(function (TestWork $task) {                // If a task was marked as done                // collect its results                if ($task->isGarbage()) {                    $tmpObj = new stdclass();                    $tmpObj->complete = $task->complete;                    //this is how you get your completed data back out [accessed by $pool->process()]                    $this->data[] = $tmpObj;                }                return $task->isGarbage();            });        }        // All jobs are done        // we can shutdown the pool        $this->shutdown();        return $this->data;    }}$pool = new ExamplePool(3);$testData = 'asdf';for($i=0;$i<5;$i++) {    $pool->submit(new TestWork($testData));}$retArr = $pool->process(); //get all of the resultsecho '<pre>';print_r($retArr); //return the array of results (and maybe errors)echo '</pre>';?>
up
4
olavk
10 years ago
Simple example with Collectable (basically Thread meant for Pool) and Pool<?phpclass job extends Collectable {  public $val;  public function __construct($val){    // init some properties    $this->val = $val;  }  public function run(){    // do some work    $this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);    $this->setGarbage();  }}// At most 3 threads will work at once$p = new Pool(3);$tasks = array(  new job('0'),  new job('1'),  new job('2'),  new job('3'),  new job('4'),  new job('5'),  new job('6'),  new job('7'),  new job('8'),  new job('9'),  new job('10'),);// Add tasks to pool queueforeach ($tasks as $task) {  $p->submit($task);}// shutdown will wait for current queue to be completed$p->shutdown();// garbage collection check / read results$p->collect(function($checkingTask){  echo $checkingTask->val;  return $checkingTask->isGarbage();});?>
up
1
meadowsjared at gmail dot com
4 years ago
In this example, it shows how to use a pool to get an array of results, using pThreads v3.2.1 and php 7.3.23<?phpclass TestWork extends Threaded {//updated version that works with pThreads v3.2.1 and php 7.3.23    protected $complete;    //$pData is the data sent to your worker thread to do it's job.    public function __construct($pData) {        //transfer all the variables to local variables        $this->complete = false;        $this->testData = $pData;    }    //This is where all of your work will be done.    public function run() {        usleep(2000000); //sleep 2 seconds to simulate a large job        $this->complete = true;    }    public function isDone() {        return $this->complete;    }}class ExamplePool extends Pool {    public $data = array(); // used to return data after we're done    private $numTasks = 0; // counter used to know when we're done    /**     * override the submit function from the parent     * to keep track of our jobs     */    public function submit(Threaded $task) {        $this->numTasks++;        parent::submit($task);    }    /**     * used to wait until all workers are done     */    public function process() {        // Run this loop as long as we have        // jobs in the pool        while (count($this->data) < $this->numTasks) {            $this->collect(function (TestWork $task) {                // If a task was marked as done, collect its results                if ($task->isDone()) {                    $tmpObj = new stdclass();                    $tmpObj->complete = $task->complete;                    //this is how you get your completed data back out [accessed by $pool->process()]                    $this->data[] = $tmpObj;                }                return $task->isDone();            });        }        // All jobs are done        // we can shutdown the pool        $this->shutdown();        return $this->data;    }}$pool = new ExamplePool(3);$testData = 'asdf';for($i=0;$i<5;$i++) {    $pool->submit(new TestWork($testData));}$retArr = $pool->process(); //get all of the resultsecho '<pre>';print_r($retArr); //return the array of results (and maybe errors)echo '</pre>';?>
up
-6
fajan
11 years ago
Example class to demonstrate usage of Pool/Worker mechanism, also to show a few tricks & hints ;)<?phpclass Config extends Threaded{    // shared global object    protected $val=0, $val2=0;    protected function inc(){++$this->val;}    // protected synchronizes by-object    public function inc2(){++$this->val2;}    // no synchronization}class WorkerClass extends Worker{    protected static $worker_id_next = -1;    protected $worker_id;    protected $config;    public function __construct($config){        $this->worker_id = ++static::$worker_id_next;    // static members are not avalable in thread but are in 'main thread'        $this->config = $config;    }    public function run(){        global $config;        $config = $this->config;    // NOTE: setting by reference WON'T work        global $worker_id;        $worker_id = $this->worker_id;        echo "working context {$worker_id} is created!\n";        //$this->say_config();    // globally synchronized function.    }    protected function say_config(){    // 'protected' is synchronized by-object so WON'T work between multiple instances        global $config;        // you can use the shared $config object as synchronization source.        $config->synchronized(function() use (&$config){    // NOTE: you can use Closures here, but if you attach a Closure to a Threaded object it will be destroyed as can't be serialized            var_dump($config);        });    }}class Task extends Stackable{    // Stackable still exists, it's just somehow dissappeared from docs (probably by mistake). See older version's docs for more details.    protected $set;    public function __construct($set){        $this->set = $set;    }    public function run(){        global $worker_id;        echo "task is running in {$worker_id}!\n";        usleep(mt_rand(1,100)*100);        $config = $this->getConfig();        $val = $config->arr->shift();        $config->arr[] = $this->set;        for ($i = 0 ; $i < 1000; ++$i){            $config->inc();            $config->inc2();        }    }    public function getConfig(){        global $config;    // WorkerClass set this on thread's scope, can be reused by Tasks for additional asynch data source. (ie: connection pool or taskqueue to demultiplexer)        return $config;    }}$config = new Config;$config->arr = new \Threaded();$config->arr->merge(array(1,2,3,4,5,6));class PoolClass extends Pool{    public function worker_list(){        if ($this->workers !== null)            return array_keys($this->workers);        return null;    }}$pool = new PoolClass(3, 'WorkerClass', [$config] );$pool->worker_list();//$pool->submitTo(0,new Task(-10));    // submitTo DOES NOT try to create worker$spammed_id = -1;for ($i = 1; $i <= 100; ++$i){    // add some jobs    if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){        $spammed_id = $x[2];        echo "spamming worker {$spammed_id} with lots of tasks from now on\n";    }    if ($spammed_id != -1 && ($i % 5) == 0)    // every 5th job is routed to one worker, so it has 20% of the total jobs (with 3 workers it should do ~33%, not it has (33+20)%, so only delegate to worker if you plan to do balancing as well... )        $pool->submitTo($spammed_id,new Task(10*$i));        else        $pool->submit(new Task(10*$i));}$pool->shutdown();var_dump($config); // "val" is exactly 100000, "val2" is probably a bit less// also: if you disable the spammer, you'll that the order of the "arr" is random.?>
To Top