The parallel\Channel class

(0.9.0)

Unbuffered channels

An unbuffered channel blocks calls to parallel\Channel::send() until there is a receiver, and blocks calls to parallel\Channel::recv() until there is a sender. This means an unbuffered channel is not only a way to share data among tasks but also a simple method of synchronization.

An unbuffered channel is the fastest way to share data among tasks, requiring the least copying.
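
The blocking behaviour described above can be sketched as follows. This is a minimal illustration, assuming the parallel extension is loaded in a thread-safe PHP build; the variable names and the doubling logic are hypothetical and not part of the original page.

```php
<?php
use parallel\{Channel, Runtime};

// Calling the constructor without a capacity creates an unbuffered channel.
$channel = new Channel();

$runtime = new Runtime();
$future = $runtime->run(function (Channel $channel) {
    // Blocks until the main thread calls send().
    $value = $channel->recv();
    return $value * 2;
}, [$channel]);

// Blocks until the task above is ready to receive.
$channel->send(21);

// The Future is kept because the task returns a value.
$result = $future->value();
echo $result, PHP_EOL; // prints 42
```

Because neither side can proceed until the other arrives, the send/recv pair acts as a rendezvous point in addition to transferring the value.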

Buffered channels

A buffered channel does not block calls to parallel\Channel::send() until its capacity is reached; calls to parallel\Channel::recv() block until there is data in the buffer.
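
As a small sketch of this behaviour, the following fills a channel of capacity 2 from a single thread, which would deadlock with an unbuffered channel. It assumes the parallel extension is available; the values sent are arbitrary placeholders.

```php
<?php
use parallel\Channel;

// Capacity 2: up to two values can be queued without a receiver.
$channel = new Channel(2);

$channel->send('first');   // returns immediately, one value buffered
$channel->send('second');  // returns immediately, buffer is now full
// A third send() at this point would block until a value is received.

// Both recv() calls return at once because the buffer already holds data.
$received = [$channel->recv(), $channel->recv()];
$channel->close();

echo count($received), " values received", PHP_EOL;
```

The class synopsis also lists a parallel\Channel::Infinite constant, which can be passed as the capacity when sends should never block on a full buffer.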

Closures over channels

A powerful feature of parallel channels is that they allow the exchange of closures between tasks (and runtimes).

When a closure is sent over a channel, the closure is buffered. This does not change the buffering of the channel transmitting the closure, but it does affect the static scope inside the closure: the same closure sent to different runtimes, or to the same runtime, will not share its static scope.

This means that whenever a closure that was transmitted over a channel is executed, its static state will be as it was when the closure was buffered.
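
The following sketch illustrates the static-scope rule under stated assumptions: the parallel extension is loaded, and the counter closure and variable names are hypothetical. The same closure is sent twice before it has ever been called, so, if the description above holds, each received copy should start from the static state it had when buffered rather than sharing one counter.

```php
<?php
use parallel\{Channel, Runtime};

// Hypothetical closure with static state; it has not been called yet,
// so $calls is 0 at the moment each copy is buffered.
$counter = function () {
    static $calls = 0;
    return ++$calls;
};

$channel = new Channel(2);
$channel->send($counter);
$channel->send($counter);

$runtime = new Runtime();
$future = $runtime->run(function (Channel $channel) {
    $first  = $channel->recv();
    $second = $channel->recv();
    // Each copy carries its own static scope, per the text above.
    return [$first(), $second()];
}, [$channel]);

[$firstResult, $secondResult] = $future->value();
var_dump($firstResult, $secondResult);
```

If the two copies shared static scope, the second call would continue counting from the first; comparing the two dumped values shows which behaviour is in effect.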

Anonymous channels

The anonymous channel constructor allows the programmer to avoid assigning names to every channel: parallel will generate a unique name for anonymous channels.
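
To contrast the two styles, here is a rough sketch that assumes the parallel extension is loaded; the channel name 'results' and the messages are made up for illustration. A named channel can be looked up by name with open() inside a task, whereas an anonymous channel has to be handed to the task as an argument.

```php
<?php
use parallel\{Channel, Runtime};

$runtime = new Runtime();

// Named channel: created once with make(), opened by name inside the task.
$named = Channel::make('results', Channel::Infinite);
$runtime->run(function () {
    $results = \parallel\Channel::open('results');
    $results->send('via named channel');
});
$fromNamed = $named->recv();

// Anonymous channel: no name to manage, so it is passed in directly.
$anonymous = new Channel();
$runtime->run(function (\parallel\Channel $channel) {
    $channel->send('via anonymous channel');
}, [$anonymous]);
$fromAnonymous = $anonymous->recv();

echo $fromNamed, PHP_EOL, $fromAnonymous, PHP_EOL;
```

Anonymous channels suit short-lived, point-to-point communication, while named channels are convenient when a task has no direct reference to the channel object.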

Class synopsis

final class parallel\Channel {
/* Anonymous Constructor */
public __construct()
public __construct(int $capacity)
/* Access */
public make(string $name): Channel
public make(string $name, int $capacity): Channel
public open(string $name): Channel
/* Sharing */
public recv(): mixed
public send(mixed $value): void
/* Closing */
public close(): void
/* Constant for Infinitely Buffered */
const Infinite;
}

User Contributed Notes 5 notes

hdvianna
5 years ago
This is an example of using a channel to produce data for consumers. The producer Runtime instance sends the number of seconds each consumer should sleep.
<?php
use parallel\{Runtime, Channel};

main($argv);

function main(array $argv)
{
    if (count($argv) !== 3) {
        echo "Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
        echo "Example: hello-parallel.php 5 3" . PHP_EOL;
        die;
    } else {
        $numberOfTasks = intval($argv[1]);
        $maximumTimeOfSleep = intval($argv[2]);
        $t1 = microtime(true);
        parallelize($numberOfTasks, $maximumTimeOfSleep);
        $endTime = microtime(true) - $t1;
        echo PHP_EOL . "Finished $numberOfTasks task(s) in {$endTime}s" . PHP_EOL;
    }
}

function parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
    // Unbuffered channel shared by the producer and all consumers.
    $channel = new Channel();

    // One unique id per task.
    $taskIds = array_map(function () {
        return uniqid("task::");
    }, range(0, $numberOfTasks - 1));

    // One random sleep time per task.
    $timesToSleep = array_map(function () use ($maximumTimeOfSleep) {
        return rand(1, $maximumTimeOfSleep);
    }, $taskIds);

    // The producer sends one sleep time per consumer.
    $producer = new Runtime();
    $producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
        foreach ($timesToSleep as $timeToSleep) {
            $channel->send($timeToSleep);
        }
    }, [$channel, $timesToSleep]);

    // Each consumer runs in its own Runtime and receives a single value.
    $consumerFutures = array_map(function (string $id) use ($channel) {
        $runtime = new Runtime();
        return $runtime->run(function (string $id, Channel $channel) {
            $timeToSleep = $channel->recv();
            echo "Hello from $id. I will sleep for $timeToSleep second(s)." . PHP_EOL;
            sleep($timeToSleep);
            echo "$id slept for $timeToSleep second(s)." . PHP_EOL;
            return $timeToSleep;
        }, [$id, $channel]);
    }, $taskIds);

    wait($consumerFutures);
    wait([$producerFuture]);
}

// Blocks until every future has produced its value.
function wait(array $futures)
{
    return array_map(function ($future) {
        return $future->value();
    }, $futures);
}
rustysun
6 years ago
An example using an unbuffered channel.
<?php
use parallel\{Channel, Runtime};

// Sums the array and sends the result over the channel.
$sum = function (array $a, Channel $ch) {
    $sum = 0;
    foreach ($a as $v) {
        $sum += $v;
    }
    $ch->send($sum);
};

try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];
    // Split point; this variable was undefined in the original note.
    $num = intdiv(count($a), 2);

    // unbuffered channel
    $runtime = new Runtime;
    $ch2 = new Channel;
    $runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
    $runtime->run($sum, [array_slice($a, $num), $ch2]);

    // receive from channel
    $x = $ch2->recv();
    $y = $ch2->recv();
    $ch2->close();

    echo "\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch (Error $err) {
    echo "\nError:", $err->getMessage();
} catch (Exception $e) {
    echo "\nException:", $e->getMessage();
}

// output
// ch2:18  23      41
gam6itko
4 years ago
<?php
// A very weird way to calculate a factorial ^_^
// We create threads and synchronize them with a buffered channel.
// In fact, only one thread is executing at a time.
use parallel\{Channel, Future, Runtime};

for ($n = 0; $n <= 10; $n++) {
    echo "!$n = " . factorial($n) . PHP_EOL;
}

/**
 * Creates one thread per multiplier from 2 to $n.
 */
function factorial(int $n): int
{
    // buffered channel, used to synchronize the threads ^_^
    $channel = new Channel(1);
    $futureList = [];
    for ($i = 2; $i <= $n; $i++) {
        $runtime = new Runtime();
        $futureList[] = $runtime->run(
            static function (Channel $channel, $multiplier): void {
                $f = $channel->recv();
                $channel->send($f * $multiplier);
            },
            [$channel, $i]
        );
    }

    $channel->send(1);

    // wait until all threads are done
    do {
        $allDone = array_reduce(
            $futureList,
            function (bool $c, Future $future): bool {
                return $c && $future->done();
            },
            true
        );
    } while (false === $allDone);

    return $channel->recv();
}

// output:
// !0 = 1
// !1 = 1
// !2 = 2
// !3 = 6
// !4 = 24
// !5 = 120
// !6 = 720
// !7 = 5040
// !8 = 40320
// !9 = 362880
// !10 = 3628800
rustysun
6 years ago
<?php
use parallel\Channel;

function sum(array $a, Channel $ch)
{
    $sum = 0;
    foreach ($a as $v) {
        $sum += $v;
    }
    $ch->send($sum);
}

try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];
    $ch1 = Channel::make('sum', 2);
    $ch2 = new Channel;
    $num = count($a) / 2;

    sum(array_slice($a, 0, $num), $ch1);
    sum(array_slice($a, $num), $ch1);

    //receive from channel
    $x = $ch1->recv();
    $y = $ch1->recv();
    $ch1->close();

    echo "\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch (Error $err) {
    echo "\nError:", $err->getMessage();
} catch (Exception $e) {
    echo "\nException:", $e->getMessage();
}
thierry at pielo dot net
1 year ago
<?php
/**
 * Bzz reloaded!
 * Run two simple tasks in parallel and synchronize them with a channel
 *
 * parallel\Channel(int $capacity): Buffered channel
 * Creates a buffered channel for communication between tasks
 * @ref https://www.php.net/manual/en/class.parallel-channel.php
 */

echo "zzz... " . PHP_EOL;

// Create new buffered channel
$channel = new \parallel\Channel(2);

\parallel\run(
    function($channel) {
        $snaps_count = rand(8, 12);
        echo "Number of snaps: $snaps_count" . PHP_EOL;

        for ($i = 1; $i <= $snaps_count; $i++) {
            $other_sleep_time = rand(3, 5);
            $my_sleep_time = rand(1, 3);

            echo "Send sleep time to buffer" . PHP_EOL;
            $start = microtime(true);
            $channel->send($other_sleep_time);
            $wait_time = microtime(true) - $start;
            if ($wait_time > .1) {
                echo "Buffer was full. I waited " . round($wait_time) . "s" . PHP_EOL;
            }

            echo "I sleep for {$my_sleep_time}s" . PHP_EOL;
            sleep($my_sleep_time);
        }

        echo "I finished sleeping. Closing channel" . PHP_EOL;
        $channel->close();
    },
    [$channel]
);

\parallel\run(
    function($channel) {
        try {
            while (true) {
                $my_sleep_time = $channel->recv();
                echo "Other sleeps for {$my_sleep_time}s" . PHP_EOL;
                sleep($my_sleep_time);
            }
        } catch (\parallel\Channel\Error\Closed $e) {
            echo "Channel is closed. Other die.";
            die;
        }
    },
    [$channel]
);