xiaolingzi's blog

每天都在成长...

欢迎您:亲

PHP守护进程和多进程的实现

xiaolingzi 发表于 2017-07-20 09:27:00

PHP除了做web相关的功能之后,有时我们还需要使用它来处理一些后台任务。对于一些繁重的工作,单一的进程处理无法满足我们的需要,这时候我们就需要多线程或者多进程来处理。PHP默认不支持多线程,但有对应的扩展程序来实现。PHP多进程可以使用pcntl来实现。今天这里我们主要来讲讲在cli模式下,PHP多进程、守护进程的使用,而且只是在linux下运行。

一、多进程

多进程可以通过pcntl_fork来实现,具体看一下代码:

<?php
            
for($i = 0; $i < 3; $i ++)
{
    $pid = pcntl_fork();
    if($pid == 0)
    {
        //子进程
        while (true)
        {
            echo "I am $i worker\n";
            sleep(2);
        }
            
    }
    else if($pid == - 1)
    {
        throw new Exception('fork process fail!');
    }
    else if($pid > 0)
    {
        // 父进程
    }
}
//让父进程一直运行
while (true)
{
    sleep(5);
}

调用pcntl_fork方法会返回进程Id,如上面的$pid。如果可以获得进程Id则为父进程运行,$pid > 0的条件区块里面可以运行父进程的代码如果;如果无法获取进程ID,$id为0时则为子进程运行,$pid == 0的条件区块里面运行子进程的处理代码;如果$pid等于-1,则进程开启失败。

运行结果如下:

I am 0 worker
I am 1 worker
I am 2 worker
I am 0 worker
I am 1 worker
I am 2 worker

我们可以看到三个进程都在运行。


二、守护进程

上面已经实现的多进程,但它们的运行都是基于当前的终端会话session,当我们关闭终端时他们也会相应地终止掉。在很多情况下,我们并不关心也不可能一直盯着关注程序的运行,这时我们就需要守护进程,让进程在后台处理,关闭终端也不会影响进程的运行。

守护进程的实现也很简单,给子进程创建新的会话(session),进程就会脱离终端session运行,从而达到守护进程的目的。

我们改造一下上面的代码,如下:

<?php
            
for($i = 0; $i < 3; $i ++)
{
    $pid = pcntl_fork();
    if($pid == 0)
    {
        // 建立新session以脱离终端
        $sid = posix_setsid();
        if($sid < 0)
        {
            echo "set sid fail,exit!";
            exit();
        }
                    
        //子进程
        while (true)
        {
            echo "I am $i worker\n";
            sleep(2);
        }
            
    }
    else if($pid == - 1)
    {
        throw new Exception('fork process fail!');
    }
    else if($pid > 0)
    {
        // 父进程
    }
}
//让父进程退出
exit();

子进程脱离终端运行,父进程代码运行完之后就退出了。


假设我们的文件路径为/var/temp/test.php,通过php /var/temp/test.php来运行,那么我们可以通过以下命令来查看名称包含test关键字的进程。

ps -ef | grep "test"

运行结果如下,我们可以看到有三个进程在运行。

root     19023     1  0 16:51 ?        00:00:00 php /var/temp/test.php
root     19024     1  0 16:51 ?        00:00:00 php /var/temp/test.php
root     19025     1  0 16:51 ?        00:00:00 php /var/temp/test.php
root     19033 18748  0 16:51 pts/0    00:00:00 grep test

运行时依然会想第一次运行那样在屏幕上打印输出,是不是觉得很烦人很影响操作,所以进程进入后台运行时我们希望它们不要再打印出来,所以我们把标准输出关闭。其实包括标准输入、输出和错误输出都可以关闭掉。我们再来改造一下上面的代码,如下:

<?php
            
for($i = 0; $i < 3; $i ++)
{
    $pid = pcntl_fork();
    if($pid == 0)
    {
        // 建立新session以脱离终端
        $sid = posix_setsid();
        if($sid < 0)
        {
            echo "set sid fail,exit!";
            exit();
        }
                    
        //关闭标准输入、输出、错误输出
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
                    
        //关闭了标准输出之后要指定输出位置,否则子进程出现echo等打印语句时会中断程序运行
        $STDIN = fopen('/dev/null', 'r');
        $STDOUT = fopen('/dev/null', 'a');
        $STDERR = fopen('/dev/null', 'a');
                    
        //子进程
        while (true)
        {
            echo "I am $i worker\n";
            sleep(5);
        }
            
    }
    else if($pid == - 1)
    {
        throw new Exception('fork process fail!');
    }
    else if($pid > 0)
    {
        // 父进程
    }
}
exit();

运行之后我们可以通过 ps -ef | grep "test" 看到3个进程在运行,但是已经没有了烦人的屏幕打印。


三、进程信号量

关于信号量这里只做简单的介绍,详细的介绍自行搜索。信号量可以用于进程之间的通讯,进程可以注册监听信号量来进行相应的操作。

几个本次会用到的信号量说明如下:

SIGTERM 进程终止信号量,比如kill 3023就会触发该信号量。

SIGKILL 进程强制终止,比如 kill -SIGKILL 3023 ,该信号量和SIGSTOP一样不能被捕获

SIGUSR1 用户自定义信号量

SIGUSR2 用户自定义信号量

SIGCHLD 子进程终止信号量,子进程终止后会触发该信号量,主进程可以监听该信号量进行相应的操作。

更多信号量的含义自行搜索。

僵尸进程的查看方法如下:

ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'


四、实现守护多进程执行任务

这里我们实现一个多进程处理的任务的例子,除了多进程运行,还完成以下进程管理功能:

1.子进程强制重启

2.子进程优雅重启(完成单项任务重启,防止一项任务做一半退出导致数据不一致)

3.杀掉所有子进程

为了实现以上功能,该例子还加入了进程的管理部分,也就是在进程数量会做出一个主进程用于管理子进程。

主要逻辑如下:

1. fork一个子进程做为主进程A,并脱离终端运行。主进程注册SIGUSR1、SIGUSR2、SIGTERM,并将SIGCHLD注册到内核,让内核回收结束运行的子进程以防止僵尸进程。各注册信号量用户实现的功能如下:

(1)SIGUSR1 子进程强制重启。强行杀死现有子进程,然后启动同等数量的子进程。

(2)SIGUSR2 子进程优雅重启。通知现有子进程完成单项完整任务之后退出,然后重启同等数量的子进程

(3)SIGTERM 杀掉所有子进程。当主进程结束时,强制杀死所有子进程

2. 主进程A再fork用户配置数量的子进程用于执行具体的任务。子进程监听SIGUSR2信号量,当主进程接收到SIGUSR2信号量时,主进程向子进程发送SIGUSR2信号量通知子进程完成单项任务后退出。

3. 主进程A通过while循环保持一直运行和接收信号量。

4. 基于终端运行的进程退出,保留后端运行的主进程A及它的子进程。

最终代码看最后的代码,代码调用如下:

<?php
require_once 'Process.php';
            
function main()
{
    //具体任务
    echo 111;
}
            
$processConfig = array(
            "workFunction"=>"main" //要执行的任务的方法
            ,"workerNumber"=>3     //进程数量
            ,"daemonize"=>true    //是否守护运行,本例子非守护运行都按单进程前端运行
            ,"loopTimespan"=>2     //循环执行的时间间隔
        );
                    
(new Process())->start($processConfig);

需要注意的是,重启子进程只能重新加载main方法中加载的类。比如上面的main方法里面require的文件修改后重启子进程会生效,或者main里面调用的类是自动动态加载的类也会生效。

运行之后我们可以向进程发送相应的信号量进程测试,假设主进程的Id为6065.

执行 kill -SIGUSR1 6065 看看进程是否强制重启(对比子进程id变化)

执行 kill -SIGUSR2 6065 看看进程是否优雅重启(对比子进程id变化)

执行 kill 6065 看看主进程和子进程是否都被终止


最终代码:

<?php
class Process
{
    private $_workerArr = array();
    private $_configArr = array();
    private $_mainProcessStatus=TRUE;
    private $_isFockSubProcesses=TRUE;
    private $_subProcessStatus=TRUE;
    public function start($configArr)
    {
        //未指定执行函数则直接退出
        if(empty($configArr) || !array_key_exists("workFunction", $configArr) || empty($configArr["workFunction"]))
        {
            echo "work function not setted!\n";
            return ;
        }
        //初始化配置
        $this->initConfig($configArr);
                    
        $func = $this->_configArr["workFunction"];
        $parameterArr = $this->_configArr["parameters"];
        // 如果不支持,则直接运行
        if(! function_exists('pcntl_fork'))
        {
            echo "pcntl_fork not supported, run directly!\n";
            call_user_func_array($func, $parameterArr);
            return;
        }
                    
        //检查进程是否已经运行
        $processTitle = $this->_configArr["processTitle"];
        if($this->checkProcess($processTitle))
        {
            echo "Another process is runing, please kill first!\n";
            return ;
        }
                    
        // 如果不是以守护进程运行,就直接单进程运行
        if(! $this->_configArr["daemonize"])
        {
            call_user_func_array($func, $parameterArr);
            return;
        }
                    
        // 如果是非cli模式,就直接运行代码
        if(substr(php_sapi_name(), 0, 3) !== 'cli')
        {
            call_user_func_array($func, $parameterArr);
            return;
        }
                    
        $workerNumber = intval($this->_configArr["workerNumber"]);
        if($workerNumber<=0)
        {
            //如果设置的进程数量小于等于0,那么直接默认开启一个守护进程进行处理
            $this->_configArr["workerNumber"]=1;
            $this->fockSubProcesses();
        }
        else
        {
            //先开启一个主进程,然后再开启子进程,为了可以通过主进程管理子进程
            $this->fockMainProcess();
        }
        exit(0);
    }
                
                
            
    private function fockSubProcesses()
    {
        $this->log("fock start");
                    
        $this->_workerArr=array();
        $func = $this->_configArr["workFunction"];
        $workerNumber = $this->_configArr["workerNumber"];
        $loopTimespan = intval($this->_configArr["loopTimespan"]);
        $parameterArr = $this->_configArr["parameters"];
                    
        $this->_subProcessStatus = true;
        $this->log($workerNumber." fock start ".$loopTimespan);
        for($i = 0; $i < $workerNumber; $i ++)
        {
            $pid = pcntl_fork();
            if($pid == 0)
            {
                // 建立新session以脱离终端
                $sid = posix_setsid();
                if($sid < 0)
                {
                    $this->log("set sid fail,exit!");
                    exit();
                }
            
                declare(ticks = 1);
                pcntl_signal(SIGUSR2, array(__CLASS__,"subSignalHandler"));
                            
                // 子进程
                $subPID=getmypid();
                            
                if($loopTimespan<=0)
                {
                    call_user_func_array($func, $parameterArr);
                    //子进程执行完之后要执行exit让子进程退出
                    exit(0);
                }
                else
                {
                    while ($this->_subProcessStatus)
                    {
                        call_user_func_array($func, $parameterArr);
                        pcntl_signal_dispatch();
                        sleep($loopTimespan);
                    }
                    //子进程执行完之后要执行exit让子进程退出
                    exit(0);
                }
            }
            else if($pid == - 1)
            {
                throw new Exception('fork process fail!');
            }
            else if($pid > 0)
            {
                //父进程
                $this->_workerArr["$pid"]=true;
            }
        }
    }
            
    private function fockMainProcess()
    {
        $workerNumber = $this->_configArr["workerNumber"];
        $processTitle = $this->_configArr["processTitle"];
                    
        $this->log("main process");
        $pid = pcntl_fork();
        if($pid == 0)
        {
            // 建立一个有别于终端的新session以脱离终端
            $sid = posix_setsid();
            if($sid < 0)
            {
                echo "set sid fail,exit!\n";
                exit();
            }
            if(!empty($processTitle))
            {
                cli_set_process_title($processTitle);
            }
                        
            declare(ticks = 1);
            pcntl_signal(SIGUSR1, array(__CLASS__,"signalHandler"));
            pcntl_signal(SIGUSR2, array(__CLASS__,"signalHandler"));
            pcntl_signal(SIGTERM, array(__CLASS__,"signalHandler"));
            //不关心子进程退出,注册到内核让内核进行回收
            pcntl_signal(SIGCHLD, SIG_IGN);
                        
            // 关闭打开的文件描述符
            fclose(STDIN);
            fclose(STDOUT);
            fclose(STDERR);
                        
            $STDIN = fopen('/dev/null', 'r');
            $STDOUT = fopen('/dev/null', 'a');
            $STDERR = fopen('/dev/null', 'a');
                        
            while ($this->_mainProcessStatus)
            {
                if($this->_isFockSubProcesses)
                {
                    // 子进程
                    $this->fockSubProcesses();
                    $this->_isFockSubProcesses=false;
                }
                pcntl_signal_dispatch();
                sleep(1);
            }
        }
        else if($pid == - 1)
        {
            throw new Exception('fork process fail!');
        }
        else if($pid > 0)
        {
                        
        }
    }
            
    private function initConfig($configArr)
    {
        //进程数量,默认1
        if(! array_key_exists("workerNumber", $configArr))
        {
            $configArr["workerNumber"] = 1;
        }
        //是否守护运行,默认否
        if(! array_key_exists("daemonize", $configArr))
        {
            $configArr["daemonize"] = false;
        }
        //间隔执行时间,默认1秒。设置之后按时间间隔循环执行任务
        if(! array_key_exists("loopTimespan", $configArr))
        {
            $configArr["loopTimespan"] = 1;
        }
        //进程标题,默认不设置
        if(! array_key_exists("processTitle", $configArr))
        {
            $configArr["processTitle"] = "";
        }
        //执行方法的参数,默认为空
        if(! array_key_exists("parameters", $configArr))
        {
            $configArr["parameters"] = array();
        }
                    
        $this->_configArr = $configArr;
        return $configArr;
    }
                
    public function signalHandler($signal)
    {
        switch ($signal)
        {
            case SIGUSR1:
                $this->log("SIGUSR1 in");
                            
                //自定义信号
                $this->killAllSubProcesses(true);
                            
                //通知主进程重启所有子进程
                $this->_isFockSubProcesses=true;
                //直接在这里调用该方法fork新的子进程会有问题,比如会打断现有子进程的信号接收
                //$this->fockSubProcesses();
                            
                $this->log("SIGUSR1 out");
                break;
            case SIGUSR2:
                $this->log("SIGUSR2 in");
            
                //自定义信号
                $this->killAllSubProcesses(false);
                            
                //通知主进程重启所有子进程
                $this->_isFockSubProcesses=true;
                //直接在这里调用该方法fork新的子进程会有问题,比如会打断现有子进程的信号接收
                //$this->fockSubProcesses();
                            
                $this->log("SIGUSR2 out");
                break;
            case SIGTERM:
                $this->log("SIGTERM in");
                            
                $this->_mainProcessStatus=false;
                //父进程退出前退出所有子进程
                $this->killAllSubProcesses(true);
                            
                $this->log("SIGTERM out");
                //退出父进程
                exit(0);
                break;
            default:
                break;
        }
    }
                
    public function subSignalHandler($signal)
    {
        switch ($signal)
        {
            case SIGUSR2:
                $this->log("sub SIGUSR2 in");
                
                //通知子进程结束
                $this->_subProcessStatus=false;
            
                $this->log("sub SIGUSR2 out");
                break;
            default:
                break;
        }
    }
                
    private function killAllSubProcesses($isKill=TRUE)
    {
        $this->log(json_encode($this->_workerArr));
        if(!empty($this->_workerArr))
        {
            if($isKill)
            {
                foreach ($this->_workerArr as $pid=>$value)
                {
                    $this->_workerArr["$pid"] = false;
                    $result = posix_kill($pid, SIGKILL);
                    //如果正常终止,需要调用pcntl_waitpid等待子进程结束回收占用资源,防止僵尸进程出现
//                     $result = posix_kill($pid, SIGTERM);
//                     $status=-1;
//                     pcntl_waitpid($pid, $status);
//                     if(pcntl_wifexited($status))
//                     {
//                         unset($this->_workerArr["$pid"]);
//                     }
//                     $this->log("$pid"."--".strval(pcntl_wifexited($status)));
                }
            }
            else
            {
                foreach ($this->_workerArr as $pid=>$value)
                {
                    //通知子进程结束
                    $result = posix_kill($pid, SIGUSR2);
                }
                            
            }
        }
    }
                
    private function checkProcess($processTitle)
    {
        if(empty($processTitle))
        {
            global $command;
            $processTitle=ROOT_PATH."/App.php -i $command";
        }
        $cmd = "ps axu|grep \"$processTitle\"|grep -v \"grep\"|wc -l";
        $result = shell_exec("$cmd");
        $result = trim($result, "\r\n");
                     
        //为0则没有任何进程,考虑本进程就已经为1,只有为2才有另外的进程
        if($result==="1")
        {
            return false;
        }
        return true;
    }
                
    private function checkSubProcess($pid)
    {
        $cmd = "pstree -p $pid | wc -l";
        $result = shell_exec("$cmd");
        $result = trim($result, "\r\n");
        if($result>0)
        {
            return true;
        }
        return false;
    }
                
                
    private function log($txt)
    {
        $exceptionMessage = $txt."\n";
                     
        $filePath=__DIR__."/logs/process";
        if(!file_exists($filePath))
        {
            mkdir($filePath,0777,true);
        }
        $fileName=$filePath."/".date("Ymd",time()).".txt";
        $fp=fopen($fileName, "a");
        fwrite($fp, $exceptionMessage);
        fclose($fp);
    }
                
}


      

转载请注明出处:http://www.xxling.com/article/3113.aspx

  • 分类: PHP
  • 阅读: (1681)
  • 评论: (1)
砖墙
天天ZVB
reeeeeeeeeeeeeeee
拍砖 取消
请输入昵称
请输入邮箱
*
 选择评论类型
300字以内  请输入评论内容