骑驴找蚂蚁

全干工程师

Swoole定时器实现Linux的Crontab

Crontab

crontab是要定期运行的命令的列表,以及用于管理该列表的命令的名称。 crontab代表“cron表”,因为它使用作业调度程序cron来执行任务; cron本身以“chronos”命名,希腊语为时间。cron是系统进程,它会根据一个安排的时间表为你自动执行任务。该计划称为crontab,它也是用于编辑该计划的程序的名称

Crontab配置语法

 * * * * * 要执行的命令
 - - - - -
 | | | | |
 | | | | ----- 星期几 (0 - 7) (星期天=0 或者 7)
 | | | ------- 月份 (1 - 12)
 | | --------- 天数 (1 - 31)
 | ----------- 小时 (0 - 23)
 ------------- 分钟 (0 - 59)

Note:

  • 每天八点执行: 0 8 * * * php Eight.php
  • 每月10号8点执行: 0 8 10 * * php Eight.php
  • 星期天8点执行:0 8 * * 7 php Eight.php

Swoole定时器

在swoole扩展中提供一个叫[swoole_timer_tick][3]函数,它的作用是在设定的毫秒内重复执行回调函数(当然你也可以清除掉这个定时器)。这个和JavaScriptsetTimeoutsetInterval类似。swoole_timer_tick底层是[epoll]模型和堆结构实现。

    //file:swoole.c line:857 这个函数在php启动时候执行一次,
    swoole_init();

    //file:swoole_timer.c  line: 249
    swTimer_node *tnode = SwooleG.timer.add(&SwooleG.timer, ms, persistent, cb, timer_func);

    //file:Timer.c line:161
    tnode->heap_node = swHeap_push(timer->heap, tnode->exec_msec, tnode);

    //这段代码是整个swoole的核心,file:ReactorEpoll.c line:215
    while (reactor->running > 0)
    {
        ...
        if (reactor->onFinish != NULL)
        {
            reactor->onFinish(reactor);
        }
    }

这个是使用gdb调试swoole_timer_tick的调用堆栈。

Breakpoint 5, swReactor_onTimeout (reactor=0x15c1880) at /home/soft/swoole/src/reactor/ReactorBase.c:194
194 {
(gdb) bt
#0  swReactor_onTimeout (reactor=0x15c1880) at /home/soft/swoole/src/reactor/ReactorBase.c:194
#1  0x00007fffe82a2ff1 in swReactorEpoll_wait (reactor=0x15c1880, timeo=<optimized out>) at /home/soft/swoole/src/reactor/ReactorEpoll.c:289
#2  0x00007fffe826d874 in php_swoole_event_wait () at /home/soft/swoole/swoole_event.c:212
#3  0x0000000000843779 in zend_call_function (fci=fci@entry=0x7fffffffcd90, fci_cache=0x7fffffffccd0, fci_cache@entry=0x0)
    at /home/soft/php-7.1.5/Zend/zend_execute_API.c:869
#4  0x0000000000843cd5 in _call_user_function_ex (object=object@entry=0x0, function_name=<optimized out>, 
    retval_ptr=retval_ptr@entry=0x7fffffffcdf0, param_count=<optimized out>, params=<optimized out>, no_separation=no_separation@entry=1)
    at /home/soft/php-7.1.5/Zend/zend_execute_API.c:672
#5  0x000000000075c99f in user_shutdown_function_call (zv=<optimized out>) at /home/soft/php-7.1.5/ext/standard/basic_functions.c:4982
#6  0x0000000000863aab in zend_hash_apply (ht=0x7fffefa561f8, apply_func=apply_func@entry=0x75c8e0 <user_shutdown_function_call>)
    at /home/soft/php-7.1.5/Zend/zend_hash.c:1508
#7  0x000000000075f726 in php_call_shutdown_functions () at /home/soft/php-7.1.5/ext/standard/basic_functions.c:5066
#8  0x00000000007f2c65 in php_request_shutdown (dummy=dummy@entry=0x0) at /home/soft/php-7.1.5/main/main.c:1819
#9  0x00000000008ebe1b in do_cli (argc=2, argv=0x11f4c20) at /home/soft/php-7.1.5/sapi/cli/php_cli.c:1160
#10 0x0000000000448820 in main (argc=2, argv=0x11f4c20) at /home/soft/php-7.1.5/sapi/cli/php_cli.c:1381

onTimeout的调用是通过epoll_wait来的,之后调用swReactor_onTimeout_and_Finish函数, 然后调用php_swoole_onInterval函数这个会调用我们定义的回调函数。

/**
 * execute when reactor timeout and reactor finish
 */
static void swReactor_onTimeout_and_Finish(swReactor *reactor)
{
    //check timer
    if (reactor->check_timer)
    {
        swTimer_select(&SwooleG.timer); //这个函数进去之后就是回调我们定义的函数 
    }
    ...
}

各位可以用gdb跟踪下swoole_timer_tick整个运行流程。

Crond实现

我们这里的实现主要是任务控制部分,任务的执行部分交给SymfonyProcess`类库去处理。

Task类

既然用得crontab配置, 那肯定是要解析配置。我们在类的构造函数去解析当前任务的配置主要包括:

  • 星期几
  • 某月
  • 某日
  • 某小时
  • 某分钟
  • 执行命令

当内容为 * 号时,说明当前配置列所有段都执行。

class Task
{
    private $taskString;

    private $min;

    private $hour;

    private $day;

    private $month;

    private $week;

    private $command;

    private $process;

    private $runTime;

    /**
     * @var string $taskString example: 10 * * * * php example.php
     */
    public function __construct(string $taskString)
    {
        $this->taskString = $taskString;
        $this->runTime = time();
        $this->initialize();
    }

    /**
     * 初始化任务配置
     */
    private function initialize()
    {
        //过滤多余的空格
        $rule = array_filter(explode(" ", $this->taskString), function($value) {
            return $value != "";
        });
        if (count($rule) < 7) {
            throw new ErrorException("'taskString' parse failed");
        }
        $this->min = $this->format($rule[0], 'min');
        $this->hour= $this->format($rule[1], 'hour');
        $this->day = $this->format($rule[2], 'day');
        $this->month = $this->format($rule[3], 'month');
        $this->week= $this->format($rule[4], 'week');
        $this->command = array_slice($rule, 5);
    }

}

在初始化配置时候使用了format函数格式化列的内容,因为我知道列的内容是可以配置成1, 2, 3*/10, 1-10这种形式,所以我们能过这个函数统一处理。函数接收到两个参数一个为列值,一个为列名。

private function format($value, $field)
    {
        if ($value === '*') {
            return $value;
        }
        if (is_numeric($value)) {
            return [$this->checkFieldRule($value, $field)];
        }
        $steps = explode(',', $value);
        $scope = [];
        foreach ($steps as $step) {
            if (strpos($step, '-') !== false) {
                $range = explode('-', $step);
                $scope = array_merge($scope, range(
                    $this->checkFieldRule($range[0], $field),
                    $this->checkFieldRule($range[1], $field)
                ));
                continue;
            }
            if (strpos($step, '/') !== false) {
                $inter = explode('/', $step);
                $confirmInter = isset($inter[1]) ? $inter[1] : $inter[0];
                if ($confirmInter === '/') {
                    $confirmInter = 1; 
                }
                $scope = array_merge($scope, range(
                    constant('FIRST_' . strtoupper($field)),
                    constant('LAST_' . strtoupper($field)),
                    $confirmInter
                ));
                continue;
            }
            $scope[] = $step;
        }
        return $scope;
    }

format返回*或者一个数组, 因为1-10这种实际就是1到10的一个数组,*/2 这种实际也是一个有步长的数组,使用range就实现了这个数组,1,2,3,30,31,32 这种就是直接分割成数组。使用了checkFieldRule来验证值的合法性.



private function checkFieldRule($value, $field)
{
    $first = constant('FIRST_' . strtoupper($field));
    $last  = constant('LAST_' . strtoupper($field));
    if ($value < $first) {
        return $first;
    }
    if ($value > $last) {
        return $last;
    }
    return (int) $value;
}

函数使用了几个常量来验证。

define('FIRST_MIN', 0);
define('LAST_MIN', 59);
define('FIRST_HOUR', 0);
define('LAST_HOUR', 23);
define('FIRST_DAY', 1);
define('LAST_DAY', 31);
define('FIRST_MONTH', 1);
define('LAST_MONTH', 12);
define('FIRST_WEEK', 0);
define('LAST_WEEK', 6);

为任务类添加一个执行命令的方法, 在任务达到条件时执行此方法来执行命令,$this->process->start()以异步方式运行任务

public function run()
{
    if (null === $this->process) {
        $this->process = new Process(implode(" ", $this->command));
    }
    $this->process->start();
}

因为类中的属性都是私有的需要创建几个辅助函数来处理setget问题不使用__get__set

public function getTimeAttribute($attribute)
{
    if (!in_array($attribute, ['min', 'hour', 'day', 'month', 'week', 'runTime'])) return null;
    return $this->{$attribute} ?? null;
}

public function setRunTime($time)
{
    $this->runTime = $time;
}

Job类

Job类是一个实现迭代器接口的类,方遍我们管理Task。

class Job implements Iterator
{
    private $position = 0;
    private $jobs = [];

    public function addJob(Task $task)
    {
        $this->jobs[] = $task;
    }

    public function current()
    {
        return $this->jobs[$this->position];
    }

    public function key()
    {
        return $this->position;
    }

    public function next()
    {
        ++$this->position;
    }

    public function rewind()
    {
        $this->position = 0;
    }

    public function valid()
    {
        return isset($this->jobs[$this->position]);
    }
}

swoole_timer_tick回调

在回调我们需要从Job类中把所有Task拿出来,检测Task是否可以执行。

swoole_timer_tick(1000, function($timeId, $params = null) use ($jobs, &$prevTime) {
    $current = time();
    //week, month, day, hour, min
    $ref = explode('|', date('w|n|j|G|i', $current));
    if ($prevTime) {
        $prevMin  = date('i', $prevTime);
        if ($prevMin == $ref[4]) {
            return true;
        }
    }
    foreach ($jobs as $task) {
        $ready = 0;
        //$diff = $task->getTimeAttribute('runTime') - $current;
        //对应上面的$ref数组
        foreach (['week', 'month', 'day', 'hour', 'min'] as $key => $field) {
            $value = $task->getTimeAttribute($field);
            if ($value === '*') {
                $ready += 1;
                continue;
            }
            $ready += in_array($ref[$key], $value) ? 1: 0;
        }
        if (5 === $ready) {
            //执行任务
            $task->run();
            //更新运行时间
            $task->setRunTime($current);
        }
    }
    $prevTime = $current;
    return true;
    //swoole_timer_clear($timeId);
});

Note:
我们这里设置的是秒级别,而我们的任务是分级别。而且这样的话我们的任务会在当前分钟重复执行60次,这是我们不愿意看到的。所以程序里面定义了一个$prevTime的变量,来控制这次执行是不是和上次同一份钟,用秒来定义是精确任务执行时间。当任务达到了当前周、月、日、时、分条件时才执行任务。

完成

根据上面的实现,重新规范下代码。

swoole-crontab

测试

  • 新建一个crontabexample.php文件 内容为下面代码。
#!/usr/bin/php
<?php

include __DIR__ . '/vendor/autoload.php';

use Cron\Cron;

$command = "php " . __DIR__ . '/example.php';

$crontab = <<<EOF
*/1 * * * * $command
EOF;

$cron = new Cron( ($tasks = explode("\n", $crontab)));
$cron->start();
//example.php文件内容
<?php

$time = date('Y-m-d H:i:s');

file_put_contents("crontab.txt", $time . "\r\n", FILE_APPEND);
  • 执行命令
[root@meshell swoole-crontab]# php crontab
  • 效果

swoole-crontab-example

  • 项目源码

https://github.com/TianLiangZhou/loocode-example/tree/master/swoole-crontab

推荐阅读

  1. crontab介绍使用
  2. gdb调试
  3. crond源码

留言