The Pool class

(PECL pthreads >= 2.0.0)

Introduction

A Pool is a container for, and controller of, an adjustable number of Workers.

Pooling provides a higher-level abstraction of the Worker functionality, including the management of references in the way required by pthreads.

Class synopsis

class Pool {
/* Properties */
protected $size;
protected $class;
protected $workers;
protected $ctor;
protected $last;
/* Methods */
public __construct(int $size, string $class = ?, array $ctor = ?)
public collect(Callable $collector = ?): int
public resize(int $size): void
public shutdown(): void
public submit(Threaded $task): int
public submitTo(int $worker, Threaded $task): int
}

Properties

size

maximum number of Workers this Pool can use

class

the class of the Worker

workers

references to Workers

ctor

the arguments for the constructor of new Workers

last

offset in workers of the last Worker used

Table of Contents

add a note

User Contributed Notes 4 notes

up
4
meadowsjared at gmail dot com
9 years ago
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.
<?php
/**
 * A unit of work that is submitted to the pool.
 *
 * run() flips $complete to true once the simulated job has finished;
 * isGarbage() exposes that flag so a collector can tell when the task is done.
 */
class TestWork extends Threaded
{
    protected $complete;

    /**
     * @param mixed $pData the data sent to your worker thread to do its job
     */
    public function __construct($pData)
    {
        // transfer all the variables to local variables
        $this->complete = false;
        $this->testData = $pData;
    }

    /**
     * This is where all of your work will be done.
     */
    public function run()
    {
        usleep(2000000); // sleep 2 seconds to simulate a large job
        $this->complete = true;
    }

    /**
     * @return bool true once run() has finished
     */
    public function isGarbage()
    {
        return $this->complete;
    }
}

/**
 * Pool that gathers one result object per finished task.
 */
class ExamplePool extends Pool
{
    public $data = array();

    /**
     * Keeps collecting until no work is left, shuts the pool down and returns
     * the gathered results.
     *
     * @return array one stdClass (with a "complete" member) per collected task
     */
    public function process()
    {
        // Run this loop as long as we have jobs in the pool.
        // NOTE(review): $this->work is not among the Pool properties listed in
        // the class synopsis; this loop presumably targets a pthreads release
        // that exposes it - confirm against the installed version.
        while (count($this->work)) {
            $this->collect(function (TestWork $task) {
                // Read the flag exactly once: run() may finish between two
                // separate isGarbage() calls, which would let the task be
                // collected without its result ever being recorded.
                $finished = $task->isGarbage();

                if ($finished) {
                    $tmpObj = new stdclass();
                    $tmpObj->complete = $finished;
                    // this is how you get your completed data back out
                    // [accessed by $pool->process()]
                    $this->data[] = $tmpObj;
                }

                return $finished;
            });
        }

        // All jobs are done, we can shutdown the pool
        $this->shutdown();

        return $this->data;
    }
}

$pool = new ExamplePool(3);
$testData = 'asdf';
for ($i = 0; $i < 5; $i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); // get all of the results
echo '<pre>';
print_r($retArr); // return the array of results (and maybe errors)
echo '</pre>';
?>
up
4
olavk
10 years ago
Simple example with Collectable (basically Thread meant for Pool) and Pool
<?php
/**
 * Task that appends a slice of a remote page to its own value and then
 * marks itself as garbage.
 */
class job extends Collectable
{
    public $val;

    public function __construct($val)
    {
        // remember the initial value for run()
        $this->val = $val;
    }

    public function run()
    {
        // the actual work: current value followed by 20 bytes starting at offset 3
        $current = $this->val;
        $this->val = $current . file_get_contents('http://www.example.com/', null, null, 3, 20);
        $this->setGarbage();
    }
}

// The pool is created with a size of 3 (at most 3 threads work at once)
$p = new Pool(3);

// Eleven jobs, labelled '0' through '10'
$tasks = array();
for ($label = 0; $label <= 10; $label++) {
    $tasks[] = new job((string) $label);
}

// Queue every job on the pool
foreach ($tasks as $queued) {
    $p->submit($queued);
}

// shutdown will wait for current queue to be completed
$p->shutdown();

// Garbage collection pass: print each result and report whether the task is done
$readResult = function ($checkingTask) {
    echo $checkingTask->val;
    return $checkingTask->isGarbage();
};
$p->collect($readResult);
?>
up
1
meadowsjared at gmail dot com
4 years ago
In this example, it shows how to use a pool to get an array of results, using pThreads v3.2.1 and php 7.3.23
<?php
/**
 * A unit of work that is submitted to the pool.
 * Updated version that works with pThreads v3.2.1 and php 7.3.23.
 *
 * run() flips $complete to true once the simulated job has finished;
 * isDone() exposes that flag to the collector.
 */
class TestWork extends Threaded
{
    protected $complete;

    /**
     * @param mixed $pData the data sent to your worker thread to do its job
     */
    public function __construct($pData)
    {
        // transfer all the variables to local variables
        $this->complete = false;
        $this->testData = $pData;
    }

    /**
     * This is where all of your work will be done.
     */
    public function run()
    {
        usleep(2000000); // sleep 2 seconds to simulate a large job
        $this->complete = true;
    }

    /**
     * @return bool true once run() has finished
     */
    public function isDone()
    {
        return $this->complete;
    }
}

/**
 * Pool that counts the tasks it was given and gathers one result per task.
 */
class ExamplePool extends Pool
{
    public $data = array(); // used to return data after we're done
    private $numTasks = 0;  // counter used to know when we're done

    /**
     * Override the submit function from the parent to keep track of our jobs.
     *
     * @param Threaded $task the task to queue
     * @return int whatever Pool::submit() returns (the worker identifier)
     */
    public function submit(Threaded $task)
    {
        // Count the task only after the parent accepted it, and hand the
        // parent's return value back so the override honours Pool::submit().
        $workerId = parent::submit($task);
        $this->numTasks++;

        return $workerId;
    }

    /**
     * Used to wait until all workers are done.
     *
     * @return array one stdClass (with a "complete" member) per submitted task
     */
    public function process()
    {
        // Run this loop as long as we have jobs in the pool
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (TestWork $task) {
                // Read the flag exactly once: if run() finished between two
                // separate isDone() calls the task would be collected without
                // a result being stored, and this loop would never terminate.
                $done = $task->isDone();

                if ($done) {
                    $tmpObj = new stdclass();
                    $tmpObj->complete = $done;
                    // this is how you get your completed data back out
                    // [accessed by $pool->process()]
                    $this->data[] = $tmpObj;
                }

                return $done;
            });
        }

        // All jobs are done, we can shutdown the pool
        $this->shutdown();

        return $this->data;
    }
}

$pool = new ExamplePool(3);
$testData = 'asdf';
for ($i = 0; $i < 5; $i++) {
    $pool->submit(new TestWork($testData));
}
$retArr = $pool->process(); // get all of the results
echo '<pre>';
print_r($retArr); // return the array of results (and maybe errors)
echo '</pre>';
?>
up
-6
fajan
11 years ago
Example class to demonstrate usage of Pool/Worker mechanism, also to show a few tricks & hints ;)
<?php
/**
 * Shared object handed to every worker; holds two counters, $val and $val2.
 */
class Config extends Threaded{    // shared global object
    protected $val=0, $val2=0;
    protected function inc(){++$this->val;}    // protected synchronizes by-object
    public function inc2(){++$this->val2;}    // no synchronization
}
/**
 * Worker that receives the shared Config and a sequential id, and publishes
 * both as globals when it starts running.
 */
class WorkerClass extends Worker{
    protected static $worker_id_next = -1;
    protected $worker_id;
    protected $config;
    public function __construct($config){
        $this->worker_id = ++static::$worker_id_next;    // static members are not available in thread but are in 'main thread'
        $this->config = $config;
    }
    public function run(){
        global $config;
        $config = $this->config;    // NOTE: setting by reference WON'T work
        global $worker_id;
        $worker_id = $this->worker_id;
        echo "working context {$worker_id} is created!\n";
        //$this->say_config();    // globally synchronized function.
    }
    protected function say_config(){    // 'protected' is synchronized by-object so WON'T work between multiple instances
        global $config;
        // you can use the shared $config object as synchronization source.
        $config->synchronized(function() use (&$config){    // NOTE: you can use Closures here, but if you attach a Closure to a Threaded object it will be destroyed as can't be serialized
            var_dump($config);
        });
    }
}
/**
 * Task that shifts one entry off the shared "arr", appends its own $set value,
 * then calls inc() and inc2() on the shared Config 1000 times each.
 */
class Task extends Stackable{    // Stackable still exists, it's just somehow disappeared from docs (probably by mistake). See older version's docs for more details.
    protected $set;
    public function __construct($set){
        $this->set = $set;
    }
    public function run(){
        global $worker_id;
        echo "task is running in {$worker_id}!\n";
        usleep(mt_rand(1,100)*100);
        $config = $this->getConfig();
        $val = $config->arr->shift();
        $config->arr[] = $this->set;
        for ($i = 0 ; $i < 1000; ++$i){
            $config->inc();
            $config->inc2();
        }
    }
    public function getConfig(){
        global $config;    // WorkerClass set this on thread's scope, can be reused by Tasks for additional async data source. (ie: connection pool or taskqueue to demultiplexer)
        return $config;
    }
}
$config = new Config;
$config->arr = new \Threaded();
$config->arr->merge(array(1,2,3,4,5,6));
/**
 * Pool subclass that exposes the keys of the protected $workers list.
 */
class PoolClass extends Pool{
    // Returns the keys of $this->workers, or null while $this->workers is null.
    public function worker_list(){
        if ($this->workers !== null)
            return array_keys($this->workers);
        return null;
    }
}
$pool = new PoolClass(3, 'WorkerClass', [$config] );
$pool->worker_list();
//$pool->submitTo(0,new Task(-10));    // submitTo DOES NOT try to create worker
$spammed_id = -1;
for ($i = 1; $i <= 100; ++$i){    // add some jobs
    // pick the entry at index 2 of the worker list as the "spammed" worker as soon as it exists
    if ($spammed_id == -1 && ($x = $pool->worker_list())!= null && @$x[2]){
        $spammed_id = $x[2];
        echo "spamming worker {$spammed_id} with lots of tasks from now on\n";
    }
    if ($spammed_id != -1 && ($i % 5) == 0)    // every 5th job is routed to one worker, so it has 20% of the total jobs (with 3 workers it should do ~33%, now it has (33+20)%, so only delegate to worker if you plan to do balancing as well... )
        $pool->submitTo($spammed_id,new Task(10*$i));
    else
        $pool->submit(new Task(10*$i));
}
$pool->shutdown();
var_dump($config); // "val" is exactly 100000, "val2" is probably a bit less
// also: if you disable the spammer, you'll see that the order of the "arr" is random.
?>
To Top