Pool::collect

(PECL pthreads >= 2.0.0)

Pool::collectCollect references to completed tasks

Description

public Pool::collect(Callable $collector = ?): int

Allows the pool to collect references determined to be garbage by the optionally given collector.

Parameters

collector

A Callable collector that returns a boolean on whether the task can be collected or not. Only in rare cases should a custom collector need to be used.

Return Values

The number of remaining tasks in the pool to be collected.

Changelog

Version Description
PECL pthreads 3.0.0 An integer is now returned, and the collector parameter is now optional.

Examples

Example #1 A basic example of Pool::collect()

<?php
$pool
= new Pool(4);

for (
$i = 0; $i < 15; ++$i) {
$pool->submit(new class extends Threaded {});
}

while (
$pool->collect()); // blocks until all tasks have finished executing

$pool->shutdown();

add a note

User Contributed Notes 4 notes

up
3
meadowsjared at gmail dot com
4 years ago
In this example, it shows how to use a Threaded with a pool to get an array of results, using pThreads v3.2.1 and php 7.3.23<?phpclass TestWork extends Threaded {//updated version that works with pThreads v3.2.1 and php 7.3.23    protected $complete;    //$pData is the data sent to your worker thread to do it's job.    public function __construct($pData) {        //transfer all the variables to local variables        $this->complete = false;        $this->testData = $pData;    }    //This is where all of your work will be done.    public function run() {        usleep(2000000); //sleep 2 seconds to simulate a large job        $this->complete = true;    }    public function isDone() {        return $this->complete;    }}class ExamplePool extends Pool {    public $data = array(); // used to return data after we're done    private $numTasks = 0; // counter used to know when we're done    /**     * override the submit function from the parent     * to keep track of our jobs     */    public function submit(Threaded $task) {        $this->numTasks++;        parent::submit($task);    }    /**     * used to wait until all workers are done     */    public function process() {        // Run this loop as long as we have        // jobs in the pool        while (count($this->data) < $this->numTasks) {            $this->collect(function (TestWork $task) {                // If a task was marked as done, collect its results                if ($task->isDone()) {                    $tmpObj = new stdclass();                    $tmpObj->complete = $task->complete;                    //this is how you get your completed data back out [accessed by $pool->process()]                    $this->data[] = $tmpObj;                }                return $task->isDone();            });        }        // All jobs are done        // we can shutdown the pool        $this->shutdown();        return $this->data;    }}$pool = new ExamplePool(3);$testData = 'asdf';for($i=0;$i<5;$i++) {    $pool->submit(new TestWork($testData));}$retArr = $pool->process(); //get all of the resultsecho '<pre>';print_r($retArr); //return the array of results (and maybe errors)echo '</pre>';?>
up
4
your dot brother dot t at hotmail dot com
10 years ago
The example code crashes and made me waste 2 working daysFirst of all, `Stackable` has no attribute named $worker or it's access method made it inaccessible.Secondly, `Stackable` also doesn't have `getThreadId()` . It's best practice to use `Thread` class for realization of a thread since it has more control functions. It's better to use `Stackable` for object storage and use it's `run()` as its initialization.The working example is<?php    class MyWork extends Thread {        protected $complete;        public function __construct() {            $this->complete = false;        }        public function run() {            printf(                "Hello from %s in Thread #%lu\n",                __CLASS__, $this->getThreadId());            $this->complete = true;        }        public function isComplete() {            return $this->complete;        }    }    class Something {}    class MyWorker extends Worker {        public function __construct(Something $something) {            $this->something = $something;        }        public function run() {            /** ... **/        }    }    $pool = new Pool(8, \MyWorker::class, [new Something()]);    $pool->submit(new MyWork());    usleep(1000);    $pool->collect(function($work){        return $work->isComplete();    });    var_dump($pool);?>
up
3
meadowsjared at gmail dot com
9 years ago
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.<?phpclass TestWork extends Threaded {    protected $complete;    //$pData is the data sent to your worker thread to do it's job.    public function __construct($pData){        //transfer all the variables to local variables        $this->complete = false;        $this->testData = $pData;    }    //This is where all of your work will be done.    public function run(){        usleep(2000000); //sleep 2 seconds to simulate a large job        $this->complete = true;    }    public function isGarbage() {        return $this->complete;    }}class ExamplePool extends Pool{    public $data = array();    public function process()    {        // Run this loop as long as we have        // jobs in the pool        while (count($this->work)) {            $this->collect(function (TestWork $task) {                // If a task was marked as done                // collect its results                if ($task->isGarbage()) {                    $tmpObj = new stdclass();                    $tmpObj->complete = $task->complete;                    //this is how you get your completed data back out [accessed by $pool->process()]                    $this->data[] = $tmpObj;                }                return $task->isGarbage();            });        }        // All jobs are done        // we can shutdown the pool        $this->shutdown();        return $this->data;    }}$pool = new ExamplePool(3);$testData = 'asdf';for($i=0;$i<5;$i++) {    $pool->submit(new TestWork($testData));}$retArr = $pool->process(); //get all of the resultsecho '<pre>';print_r($retArr); //return the array of results (and maybe errors)echo '</pre>';?>
up
0
l00k at protonmail dot com
7 years ago
This example demonstrates various aspects of MTP with pthreads - esspecially worth of note is bidirectional communication with child threads.I could not find anything about that so I would like to present you my research result.<?phpclass Model{        public $id;    public $value;    }class Connection    extends Worker{        protected static $link;            public function __construct($hostname, $username, $password, $database, $port = 3306)    {        $this->hostname = $hostname;        $this->username = $username;        $this->password = $password;        $this->database = $database;        $this->port = $port;    }        public function getConnection()    {        if(!self::$link)        {            echo 'Thread: '. $this->getThreadId() ." Connecting to db\n";            self::$link = new \PDO(...);        }                return self::$link;    }    }/** @property Connection $worker */class QueryTask    extends Threaded{        public $data;    public $result;        protected $_complete;            public function __construct(Model $data)    {        $this->_complete = false;        $this->data = $data;    }        public function run()    {        /** @var \PDO $pdo */        $pdo = $this->worker->getConnection();                $text = 'Thread: '. $this->worker->getThreadId() .' Job: '. $this->data->id .' Data: '. $this->data->value;                $t = microtime(true);                $stmt = $pdo->prepare("            INSERT INTO `test` (`id`, `text`) VALUES (NULL, '". $text ."')        ");        $stmt->execute();                $dt = microtime(true) - $t;                $result = (int) $stmt->rowCount();                echo $text .' Result: '. $result .' Exec time: '. $dt ."s\n";                $this->result = $result;        $this->_complete = true;    }        public function isGarbage() : bool    {        return $this->_complete;    }    }$t = microtime(true);// uruchomienie$pool = new Pool(5, 'Connection', [ 'localhost', 'root', 'password', 'test' ]);// zadania$tasks = 10;for($i=0; $i<$tasks; ++$i){    $object = new Model();    $object->id = $i;    $object->value = rand();    $pool->submit(new QueryTask($object));}// oczekiwanie na zakonczenie$data = [];while(1){    $newData = [];    $pool->collect(function(QueryTask $task) use (&$newData) {        if($task->isGarbage())        {            $tmpObj = new stdclass();            $tmpObj->complete = $task->complete;                        $newData[ $task->data->id ] = $task->data->value;        }                return $task->isGarbage();    });        $data = array_merge($data, $newData);        if(count($data) == $tasks)        break;        usleep(100000);}var_dump($data);?>Result:Thread: 6796 Connecting to dbThread: 3156 Connecting to dbThread: 9040 Connecting to dbThread: 7748 Connecting to dbThread: 8836 Connecting to dbJob: 0 Done in: 0.0070011615753174sJob: 4 Done in: 0.0069999694824219sJob: 2 Done in: 0.0090010166168213sJob: 3 Done in: 0.0090010166168213sJob: 1 Done in: 0.003000020980835sJob: 5 Done in: 0.0069999694824219sJob: 7 Done in: 0.0079998970031738sJob: 6 Done in: 0.0049998760223389sJob: 9 Done in: 0.0079998970031738sJob: 8 Done in: 0.0069999694824219sarray(10) {  [0] =>  int(17730)  [1] =>  int(18771)  [2] =>  int(12944)  [3] =>  int(6025)  [4] =>  int(29582)  [5] =>  int(10159)  [6] =>  int(26556)  [7] =>  int(9029)  [8] =>  int(15002)  [9] =>  int(4043)}Things worth noting here:1. Constructing of 5 workers for 10 tasks. 5 last task are runned on existing threads with already set up connection to db.2. You can "send" data to thread by creating new task and submiting it.3. You can retrive result by collect function.4. You can pass simple object to task constructor.
To Top