PHPerKaigi 2025

parallel\run

(1.0.0)

parallel\runExécution

Description

parallel\run(Closure $task): ?Future

Planifie task pour exécution en parallèle.

parallel\run(Closure $task, array $argv): ?Future

Planifie task pour exécution en parallèle, passant argv à l'exécution.

Planification automatique

Si un \parallel\Runtime créé et mis en cache par un appel précédent à parallel\run() est inactif, il sera utilisé pour exécuter la tâche. Si aucun \parallel\Runtime n'est inactif, parallel créera et mettra en cache un \parallel\Runtime.

Note:

Les objets \parallel\Runtime créés par le développeur ne sont pas utilisés pour la planification automatique.

Liste de paramètres

task

Une Closure avec des caractéristiques spécifiques.

argv

Un array d'arguments avec des caractéristiques spécifiques à passer à task au moment de l'exécution.

Task Characteristics

Les fermetures planifiées pour l'exécution en parallèle ne doivent pas :

  • accepter ou retourner par référence
  • accepter ou retourner des objets internes (voir notes)
  • exécuter un ensemble limité d'instructions

Les instructions interdites dans les fermetures destinées à l'exécution en parallèle sont :

  • yield
  • utiliser by-reference
  • déclarer des classes
  • déclarer des fonctions nommées

Note:

Les fermetures imbriquées peuvent yield ou utiliser by-reference, mais ne doivent pas contenir de déclarations de classes ou de fonctions nommées.

Note:

Aucune instruction n'est interdite dans les fichiers que la tâche peut inclure.

Caractéristiques des arguments

Les arguments ne doivent pas:

  • contenir des références
  • contenir des ressources
  • contenir des objets internes (voir notes)

Note:

Dans le cas des ressources de flux de fichiers, la ressource sera convertie en descripteur de fichier et passée en int si possible, ceci n'est pas supporté sur Windows.

Notes sur les objets internes

Les objets internes utilisent généralement une structure personnalisée qui ne peut pas être copiée en toute sécurité par valeur, PHP manque actuellement des mécanismes pour le faire (sans sérialisation) et donc seuls les objets qui n'utilisent pas une structure personnalisée peuvent être partagés.

Certains objets internes n'utilisent pas de structure personnalisée, par exemple parallel\Events\Event et peuvent donc être partagés.

Les fermetures sont un type spécial d'objet interne et supportent d'être copiées par valeur, et peuvent donc être partagées.

Les canaux sont centraux pour l'écriture de code parallèle et supportent l'accès et l'exécution concurrents par nécessité, et peuvent donc être partagés.

Avertissement

Une classe utilisateur qui étend une classe interne peut utiliser une structure personnalisée telle que définie par la classe interne, auquel cas elle ne peut pas être copiée en toute sécurité par valeur, et ne peut donc pas être partagée.

Valeurs de retour

Avertissement

La Future retournée ne doit pas être ignorée lorsque la tâche contient une déclaration de retour ou de lancer.

Exceptions

Avertissement

Lance une parallel\Runtime\Error\Closed si parallel\Runtime était fermé.

Avertissement

Lance une parallel\Runtime\Error\IllegalFunction si task est une fermeture créée à partir d'une fonction interne.

Avertissement

Lance une parallel\Runtime\Error\IllegalInstruction si task contient des instructions illégales.

Avertissement

Lance une parallel\Runtime\Error\IllegalParameter si task accepte ou argv contient des variables illégales.

Avertissement

Lance une parallel\Runtime\Error\IllegalReturn si task retourne illégalement.

add a note

User Contributed Notes 3 notes

up
21
john_2885 at yahoo dot com
4 years ago
Here's a more substantial example of how to use the run functional API.

<?php
/*********************************************
* Sample parallel functional API
*
* Scenario
* -------------------------------------------
* Given a large number of rows of
* data to process, divide the work amongst
* a set of workers. Each worker is responsible
* for finishing their assigned task.
*
* In the code below, assume we have arbitrary
* start and end IDs (rows) - we will try to
* divide the number of IDs (rows) evenly
* across 8 workers. The workers will get the
* following batches to process to completion:
*
* Total number of IDs (rows): 1371129
* Each worker will get 171392 IDs to process
*
* Worker 1: IDs from 11001 to 182393
* Worker 2: IDs from 182393 to 353785
* Worker 3: IDs from 353785 to 525177
* Worker 4: IDs from 525177 to 696569
* Worker 5: IDs from 696569 to 867961
* Worker 6: IDs from 867961 to 1039353
* Worker 7: IDs from 1039353 to 1210745
* Worker 8: IDs from 1210745 to 1382130
*
* Each worker then processes 5000 rows at a time
* until they are done with their assigned work
*
*********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
$batchSize = ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches
$rowsToFetch = 5000;

print
"Total IDs: " . $totalIds . "\n";
print
"Batch Size: " . $batchSize . "\n";
print
"Last Batch: " . $lastBatch . "\n";

$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
$tempMinId = $startId;
$tempMaxId = $tempMinId + $fetchSize;
$fetchCount = 1;

print
"Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";

while(
$tempMinId < $endId) {
for(
$i = $tempMinId; $i < $tempMaxId; $i++) {
$usleep = rand(500000, 1000000);
usleep($usleep);
print
"Worker " . $worker . " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";
// Need to explicitly break out of the for loop once complete or else it will forever process only the first sub-batch
break;
}

// Now we move on to the next sub-batch for this worker
$tempMinId = $tempMaxId;
$tempMaxId = $tempMinId + $fetchSize;
if(
$tempMaxId > $endId) {
$tempMaxId = $endId;
}
// Introduce some timing randomness
$sleep = rand(1,5);
sleep($sleep);
$fetchCount++;
}

// This worker has completed their entire batch
print "Worker " . $worker . " finished\n";

};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for($i = 0; $i < $workers; $i++) {
$startId = $minId + ($i * $batchSize);
$endId = $startId + $batchSize;
if(
$i == ($workers - 1)) {
$endId = $maxId;
}
\parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}

?>
up
9
anonymous user
3 years ago
Although function declaration is not allowed inside thread exec code, include is allowed. So if we want to declare a function, we could write another file that contain the function and include it.
# main.php
<?php
$runtime
= new parallel\Runtime ();
$future = $runtime->run ( function () {
$future = $runtime->run ( function () {
include
"included.php";
return
add (1, 3);
}, [ ] );
echo
$future->value ();
# output: 4
# included.php
<?php
function add($a, $b){
return
$a + $b;
}
up
2
Thierry Kauffmann
3 years ago
<?php

/**
* Sample parralel functional API
* using a generator instead of a static list of items to process
*
* Items to process in parallel come from a generator
* It could be anything : e.g fetch a mysql array, a DirectoryIterator...
* Thus the number of items to process in parallel is NOT known in advance
*
* This algorithm attributes items to each parallel thread dynamically
* As soon as a thread has finished working
* It is assigned a new item to process
* until all items are processed (generator closes)
*
* In this example we process 50 items in 5 parallel threads
* It produces output in this form (output changes at each run) :
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/

use \parallel\{Runtime, Future, Channel, Events};

// Generate list of items to process with a generator
function generator(int $item_count) {
for (
$i=1; $i <= $item_count; $i++) {
yield
$i;
}
}

function
testConcurrency(int $concurrency, int $item_count) {

$generator = generator($item_count);

// Function executing in each thread. Have a snap for a random time for example !
$producer = function (int $item_id) {
$seconds = rand(1, 10);
sleep($seconds);
return [
'item_id' => $item_id, 'sleep_seconds' => $seconds];
};

// Fill up threads with initial 'inactive' state
$threads = array_fill(1, $concurrency, ['is_active' => false]);

while (
true) {
// Loop through threads until all threads are finished
foreach ($threads as $thread_id => $thread) {
if (!
$thread['is_active'] and $generator->valid()) {
// Thread is inactive and generator still has values : run something in the thread
$item_id = $generator->current();
$threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
echo
"ThreadId: $thread_id => Item: $item_id (Start)\n";
$threads[$thread_id]['is_active'] = true;
$generator->next();
} elseif (!isset(
$threads[$thread_id]['run'])) {
// Destroy supplementary threads in case generator closes sooner than number of threads
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} elseif (
$threads[$thread_id]['run']->done()) {
// Thread finished. Get results
$item = $threads[$thread_id]['run']->value();
echo
"ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

if (!
$generator->valid()) {
// Generator is closed then destroy thread
echo "Destroy ThreadId: $thread_id\n";
unset(
$threads[$thread_id]);
} else {
// Thread is ready to run again
$threads[$thread_id]['is_active'] = false;
}
}
}

// Escape loop when all threads are destroyed
if (empty($threads)) break;
}
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>
To Top