The parallel\Channel class

(0.9.0)

Unbuffered Channels

An unbuffered channel will block on calls to parallel\Channel::send() until there is a receiver, and block on calls to parallel\Channel::recv() until there is a sender. This means an unbuffered channel is not only a way to share data among tasks but also a simple method of synchronization.

An unbuffered channel is the fastest way to share data among tasks, requiring the least copying.

Buffered Channels

A buffered channel will not block on calls to parallel\Channel::send() until capacity is reached, calls to parallel\Channel::recv() will block until there is data in the buffer.

Closures over Channels

A powerful feature of parallel channels is that they allow the exchange of closures between tasks (and runtimes).

When a closure is sent over a channel the closure is buffered, it doesn't change the buffering of the channel transmitting the closure, but it does effect the static scope inside the closure: The same closure sent to different runtimes, or the same runtime, will not share their static scope.

This means that whenever a closure is executed that was transmitted by a channel, static state will be as it was when the closure was buffered.

Anonymous Channels

The anonymous channel constructor allows the programmer to avoid assigning names to every channel: parallel will generate a unique name for anonymous channels.

Короткий огляд класу

final class parallel\Channel {
/* Anonymous Constructor */
public __construct()
public __construct(int $capacity)
/* Access */
public make(string $name): Channel
public make(string $name, int $capacity): Channel
public open(string $name): Channel
/* Sharing */
public recv(): mixed
public send(mixed $value): void
/* Closing */
public close(): void
/* Constant for Infinitely Buffered */
const Infinite;
}

Зміст

add a note

User Contributed Notes 5 notes

up
4
hdvianna
5 years ago
This is an example of using a channel to produce data for consumers. In this example, the producer Runtime instance will send the time in seconds in which the consumers shall sleep.<?phpuse parallel\{Runtime, Channel};main($argv);function main(array $argv){    if (count($argv) !== 3) {        echo "Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;        echo "Example: hello-parallel.php 5 3" . PHP_EOL;        die;    } else {        $numberOfTasks = intval($argv[1]);        $maximumTimeOfSleep = intval($argv[2]);        $t1 = microtime(true);        parallelize($numberOfTasks, $maximumTimeOfSleep);        $endTime = microtime(true) - $t1;        echo PHP_EOL."Finished $numberOfTasks task(s) in {$endTime}s".PHP_EOL;    }}function parallelize(int $numberOfTasks, int $maximumTimeOfSleep){    $channel = new Channel();    $taskIds = array_map(function () use ($maximumTimeOfSleep) {        return $id = uniqid("task::");    }, range(0, $numberOfTasks - 1));    $timesToSleep = array_map(function () use ($maximumTimeOfSleep) {        return rand(1, $maximumTimeOfSleep);    }, $taskIds);    $producer = new Runtime();    $producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {        foreach ($timesToSleep as $timeToSleep) {            $channel->send($timeToSleep);        }    }, [$channel, $timesToSleep]);    $consumerFutures = array_map(function (string $id) use ($channel) {        $runtime = new Runtime();        return $runtime->run(function (string $id, Channel $channel) {            $timeToSleep = $channel->recv();            echo "Hello from $id. I will sleep for $timeToSleep second(s).".PHP_EOL;            sleep($timeToSleep);            echo "$id slept for $timeToSleep second(s).".PHP_EOL;            return $timeToSleep;        }, [$id, $channel]);    }, $taskIds);    wait($consumerFutures);    wait([$producerFuture]);}function wait(array $futures){    return array_map(function ($future) {        return $future->value();    }, $futures);}
up
2
rustysun
5 years ago
an example used unbuffered channel.<?phpuse parallel\{Channel,Runtime};$sum=function(array $a, Channel $ch) {    $sum=0;    foreach ($a as $v) {        $sum+=$v;    }    $ch->send($sum);};try {    $a=[7, 2, 8, 1, 4, 0, 9, 10];    //unbuffered channel    $runtime=new Runtime;    $ch2=new Channel;    $runtime->run($sum, [array_slice($a, 0, $num), $ch2]);    $runtime->run($sum, [array_slice($a, $num), $ch2]);    //receive from channel    $x=$ch2->recv();    $y=$ch2->recv();    $ch2->close();    echo "\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";} catch(Error $err) {    echo "\nError:", $err->getMessage();} catch(Exception $e) {    echo "\nException:", $e->getMessage();}//output//ch2:18  23      41
up
2
gam6itko
3 years ago
<?php// the very weird way to calculate factorial ^_^// we create one thread and synching them with buffered channel// at fact only one thread is executing at the time use parallel\{Channel, Future, Runtime};for ($n = 0; $n <= 10; $n++) {    echo "!$n = " . factorial($n) . PHP_EOL;}/** * Creates $n threads. */function factorial(int $n): int{    // buffered channel - using for sync threads ^_^    $channel = new Channel(1);    $futureList = [];    for ($i = 2; $i <= $n; $i++) {        $runtime = new Runtime();        $futureList[] = $runtime->run(            static function (Channel $channel, $multiplier): void {                $f = $channel->recv();                $channel->send($f * $multiplier);            },            [$channel, $i]        );    }    $channel->send(1);    // waiting until all threads are done    do {        $allDone = array_reduce(            $futureList,            function (bool $c, Future $future): bool {                return $c && $future->done();            },            true        );    } while (false === $allDone);    return $channel->recv();}// output:// !0 = 1// !1 = 1// !2 = 2// !3 = 6// !4 = 24// !5 = 120// !6 = 720// !7 = 5040// !8 = 40320// !9 = 362880// !10 = 3628800
up
1
rustysun
5 years ago
<?phpuse parallel\Channel;function sum(array $a, Channel $ch) {    $sum=0;    foreach ($a as $v) {        $sum+=$v;    }    $ch->send($sum);}try {    $a=[7, 2, 8, 1, 4, 0, 9, 10];    $ch1=Channel::make('sum', 2);    $ch2=new Channel;    $num=count($a) / 2;    sum(array_slice($a, 0, $num), $ch1);    sum(array_slice($a, $num), $ch1);    //receive from channel    $x=$ch1->recv();    $y=$ch1->recv();    $ch1->close();    echo "\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";} catch(Error $err) {    echo "\nError:", $err->getMessage();} catch(Exception $e) {    echo "\nException:", $e->getMessage();}
up
0
thierry at pielo dot net
11 months ago
<?php/** * Bzz reloaded! * Run two simple tasks in parallel and synchronize them with a channel *  * parallel\Channel(int $capacity): Buffered channel * Creates a buffered channel for communication between tasks * @ref https://www.php.net/manual/en/class.parallel-channel.php */ echo "zzz... " . PHP_EOL;// Create new buffered channel$channel = new \parallel\Channel(2);\parallel\run(    function($channel) {        $snaps_count = rand (8, 12);        echo "Number of snaps: $snaps_count" . PHP_EOL;        for ($i=1; $i<=$snaps_count; $i++) {            $other_sleep_time = rand(3, 5);            $my_sleep_time = rand(1, 3);            echo "Send sleep time to buffer" . PHP_EOL;            $start = microtime(true);            $channel->send($other_sleep_time);            $wait_time = microtime(true) - $start;            if ($wait_time > .1) {                echo "Buffer was full. I waited " . round($wait_time) . "s" . PHP_EOL;            }            echo "I sleep for {$my_sleep_time}s" . PHP_EOL;            sleep($my_sleep_time);        }        echo "I finished sleeping. Closing channel" . PHP_EOL;        $channel->close();    },    [$channel]);\parallel\run(    function($channel) {        try {            while(true) {                $my_sleep_time = $channel->recv();                echo "Other sleeps for {$my_sleep_time}s" . PHP_EOL;                sleep($my_sleep_time);            }        } catch(\parallel\Channel\Error\Closed $e) {            echo "Channel is closed. Other die.";            die;        }    },    [$channel]);
To Top