pcntl runs the same code several times, assistance

2019-01-15 04:33发布

I am using pcntl in order to speed up a quite heave CLI php script, that consists mostly of a class, that is in charge of sending all of the auto-emailing on my application.

My goal is as following: I want to assign each process to a certain task, within a foreach loop, the implementation I've used is the one shown in the code example below.

The problem is that once you fork a process, it executes asynchronously, and also gets a copy of the parent's process stack. In my case, what happens is that one task simply executes several times, My question is how can I design this script to be smarter in order to avoid such behavior?.

Code:

/**
@description this is the main procedure of this class, it iteratates over the relevant tasks and sends the emails using the SendGrid wrapper class
@see SendGridWrapper
@return void
*/
public function execute(){
    if(!isset($this->tasks)){
        throw new exception("Please call getRelevantTasks() prior to trying to execute anything");
    }
$proccesses = array();
foreach($this->tasks as $myTask){
    $pid = pcntl_fork();
    if($pid){
        $proccesses[] = $pid;
    }
    else if($pid == -1){
        die('FORK FAILED, STATUS -1');
    }
    else{   
        if(isset($myTask['recipient_model'])){

            $this->currentModel = $myTask['recipient_model'];
            $lang = $myTask['lang'];
            $classPath = self::$modelsDir . $myTask['recipient_model'] . '.php';
            $className = $myTask['recipient_model'];
            if(!class_exists($myTask['recipient_model'] )){
                require_once(dirname(__FILE__) . '/../' .  $classPath); 
            }
            else if(isset($recipientFetcher)){
                unset($this->model);
                unset($this->mailingList);
                unset($this->substitutionList);
            }
            $this->model = null;
            $this->mailingList = null;
            $this->substitutionList = null;
            $this->model = new $className($myTask['lang']);
            $addresses = $this->model->getMailRecipients();
            if(empty($addresses) || sizeof($addresses) == 0){
                continue;
            }
            $this->model->prepare();
            $this->substitutionList = $this->model->getDynamicParams();

        }
        else{
            throw new exception('No recipient model was found');
        }

        foreach($addresses as $myMail){
            $this->mailingList[$myMail['personal_email']] = $myMail['contact_name'];
        }
        $templatePath = dirname(__FILE__) . '/../'; 
        $templatePath .= $lang ? self::$templatesDirEn . $myTask['html_email_path'] : self::$templatesDirHe . $myTask['html_email_path'];
        $html = file_get_contents($templatePath);
        $this->sendMail($html, $myTask['task_schedule_id']);
        echo "model:" . $myTask['recipient_model'];
        echo $this->log;
        $this->log = "";
        die("\r\n Child proccess has been executed successfully\r\n");  

    }   
}
if($pid){
    foreach($proccesses as $key => $val){
         pcntl_waitpid($val, $status, WUNTRACED);
    }
}           

}

Thanks in advance, Oleg.

标签: php fork pcntl
1条回答
够拽才男人
2楼-- · 2019-01-15 05:02

Introduction

I see you are trying to send mails $this->sendMail($html, $myTask['task_schedule_id']); and I think it's a really bad idea trying to use multiple process for this task. You should consider using message queue for this task because emails can be very slow.

Use a Queue System

You should be using Gearman, ZeroMQ or Beanstalkd for this task. Worst case scenario use Implement your own simple message queue with memcached.

Here is a typical Gearman Example: https://stackoverflow.com/questions/13855907/when-to-send-auto-email-instantly-on-button-click-or-later

Quick Fix

Remove all those code and put it in a function called execute_worker where you can push the task to it

// Break Task to groups
$tasks = array_chunk(range("A", "Z"), 10);
foreach($tasks as $task) {
    $pid = pcntl_fork();
    if ($pid == - 1) {
        throw new ErrorException('FORK FAILED, STATUS -1');
        break;
    }

    if ($pid == 0) {
        execute_worker($task); // In Child
        exit(); // In Child
    }
}

Using Threads

You can also use Worker or Thread in PHP with pThreads to speed up processing.

  • An easy to use, quick to learn Threading API for PHP5.3+
  • Execute any and all predefined and user declared methods and functions asynchronously
  • Ready made synchronization included, geared towards the PHP environment
  • Yes! Windows support

Simple Project

file_get_contents is said to be slow when compared with curl and no where close to the power of curl_multi_init which allows the processing of multiple cURL handles in parallel.

See:

Our Objective would be to implement our own Multi file_get_contents version

Multi file_get_contents Example

// My Storage
$s = new Storage();

// Threads Storage
$ts = array();

// Total Threads same as total pages
$pages = 100;

// Porpulate Threads (Don't Start Yet)
$i = 0;
while($i ++ < $pages) {
    $ts[] = new Process($s, $i);
}

// Start the timer
$start = microtime(true);

// Lets start all our Threads
foreach($ts as $t) {
    $t->start();
}

// Wait for all threads to compleate
foreach($ts as $t) {
    $t->join();
}

printf("\n\nFound %s in %d pages", number_format($s->total), $pages);
printf("\nFinished %0.3f sec", microtime(true) - $start);

Output

php a.php
3:01:37: 3548 #START    {"page":1}
3:01:37: 7064 #START    {"page":2}
3:01:37: 10908 #START   {"page":3}
3:01:37: 10424 #START   {"page":4}
3:01:37: 11472 #START   {"page":5}
3:01:37: 3876 #START    {"page":6}
3:01:37: 7276 #START    {"page":7}
3:01:37: 11484 #START   {"page":8}
3:01:37: 932 #START     {"page":9}
3:01:37: 11492 #START   {"page":10}
3:01:37: 11500 #START   {"page":11}
3:01:37: 11508 #START   {"page":12}
3:01:37: 11504 #START   {"page":13}
3:01:37: 11512 #START   {"page":14}
3:01:37: 11516 #START   {"page":15}
3:01:37: 11520 #START   {"page":16}
3:01:37: 11524 #START   {"page":17}
3:01:37: 11528 #START   {"page":18}
3:01:37: 10816 #START   {"page":19}
3:01:37: 7280 #START    {"page":20}
3:01:37: 11556 #START   {"page":21}
3:01:37: 11560 #START   {"page":22}
3:01:37: 11564 #START   {"page":23}
3:01:37: 11612 #START   {"page":24}
3:01:37: 11616 #START   {"page":25}
3:01:37: 11600 #START   {"page":26}
3:01:37: 11608 #START   {"page":27}
3:01:37: 11568 #START   {"page":28}
3:01:37: 11452 #START   {"page":29}
3:01:38: 11624 #START   {"page":30}
3:01:38: 11628 #START   {"page":31}
3:01:38: 11632 #START   {"page":32}
3:01:38: 11636 #START   {"page":33}
3:01:38: 11644 #START   {"page":34}
3:01:38: 11648 #START   {"page":35}
3:01:38: 11652 #START   {"page":36}
3:01:38: 11656 #START   {"page":37}
3:01:38: 11660 #START   {"page":38}
3:01:38: 11664 #START   {"page":39}
3:01:38: 11668 #START   {"page":40}
3:01:38: 11672 #START   {"page":41}
3:01:38: 11676 #START   {"page":42}
3:01:38: 11680 #START   {"page":43}
3:01:38: 11684 #START   {"page":44}
3:01:38: 11688 #START   {"page":45}
3:01:38: 11692 #START   {"page":46}
3:01:38: 11696 #START   {"page":47}
3:01:38: 11700 #START   {"page":48}
3:01:38: 11704 #START   {"page":49}
3:01:38: 11712 #START   {"page":50}
3:01:38: 11708 #START   {"page":51}
3:01:38: 11716 #START   {"page":52}
3:01:38: 11720 #START   {"page":53}
3:01:38: 11724 #START   {"page":54}
3:01:38: 11728 #START   {"page":55}
3:01:38: 11732 #START   {"page":56}
3:01:38: 11736 #START   {"page":57}
3:01:38: 11740 #START   {"page":58}
3:01:38: 11744 #START   {"page":59}
3:01:38: 11748 #START   {"page":60}
3:01:38: 11752 #START   {"page":61}
3:01:38: 11756 #START   {"page":62}
3:01:38: 11760 #START   {"page":63}
3:01:38: 11764 #START   {"page":64}
3:01:38: 11768 #START   {"page":65}
3:01:38: 11772 #START   {"page":66}
3:01:38: 11776 #START   {"page":67}
3:01:38: 11780 #START   {"page":68}
3:01:38: 11784 #START   {"page":69}
3:01:38: 11788 #START   {"page":70}
3:01:38: 11792 #START   {"page":71}
3:01:38: 11796 #START   {"page":72}
3:01:38: 11800 #START   {"page":73}
3:01:38: 11804 #START   {"page":74}
3:01:38: 11808 #START   {"page":75}
3:01:38: 11812 #START   {"page":76}
3:01:38: 11816 #START   {"page":77}
3:01:38: 11820 #START   {"page":78}
3:01:38: 11824 #START   {"page":79}
3:01:38: 11828 #START   {"page":80}
3:01:38: 11832 #START   {"page":81}
3:01:38: 11836 #START   {"page":82}
3:01:38: 11840 #START   {"page":83}
3:01:38: 11844 #START   {"page":84}
3:01:38: 11848 #START   {"page":85}
3:01:38: 11852 #START   {"page":86}
3:01:38: 11856 #START   {"page":87}
3:01:38: 11860 #START   {"page":88}
3:01:38: 11864 #START   {"page":89}
3:01:38: 11868 #START   {"page":90}
3:01:38: 11872 #START   {"page":91}
3:01:38: 11876 #START   {"page":92}
3:01:38: 11880 #START   {"page":93}
3:01:38: 11884 #START   {"page":94}
3:01:38: 11888 #START   {"page":95}
3:01:38: 11892 #START   {"page":96}
3:01:38: 11896 #START   {"page":97}
3:01:38: 11900 #START   {"page":98}
3:01:38: 11904 #START   {"page":99}
3:01:38: 11908 #START   {"page":100}
3:01:38: 11508 #END             {"page":12,"byte":1141,"count":155839}
3:01:38: 10424 #END             {"page":4,"byte":1201,"count":553595}
3:01:38: 11516 #END             {"page":15,"byte":1204,"count":119612}
3:01:38: 3548 #END              {"page":1,"byte":1208,"count":6737525}
3:01:38: 11484 #END             {"page":8,"byte":1160,"count":257021}
3:01:38: 11472 #END             {"page":5,"byte":1175,"count":446411}
3:01:38: 10908 #END             {"page":3,"byte":1222,"count":787301}
3:01:38: 11492 #END             {"page":10,"byte":1175,"count":193958}
3:01:38: 11504 #END             {"page":13,"byte":1130,"count":141450}
3:01:38: 11528 #END             {"page":18,"byte":1102,"count":95511}
3:01:38: 11524 #END             {"page":17,"byte":1147,"count":102727}
3:01:38: 11560 #END             {"page":22,"byte":1111,"count":73536}
3:01:38: 11556 #END             {"page":21,"byte":1101,"count":78097}
3:01:38: 11500 #END             {"page":11,"byte":1201,"count":172820}
3:01:38: 932 #END               {"page":9,"byte":1159,"count":222922}
3:01:38: 11520 #END             {"page":16,"byte":1135,"count":110510}
3:01:38: 7064 #END              {"page":2,"byte":1165,"count":1264444}
3:01:38: 11512 #END             {"page":14,"byte":1123,"count":129721}
3:01:38: 11612 #END             {"page":24,"byte":1115,"count":65012}
3:01:38: 11600 #END             {"page":26,"byte":1134,"count":58928}
3:01:38: 7276 #END              {"page":7,"byte":1189,"count":301469}
3:01:38: 10816 #END             {"page":19,"byte":1120,"count":89609}
3:01:38: 11616 #END             {"page":25,"byte":1052,"count":61793}
3:01:38: 3876 #END              {"page":6,"byte":1188,"count":362101}
3:01:38: 7280 #END              {"page":20,"byte":1079,"count":83632}
3:01:38: 11564 #END             {"page":23,"byte":1076,"count":68909}
3:01:38: 11632 #END             {"page":32,"byte":1095,"count":44013}
3:01:38: 11652 #END             {"page":36,"byte":1042,"count":37185}
3:01:38: 11452 #END             {"page":29,"byte":1097,"count":50532}
3:01:38: 11636 #END             {"page":33,"byte":1097,"count":42148}
3:01:38: 11644 #END             {"page":34,"byte":1124,"count":40236}
3:01:38: 11664 #END             {"page":39,"byte":1078,"count":32792}
3:01:38: 11668 #END             {"page":40,"byte":1017,"count":31487}
3:01:38: 11608 #END             {"page":27,"byte":1117,"count":55561}
3:01:38: 11628 #END             {"page":31,"byte":1076,"count":46133}
3:01:38: 11624 #END             {"page":30,"byte":1111,"count":48265}
3:01:38: 11568 #END             {"page":28,"byte":1076,"count":52851}
3:01:38: 11656 #END             {"page":37,"byte":1068,"count":35590}
3:01:38: 11688 #END             {"page":45,"byte":1062,"count":26060}
3:01:38: 11680 #END             {"page":43,"byte":1081,"count":28013}
3:01:38: 11672 #END             {"page":41,"byte":1086,"count":30320}
3:01:38: 11724 #END             {"page":54,"byte":1060,"count":19900}
3:01:38: 11716 #END             {"page":52,"byte":1069,"count":21079}
3:01:38: 11732 #END             {"page":56,"byte":1038,"count":18748}
3:01:38: 11692 #END             {"page":46,"byte":1033,"count":25230}
3:01:38: 11696 #END             {"page":47,"byte":1098,"count":24469}
3:01:38: 11728 #END             {"page":55,"byte":1003,"count":19353}
3:01:38: 11648 #END             {"page":35,"byte":1105,"count":38651}
3:01:38: 11660 #END             {"page":38,"byte":1075,"count":34037}
3:01:38: 11700 #END             {"page":48,"byte":1059,"count":23725}
3:01:39: 11720 #END             {"page":53,"byte":1028,"count":20463}
3:01:39: 11704 #END             {"page":49,"byte":1006,"count":22966}
3:01:39: 11712 #END             {"page":50,"byte":988,"count":22369}
3:01:39: 11676 #END             {"page":42,"byte":1113,"count":29144}
3:01:39: 11748 #END             {"page":60,"byte":1054,"count":17002}
3:01:39: 11684 #END             {"page":44,"byte":1041,"count":26999}
3:01:39: 11756 #END             {"page":62,"byte":1024,"count":16165}
3:01:39: 11760 #END             {"page":63,"byte":1036,"count":15814}
3:01:39: 11740 #END             {"page":58,"byte":1075,"count":17833}
3:01:39: 11736 #END             {"page":57,"byte":1064,"count":18293}
3:01:39: 11752 #END             {"page":61,"byte":1077,"count":16607}
3:01:39: 11708 #END             {"page":51,"byte":1045,"count":21668}
3:01:39: 11768 #END             {"page":65,"byte":1041,"count":15021}
3:01:39: 11764 #END             {"page":64,"byte":1063,"count":15405}
3:01:39: 11744 #END             {"page":59,"byte":1052,"count":17394}
3:01:39: 11800 #END             {"page":73,"byte":1025,"count":12361}
3:01:39: 11792 #END             {"page":71,"byte":1053,"count":13051}
3:01:39: 11796 #END             {"page":72,"byte":1092,"count":12721}
3:01:39: 11784 #END             {"page":69,"byte":1031,"count":13677}
3:01:39: 11780 #END             {"page":68,"byte":1019,"count":13967}
3:01:39: 11772 #END             {"page":66,"byte":1068,"count":14644}
3:01:39: 11816 #END             {"page":77,"byte":1045,"count":11185}
3:01:39: 11804 #END             {"page":74,"byte":1062,"count":12071}
3:01:39: 11824 #END             {"page":79,"byte":1047,"count":10719}
3:01:39: 11820 #END             {"page":78,"byte":1035,"count":10940}
3:01:39: 11788 #END             {"page":70,"byte":987,"count":13354}
3:01:39: 11776 #END             {"page":67,"byte":1036,"count":14278}
3:01:39: 11828 #END             {"page":80,"byte":1013,"count":10519}
3:01:39: 11832 #END             {"page":81,"byte":1052,"count":10318}
3:01:39: 11812 #END             {"page":76,"byte":991,"count":11465}
3:01:39: 11808 #END             {"page":75,"byte":1043,"count":11769}
3:01:39: 11860 #END             {"page":88,"byte":1018,"count":8991}
3:01:39: 11852 #END             {"page":86,"byte":971,"count":9362}
3:01:39: 11868 #END             {"page":90,"byte":1006,"count":8641}
3:01:39: 11840 #END             {"page":83,"byte":1026,"count":9922}
3:01:39: 11872 #END             {"page":91,"byte":980,"count":8464}
3:01:39: 11892 #END             {"page":96,"byte":936,"count":7727}
3:01:39: 11836 #END             {"page":82,"byte":1052,"count":10117}
3:01:39: 11844 #END             {"page":84,"byte":973,"count":9739}
3:01:39: 11864 #END             {"page":89,"byte":1033,"count":8821}
3:01:39: 11856 #END             {"page":87,"byte":994,"count":9169}
3:01:39: 11848 #END             {"page":85,"byte":1040,"count":9544}
3:01:39: 11896 #END             {"page":97,"byte":988,"count":7562}
3:01:39: 11876 #END             {"page":92,"byte":1003,"count":8294}
3:01:39: 11888 #END             {"page":95,"byte":995,"count":7860}
3:01:39: 11880 #END             {"page":93,"byte":1052,"count":8143}
3:01:39: 11900 #END             {"page":98,"byte":977,"count":7418}
3:01:39: 11904 #END             {"page":99,"byte":999,"count":7270}
3:01:39: 11884 #END             {"page":94,"byte":931,"count":8002}
3:01:39: 11908 #END             {"page":100,"byte":977,"count":7144}


Found 14,075,927 in 100 pages
Finished 1.489 sec

Time Taken

Found 14,075,927 in 100 pages
Finished 1.489 sec

Classes Used

class Process extends Thread {

    public function __construct($storage, $page) {
        $this->storage = $storage;
        $this->page = $page;
        // $this->start();
    }

    public function run() {
        $format = "%s: %1u %s\t%s\n";
        $formatTime = "g:i:s";
        $sleep = mt_rand(0, 1); // Just for Demo

        printf($format, date($formatTime), $this->getThreadId(), "#START", "{\"page\":$this->page}");

        // Do something useful
        $data = file_get_contents(sprintf("http://api.stackoverflow.com/1.1/tags?pagesize=100&page=%s", $this->page));

        // Decode the Data from API
        $json = json_decode(gzdecode($data));

        // Lets Build A profile
        $profile = array();
        $profile['page'] = $this->page;
        $profile['byte'] = strlen($data);

        // Do more work
        $profile['count'] = array_sum(array_map(function ($v) {
            return $v->count;
        }, $json->tags));

        $this->storage->total = bcadd($this->storage->total, $profile['count']);
        // Print Information
        printf($format, date($formatTime), $this->getThreadId(), "#END\t", json_encode($profile));
    }
}

class Storage extends Stackable {
    public $total = 0;

    public function run() {
    }
}

Conclusion

Did file_get_contents just get 100 pages in just 1.489 sec with my crappy connection. Yes it did. Tested the same code on my live server and It took me less than 0.939 sec to fetch 200 pages.

Your application can be faster in so many ways you just have to use the right technology at the right place.

查看更多
登录 后发表回答