Pool 类

(PECL pthreads >= 2.0.0)

简介

Pool 对象是多个 Worker 对象的容器,同时也是它们的控制器。

线程池是对 Worker 功能的高层抽象,其中包括按照 pthreads 所要求的方式来管理引用。

类摘要

Pool {
/* 属性 */
protected $size ;
protected $class ;
protected $workers ;
protected $work ;
protected $ctor ;
protected $last ;
/* 方法 */
public int collect ([ Callable $collector ] )
public Pool __construct ( integer $size [, string $class [, array $ctor ]] )
public void resize ( integer $size )
public void shutdown ( void )
public int submit ( Threaded $task )
public int submitTo ( int $worker , Threaded $task )
}

属性

size

Pool 对象可容纳的 Worker 对象的最大数量

class

Worker 的类

ctor

构造新的 Worker 对象时所需的参数

workers

指向 Worker 对象的引用

work

指向提交到 Pool 对象中的 Threaded 对象的引用

last

最后使用的 Worker 对象在池中的位置偏移量

Table of Contents

User Contributed Notes

meadowsjared at gmail dot com 15-Mar-2016 04:16
Please note, when using the collect function, it's important that you extend the pool class so you can keep checking for finished threads until they're all done.

<?php
/**
 * A unit of work for the pool that flags itself as finished once run() ends.
 */
class TestWork extends Threaded
{
    /** @var bool False until run() has finished; reported by isGarbage(). */
    protected $complete;

    /**
     * @param mixed $pData The data sent to the worker thread so it can do its job.
     */
    public function __construct($pData)
    {
        // Start in the "not finished" state and keep the payload on the object.
        $this->complete = false;
        $this->testData = $pData;
    }

    /**
     * Performs the job; all of the real work belongs here.
     */
    public function run()
    {
        // Sleep for two seconds to stand in for a large job.
        usleep(2 * 1000000);

        $this->complete = true;
    }

    /**
     * @return bool True once run() has completed.
     */
    public function isGarbage()
    {
        return $this->complete;
    }
}
/**
 * Pool that can wait for every submitted TestWork task and gather the results.
 */
class ExamplePool extends Pool
{
    /** @var stdClass[] One result object per task that has been collected. */
    public $data = array();

    /**
     * Collects finished tasks until the pool holds no more work, then shuts
     * the pool down.
     *
     * NOTE: the loop polls without pausing, so it keeps the calling thread
     * busy while tasks are still running.
     *
     * @return stdClass[] The collected results (the same array as $this->data).
     */
    public function process()
    {
        // Keep going for as long as there are jobs left in the pool.
        while (count($this->work)) {
            $this->collect(function (TestWork $task) {
                // Read the completion flag exactly once. The task sets it from
                // its own run(), so two separate reads could disagree: a task
                // finishing between them would be reported as collectable
                // without its result ever having been stored.
                $finished = $task->isGarbage();

                if ($finished) {
                    // This is how the completed data gets back out; callers
                    // receive it as the return value of process().
                    $result = new stdClass();
                    $result->complete = $finished;
                    $this->data[] = $result;
                }

                // Tells collect() whether this task is done with.
                return $finished;
            });
        }

        // All jobs are done, so the pool can be shut down.
        $this->shutdown();

        return $this->data;
    }
}
// Create a pool sized for three workers and queue five identical jobs on it.
$pool = new ExamplePool(3);
$testData = 'asdf';

$i = 0;
while ($i < 5) {
    $pool->submit(new TestWork($testData));
    $i++;
}

// Wait for every job and fetch all of the results.
$retArr = $pool->process();

// Print the array of results (and maybe errors) inside a <pre> element.
echo '<pre>' . print_r($retArr, true) . '</pre>';
?>
olavk 30-Mar-2015 10:48
Simple example with Collectable (basically Thread meant for Pool) and Pool

<?php

/**
 * Pool task that appends a short slice of a fetched page to its value.
 */
class job extends Collectable
{
    /** @var string Value given at construction; run() appends the fetched data. */
    public $val;

    /**
     * @param string $val Initial value for this job.
     */
    public function __construct($val)
    {
        // init some properties
        $this->val = $val;
    }

    /**
     * Does the work: fetches 20 bytes starting at offset 3 and appends them
     * to $val, then marks the job as garbage so it can be collected.
     */
    public function run()
    {
        // $use_include_path is a boolean flag, so pass false rather than null.
        $fetched = file_get_contents('http://www.example.com/', false, null, 3, 20);

        // file_get_contents() returns false on failure; append nothing in that
        // case instead of relying on false being converted to an empty string.
        if ($fetched === false) {
            $fetched = '';
        }

        $this->val = $this->val . $fetched;

        // Always flag the job, even when the fetch failed, so it can be collected.
        $this->setGarbage();
    }
}

// At most 3 threads will work at once
$p = new Pool(3);

// Eleven jobs carrying the values '0' through '10', in that order.
$tasks = array_map(
    function ($number) {
        return new job((string) $number);
    },
    range(0, 10)
);

// Add tasks to pool queue
foreach ($tasks as $task) {
    $p->submit($task);
}

// shutdown will wait for current queue to be completed
$p->shutdown();

// garbage collection check / read results
$p->collect(function ($checkingTask) {
    print $checkingTask->val;

    return $checkingTask->isGarbage();
});

?>
fajan 28-May-2014 10:18
Example class to demonstrate usage of Pool/Worker mechanism, also to show a few tricks & hints ;)
<?php
/**
 * Shared global object: two counters that the tasks increment.
 */
class Config extends Threaded
{
    protected $val = 0;
    protected $val2 = 0;

    // Protected method: per the author's note, this synchronizes by-object.
    protected function inc()
    {
        ++$this->val;
    }

    // Public method: per the author's note, no synchronization.
    public function inc2()
    {
        ++$this->val2;
    }
}
/**
 * Worker that exposes its id and the shared config as globals when it runs.
 */
class WorkerClass extends Worker
{
    /** Last id handed out; incremented for each constructed worker. */
    protected static $worker_id_next = -1;

    protected $worker_id;

    protected $config;

    /**
     * @param Config $config Shared object passed in through the pool's constructor arguments.
     */
    public function __construct($config)
    {
        // Static members are not available in the thread, but they are in the
        // 'main thread', so the id is assigned here.
        $this->worker_id = ++static::$worker_id_next;
        $this->config = $config;
    }

    /**
     * Publishes the config and worker id as globals and announces the worker.
     */
    public function run()
    {
        global $config, $worker_id;

        // NOTE: setting by reference WON'T work
        $config = $this->config;
        $worker_id = $this->worker_id;

        echo "working context {$worker_id} is created!\n";

        //$this->say_config();    // globally synchronized function.
    }

    /**
     * Dumps the shared config while holding its synchronization.
     *
     * 'protected' is synchronized by-object, so on its own it WON'T work
     * between multiple instances; the shared $config object is used as the
     * synchronization source instead.
     */
    protected function say_config()
    {
        global $config;

        // NOTE: Closures can be used here, but a Closure attached to a Threaded
        // object will be destroyed because it can't be serialized.
        $config->synchronized(function () use (&$config) {
            var_dump($config);
        });
    }
}
/**
 * Task that rotates the shared list and bumps both shared counters 1000 times.
 *
 * Author's note: Stackable still exists even though it has disappeared from
 * the docs (probably by mistake); see the docs of older versions for details.
 */
class Task extends Stackable
{
    protected $set;

    /**
     * @param mixed $set Value this task appends to the shared list.
     */
    public function __construct($set)
    {
        $this->set = $set;
    }

    /**
     * Runs the task using the globals published by the worker.
     */
    public function run()
    {
        global $worker_id;

        echo "task is running in {$worker_id}!\n";

        // Pause for a random 100 to 10000 microseconds.
        usleep(mt_rand(1, 100) * 100);

        $shared = $this->getConfig();

        // Remove the first entry of the shared list and append this task's value.
        $shared->arr->shift();
        $shared->arr[] = $this->set;

        $remaining = 1000;
        while ($remaining-- > 0) {
            $shared->inc();
            $shared->inc2();
        }
    }

    /**
     * Returns the global $config of the current scope.
     *
     * WorkerClass sets this on the thread's scope, so Tasks can reuse it as an
     * additional asynchronous data source (e.g. a connection pool, or a task
     * queue to a demultiplexer).
     */
    public function getConfig()
    {
        global $config;

        return $config;
    }
}

// Shared config object with a Threaded list pre-filled with the values 1 to 6.
$config = new Config();
$config->arr = new \Threaded();
$config->arr->merge(range(1, 6));
/**
 * Pool that can report the keys of its workers.
 */
class PoolClass extends Pool
{
    /**
     * @return array|null Keys of $this->workers, or null while that property is null.
     */
    public function worker_list()
    {
        return $this->workers === null ? null : array_keys($this->workers);
    }
}
// Pool sized for three WorkerClass workers, each constructed with the shared $config.
$pool = new PoolClass(3, 'WorkerClass', [$config]);
$pool->worker_list();
//$pool->submitTo(0,new Task(-10));    // submitTo DOES NOT try to create worker

$spammed_id = -1;
for ($i = 1; $i <= 100; ++$i) {    // add some jobs
    if ($spammed_id === -1) {
        $x = $pool->worker_list();

        // !empty() covers a null list, a missing third entry and a falsy
        // entry in one check, without the @ error-suppression operator.
        if (!empty($x[2])) {
            $spammed_id = $x[2];
            echo "spamming worker {$spammed_id} with lots of tasks from now on\n";
        }
    }

    // Every 5th job is routed to one worker, so it gets 20% of the total jobs
    // (with 3 workers it should do ~33%, now it has (33+20)%), so only delegate
    // to a specific worker if you plan to do balancing as well.
    if ($spammed_id !== -1 && ($i % 5) === 0) {
        $pool->submitTo($spammed_id, new Task(10 * $i));
    } else {
        $pool->submit(new Task(10 * $i));
    }
}

$pool->shutdown();

var_dump($config); // "val" is exactly 100000, "val2" is probably a bit less
// also: if you disable the spammer, you'll see that the order of the "arr" is random.
?>