Given the following code, how can you ensure that the completed MyWorker objects are destroyed and their memory freed?
Because of what my script does, I need ~50 threads constantly fetching data with cURL and processing it.
I've tried both having the threads never leave run() and, as shown in this sample code, having them leave run() and letting the collect function spawn a new copy of them.
But no matter what, I hit the memory limit after a minute or so. Could you tell me what I'm doing wrong?
class MyWorker extends Threaded
{
    public $complete;
    public function __construct() { $this->complete = false; }
    public function run() { $this->complete = true; }
}

$pool = new Pool(50);

for ($i = 0; $i < 50; $i++)
    $pool->submit(new MyWorker());

$pool->collect(function($worker)
{
    global $pool;
    if ($worker->complete == true)
        $pool->submit(new MyWorker());
    return $worker->complete;
});

$pool->shutdown();
Why
Why should I collect anyway?
The Worker threads provided by pthreads require that the programmer retain the correct references to the Threaded objects being executed. This is difficult to achieve reliably in userland, so pthreads provides the Pool abstraction of Workers, which maintains those references for you.
To maintain those references, pthreads needs to know when an object is garbage, and it provides the Pool::collect interface for this purpose. Pool::collect takes a Closure, which should accept a Threaded object and return boolean true if the passed object has finished executing.
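To make that bookkeeping concrete, here is a minimal pure-PHP model of it that runs without pthreads. `SimulatedPool` and `Task` are hypothetical names invented for this sketch, and `submit()` runs the task inline rather than on a worker thread. The only behavior it models is that the pool keeps every submitted object referenced until the collect closure returns true for it; it is not the pthreads implementation.

```php
<?php
// Hypothetical, pthreads-free model of Pool bookkeeping.
// SimulatedPool and Task are illustrative names, NOT part of pthreads.

class Task {
    public $complete = false;
    public function run() { $this->complete = true; }
}

class SimulatedPool {
    /** @var Task[] tasks the pool still holds a reference to */
    private $tasks = [];

    public function submit(Task $task) {
        $task->run();            // real pthreads would run this on a worker thread
        $this->tasks[] = $task;  // the pool keeps the reference either way
    }

    /** Drop every task the collector marks as garbage; return how many were dropped. */
    public function collect(callable $collector) {
        $before = count($this->tasks);
        $this->tasks = array_values(array_filter(
            $this->tasks,
            function ($task) use ($collector) { return !$collector($task); }
        ));
        return $before - count($this->tasks);
    }

    public function retained() { return count($this->tasks); }
}

$pool = new SimulatedPool();
for ($i = 0; $i < 50; $i++) {
    $pool->submit(new Task());
}

$held  = $pool->retained();                   // 50: nothing collected yet
$freed = $pool->collect(function (Task $task) {
    return $task->complete;                   // true means "this object is garbage"
});
$left  = $pool->retained();                   // 0: all references released

printf("held=%d freed=%d left=%d\n", $held, $freed, $left);

// If the collector never answers true, every submitted task stays referenced.
$leaky = new SimulatedPool();
for ($round = 0; $round < 3; $round++) {
    for ($i = 0; $i < 50; $i++) {
        $leaky->submit(new Task());
    }
    $leaky->collect(function (Task $task) { return false; });
}
printf("leaky pool still holds %d tasks\n", $leaky->retained());
```

The first printf reports `held=50 freed=50 left=0`; the leaky pool ends up holding 150 tasks, because nothing was ever reported as finished.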
How
The task at hand ...
To keep submitting tasks for execution without exhausting resources, you must build a queue of completed tasks for resubmission to the Pool.
The following code demonstrates a sane way of doing this:
<?php
define("LOG", Mutex::create());

/* thread-safe log to stdout */
function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf("{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {
    public function __construct($url) {
        $this->url = $url;
    }

    public function run() {
        $response = @file_get_contents($this->url);
        slog("%s returned %d bytes",
            $this->url, strlen($response));
        $this->reQueue();
    }

    public function getURL() { return $this->url; }
    public function isQueued() { return $this->queued; }
    public function reQueue() { $this->queued = true; }

    protected $url;
    protected $queued = false;
}

/* create a pool of 50 threads */
$pool = new Pool(50);

/* submit 50 requests for execution */
for ($i = 1; $i <= 50; $i++) {
    $pool->submit(new Request(sprintf(
        "http://google.com/?q=%s", md5($i))));
}

do {
    $queue = array();
    $pool->collect(function($request) use (&$queue) {
        /* check for items to requeue */
        if ($request->isQueued()) {
            /* get the url for the request, insert into queue */
            $queue[] = $request->getURL();
            /* allow this job to be collected */
            return true;
        }
        /* still running: the pool must keep its reference */
        return false;
    });

    /* resubmit completed tasks to the pool */
    foreach ($queue as $queued) {
        $pool->submit(new Request($queued));
    }

    /* sleep for a couple of seconds here ... because, be nice! */
    usleep(2500000);
} while (true);
?>