A classe parallel\Channel

(0.9.0)

Channels sem buffer

Um channel sem buffer bloqueará chamadas para parallel\Channel::send() até que haja um receptor e bloqueará chamadas para parallel\Channel::recv() até que haja um remetente. Isso significa que um channel sem buffer não é apenas uma forma de compartilhar dados entre tarefas, mas também um método simples de sincronização.

Um channel sem buffer é a maneira mais rápida de compartilhar dados entre tarefas, exigindo menos cópias.

Channels em buffer

m canal em buffer não bloqueará chamadas para parallel\Channel::send() até que a capacidade seja atingida, chamadas para parallel\Channel::recv() serão bloqueadas até que haja dados no buffer.

Closures sobre Channels

Uma característica poderosa dos channels paralelos é que eles permitem a troca de closures entre tarefas (e tempos de execução).

Quando um closure é enviado através de um channel, o closure é armazenado em buffer, isso não altera o buffer do channel que transmite o closure, mas afeta o escopo estático dentro do closure: O mesmo closure enviado para tempos de execução diferentes, ou o mesmo tempo de execução, não compartilharão seu escopo estático.

Isso significa que sempre que for executado um closure transmitido por um channel, o estado estático será o mesmo de quando o closure foi armazenado em buffer.

Channels Anônimos

O construtor de channel anônimo permite ao programador evitar atribuir nomes a cada channel: parallel gerará um nome exclusivo para channels anônimos.

Resumo da classe

final class parallel\Channel {
/* Construtor Anônimo */
public __construct()
public __construct(int $capacity)
/* Acesso */
public make(string $name): Channel
public make(string $name, int $capacity): Channel
public open(string $name): Channel
/* Compartilhamento */
public recv(): mixed
public send(mixed $value): void
/* Encerramento */
public close(): void
/* Constante para Buffer Infinito */
const Infinite;
}

Índice

adicionar nota

Notas de Usuários 5 notes

up
4
hdvianna
5 years ago
This is an example of using a channel to produce data for consumers. In this example, the producer Runtime instance will send the time in seconds in which the consumers shall sleep.

<?php

use parallel\{Runtime, Channel};

main($argv);

function main(array $argv)
{
    if (count($argv) !== 3) {
        echo "Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
        echo "Example: hello-parallel.php 5 3" . PHP_EOL;
        die;
    } else {
        $numberOfTasks = intval($argv[1]);
        $maximumTimeOfSleep = intval($argv[2]);
        $t1 = microtime(true);
        parallelize($numberOfTasks, $maximumTimeOfSleep);
        $endTime = microtime(true) - $t1;
        echo PHP_EOL."Finished $numberOfTasks task(s) in {$endTime}s".PHP_EOL;
    }
}

function parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
    $channel = new Channel();

    $taskIds = array_map(function () use ($maximumTimeOfSleep) {
        return $id = uniqid("task::");
    }, range(0, $numberOfTasks - 1));

    $timesToSleep = array_map(function () use ($maximumTimeOfSleep) {
        return rand(1, $maximumTimeOfSleep);
    }, $taskIds);

    $producer = new Runtime();
    $producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
        foreach ($timesToSleep as $timeToSleep) {
            $channel->send($timeToSleep);
        }
    }, [$channel, $timesToSleep]);

    $consumerFutures = array_map(function (string $id) use ($channel) {
        $runtime = new Runtime();
        return $runtime->run(function (string $id, Channel $channel) {
            $timeToSleep = $channel->recv();
            echo "Hello from $id. I will sleep for $timeToSleep second(s).".PHP_EOL;
            sleep($timeToSleep);
            echo "$id slept for $timeToSleep second(s).".PHP_EOL;
            return $timeToSleep;
        }, [$id, $channel]);
    }, $taskIds);

    wait($consumerFutures);
    wait([$producerFuture]);
}

function wait(array $futures)
{
    return array_map(function ($future) {
        return $future->value();
    }, $futures);
}
up
2
rustysun
6 years ago
an example used unbuffered channel.
<?php

use parallel\{Channel,Runtime};

$sum=function(array $a, Channel $ch) {
    $sum=0;
    foreach ($a as $v) {
        $sum+=$v;
    }
    $ch->send($sum);
};
try {
    $a=[7, 2, 8, 1, 4, 0, 9, 10];
    //unbuffered channel
    $runtime=new Runtime;
    $ch2=new Channel;
    $runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
    $runtime->run($sum, [array_slice($a, $num), $ch2]);
    //receive from channel
    $x=$ch2->recv();
    $y=$ch2->recv();
    $ch2->close();
    echo "\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(Error $err) {
    echo "\nError:", $err->getMessage();
} catch(Exception $e) {
    echo "\nException:", $e->getMessage();
}

//output
//ch2:18  23      41
up
2
gam6itko
4 years ago
<?php

// the very weird way to calculate factorial ^_^
// we create one thread and synching them with buffered channel
// at fact only one thread is executing at the time 

use parallel\{Channel, Future, Runtime};

for ($n = 0; $n <= 10; $n++) {
    echo "!$n = " . factorial($n) . PHP_EOL;
}

/**
 * Creates $n threads.
 */
function factorial(int $n): int
{
    // buffered channel - using for sync threads ^_^
    $channel = new Channel(1);
    $futureList = [];
    for ($i = 2; $i <= $n; $i++) {
        $runtime = new Runtime();
        $futureList[] = $runtime->run(
            static function (Channel $channel, $multiplier): void {
                $f = $channel->recv();
                $channel->send($f * $multiplier);
            },
            [$channel, $i]
        );
    }

    $channel->send(1);

    // waiting until all threads are done
    do {
        $allDone = array_reduce(
            $futureList,
            function (bool $c, Future $future): bool {

                return $c && $future->done();
            },
            true
        );
    } while (false === $allDone);

    return $channel->recv();
}

// output:
// !0 = 1
// !1 = 1
// !2 = 2
// !3 = 6
// !4 = 24
// !5 = 120
// !6 = 720
// !7 = 5040
// !8 = 40320
// !9 = 362880
// !10 = 3628800
up
1
rustysun
6 years ago
<?php
use parallel\Channel;

function sum(array $a, Channel $ch) {
    $sum=0;
    foreach ($a as $v) {
        $sum+=$v;
    }
    $ch->send($sum);
}

try {
    $a=[7, 2, 8, 1, 4, 0, 9, 10];
    $ch1=Channel::make('sum', 2);
    $ch2=new Channel;
    $num=count($a) / 2;
    sum(array_slice($a, 0, $num), $ch1);
    sum(array_slice($a, $num), $ch1);

    //receive from channel
    $x=$ch1->recv();
    $y=$ch1->recv();
    $ch1->close();
    echo "\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch(Error $err) {
    echo "\nError:", $err->getMessage();
} catch(Exception $e) {
    echo "\nException:", $e->getMessage();
}
up
0
thierry at pielo dot net
1 year ago
<?php

/**
 * Bzz reloaded!
 * Run two simple tasks in parallel and synchronize them with a channel
 * 
 * parallel\Channel(int $capacity): Buffered channel
 * Creates a buffered channel for communication between tasks
 * @ref https://www.php.net/manual/en/class.parallel-channel.php
 */

 echo "zzz... " . PHP_EOL;

// Create new buffered channel
$channel = new \parallel\Channel(2);

\parallel\run(
    function($channel) {
        $snaps_count = rand (8, 12);
        echo "Number of snaps: $snaps_count" . PHP_EOL;
        for ($i=1; $i<=$snaps_count; $i++) {
            $other_sleep_time = rand(3, 5);
            $my_sleep_time = rand(1, 3);
            echo "Send sleep time to buffer" . PHP_EOL;
            $start = microtime(true);
            $channel->send($other_sleep_time);
            $wait_time = microtime(true) - $start;
            if ($wait_time > .1) {
                echo "Buffer was full. I waited " . round($wait_time) . "s" . PHP_EOL;
            }
            echo "I sleep for {$my_sleep_time}s" . PHP_EOL;
            sleep($my_sleep_time);
        }
        echo "I finished sleeping. Closing channel" . PHP_EOL;
        $channel->close();
    },
    [$channel]
);

\parallel\run(
    function($channel) {
        try {
            while(true) {
                $my_sleep_time = $channel->recv();
                echo "Other sleeps for {$my_sleep_time}s" . PHP_EOL;
                sleep($my_sleep_time);
            }
        } catch(\parallel\Channel\Error\Closed $e) {
            echo "Channel is closed. Other die.";
            die;
        }
    },
    [$channel]
);
To Top