PHPerKaigi 2025

La classe parallel\Channel

(0.9.0)

canaux non tamponnés

Un canal non tamponné bloquera les appels à parallel\Channel::send() jusqu'à ce qu'il y ait un récepteur, et bloquera les appels à parallel\Channel::recv() jusqu'à ce qu'il y ait un émetteur. Cela signifie qu'un canal non tamponné n'est pas seulement un moyen de partager des données entre les tâches mais aussi une méthode simple de synchronisation.

Un canal non tamponné est le moyen le plus rapide de partager des données entre les tâches, nécessitant le moins de copie.

canaux tamponnés

Un canal tamponné ne bloquera pas les appels à parallel\Channel::send() jusqu'à ce que la capacité soit atteinte, les appels à parallel\Channel::recv() bloqueront jusqu'à ce qu'il y ait des données dans le tampon.

Fermetures sur les canaux

Une fonctionnalité puissante des canaux parallèles est qu'elles permettent l'échange de fermetures entre les tâches (et les runtimes).

Lorsqu'une fermeture est envoyée sur un canal, la fermeture est tamponnée, cela ne change pas le tampon du canal transmettant la fermeture, mais cela affecte la portée statique à l'intérieur de la fermeture: la même fermeture envoyée à différentes exécutions, ou à la même exécution, ne partagera pas leur portée statique.

Cela signifie que chaque fois qu'une fermeture est exécutée qui a été transmise par un canal, l'état statique sera tel qu'il était lorsque la fermeture a été tamponnée.

canaux anonymes

La construction de canaux anonymes permet au développeur d'éviter d'attribuer des noms à chaque canal: parallel générera un nom unique pour les canaux anonymes.

Synopsis de la classe

final class parallel\Channel {
/* Constructeur anonymes */
public __construct()
public __construct(int $capacity)
/* Accès */
public make(string $name): Channel
public make(string $name, int $capacity): Channel
public open(string $name): Channel
/* Partage */
public recv(): mixed
public send(mixed $value): void
/* Fermer */
public close(): void
/* Constantes pour le tamponnement infini */
const Infinite;
}

Sommaire

add a note

User Contributed Notes 5 notes

up
4
hdvianna
5 years ago
This is an example of using a channel to produce data for consumers. In this example, the producer Runtime instance will send the time in seconds in which the consumers shall sleep.

<?php

use parallel\{Runtime, Channel};

main($argv);

function
main(array $argv)
{
if (
count($argv) !== 3) {
echo
"Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
echo
"Example: hello-parallel.php 5 3" . PHP_EOL;
die;
} else {
$numberOfTasks = intval($argv[1]);
$maximumTimeOfSleep = intval($argv[2]);
$t1 = microtime(true);
parallelize($numberOfTasks, $maximumTimeOfSleep);
$endTime = microtime(true) - $t1;
echo
PHP_EOL."Finished $numberOfTasks task(s) in {$endTime}s".PHP_EOL;
}
}

function
parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
$channel = new Channel();

$taskIds = array_map(function () use ($maximumTimeOfSleep) {
return
$id = uniqid("task::");
},
range(0, $numberOfTasks - 1));

$timesToSleep = array_map(function () use ($maximumTimeOfSleep) {
return
rand(1, $maximumTimeOfSleep);
},
$taskIds);

$producer = new Runtime();
$producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
foreach (
$timesToSleep as $timeToSleep) {
$channel->send($timeToSleep);
}
}, [
$channel, $timesToSleep]);

$consumerFutures = array_map(function (string $id) use ($channel) {
$runtime = new Runtime();
return
$runtime->run(function (string $id, Channel $channel) {
$timeToSleep = $channel->recv();
echo
"Hello from $id. I will sleep for $timeToSleep second(s).".PHP_EOL;
sleep($timeToSleep);
echo
"$id slept for $timeToSleep second(s).".PHP_EOL;
return
$timeToSleep;
}, [
$id, $channel]);
},
$taskIds);

wait($consumerFutures);
wait([$producerFuture]);
}

function
wait(array $futures)
{
return
array_map(function ($future) {
return
$future->value();
},
$futures);
}
up
2
rustysun
5 years ago
an example used unbuffered channel.
<?php

use parallel\{Channel,Runtime};

$sum=function(array $a, Channel $ch) {
$sum=0;
foreach (
$a as $v) {
$sum+=$v;
}
$ch->send($sum);
};
try {
$a=[7, 2, 8, 1, 4, 0, 9, 10];
//unbuffered channel
$runtime=new Runtime;
$ch2=new Channel;
$runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
$runtime->run($sum, [array_slice($a, $num), $ch2]);
//receive from channel
$x=$ch2->recv();
$y=$ch2->recv();
$ch2->close();
echo
"\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(
Error $err) {
echo
"\nError:", $err->getMessage();
} catch(
Exception $e) {
echo
"\nException:", $e->getMessage();
}

//output
//ch2:18 23 41
up
2
gam6itko
3 years ago
<?php

// the very weird way to calculate factorial ^_^
// we create one thread and synching them with buffered channel
// at fact only one thread is executing at the time

use parallel\{Channel, Future, Runtime};

for (
$n = 0; $n <= 10; $n++) {
echo
"!$n = " . factorial($n) . PHP_EOL;
}

/**
* Creates $n threads.
*/
function factorial(int $n): int
{
// buffered channel - using for sync threads ^_^
$channel = new Channel(1);
$futureList = [];
for (
$i = 2; $i <= $n; $i++) {
$runtime = new Runtime();
$futureList[] = $runtime->run(
static function (
Channel $channel, $multiplier): void {
$f = $channel->recv();
$channel->send($f * $multiplier);
},
[
$channel, $i]
);
}

$channel->send(1);

// waiting until all threads are done
do {
$allDone = array_reduce(
$futureList,
function (
bool $c, Future $future): bool {

return
$c && $future->done();
},
true
);
} while (
false === $allDone);

return
$channel->recv();
}

// output:
// !0 = 1
// !1 = 1
// !2 = 2
// !3 = 6
// !4 = 24
// !5 = 120
// !6 = 720
// !7 = 5040
// !8 = 40320
// !9 = 362880
// !10 = 3628800
up
1
rustysun
5 years ago
<?php
use parallel\Channel;

function
sum(array $a, Channel $ch) {
$sum=0;
foreach (
$a as $v) {
$sum+=$v;
}
$ch->send($sum);
}

try {
$a=[7, 2, 8, 1, 4, 0, 9, 10];
$ch1=Channel::make('sum', 2);
$ch2=new Channel;
$num=count($a) / 2;
sum(array_slice($a, 0, $num), $ch1);
sum(array_slice($a, $num), $ch1);

//receive from channel
$x=$ch1->recv();
$y=$ch1->recv();
$ch1->close();
echo
"\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(
Error $err) {
echo
"\nError:", $err->getMessage();
} catch(
Exception $e) {
echo
"\nException:", $e->getMessage();
}
up
0
thierry at pielo dot net
5 months ago
<?php

/**
* Bzz reloaded!
* Run two simple tasks in parallel and synchronize them with a channel
*
* parallel\Channel(int $capacity): Buffered channel
* Creates a buffered channel for communication between tasks
* @ref https://www.php.net/manual/en/class.parallel-channel.php
*/

echo "zzz... " . PHP_EOL;

// Create new buffered channel
$channel = new \parallel\Channel(2);

\parallel\run(
function(
$channel) {
$snaps_count = rand (8, 12);
echo
"Number of snaps: $snaps_count" . PHP_EOL;
for (
$i=1; $i<=$snaps_count; $i++) {
$other_sleep_time = rand(3, 5);
$my_sleep_time = rand(1, 3);
echo
"Send sleep time to buffer" . PHP_EOL;
$start = microtime(true);
$channel->send($other_sleep_time);
$wait_time = microtime(true) - $start;
if (
$wait_time > .1) {
echo
"Buffer was full. I waited " . round($wait_time) . "s" . PHP_EOL;
}
echo
"I sleep for {$my_sleep_time}s" . PHP_EOL;
sleep($my_sleep_time);
}
echo
"I finished sleeping. Closing channel" . PHP_EOL;
$channel->close();
},
[
$channel]
);

\parallel\run(
function(
$channel) {
try {
while(
true) {
$my_sleep_time = $channel->recv();
echo
"Other sleeps for {$my_sleep_time}s" . PHP_EOL;
sleep($my_sleep_time);
}
} catch(
\parallel\Channel\Error\Closed $e) {
echo
"Channel is closed. Other die.";
die;
}
},
[
$channel]
);
To Top