骑驴找蚂蚁

全干工程师

PHP fiber示例多任务协作

PHP8.1中发布了一项重要功能有栈协程Fiber,相比原来的Generator(无栈协程),它无需自己实现调度器,可在任意地方中断执行,不需要改变自己的返回类型(生成器需要一直返回Generator)。让我们写几简单使用的程序,熟悉下Fiber类的使用方法。

什么是Fiber

虽然Fiber不是Thread,但它们可以帮助您的代码更有效地更快地完成多项任务,这从表面上看可能看起来像线程。Fiber的作用是允许您并行发起多个请求,然后等待所有请求完成(以任何顺序),从而让您的代码花更少的时间等待外部资源。

要使用Fiber,您的情况必须满足以下几个条件:

必须处理PHP以外的东西

您的资源必须位于 PHP 外部,以便可以与 PHP 代码并行处理。 PHP 仍然是单线程的,因此当特定的纤程正在执行时,脚本中的任何其他内容都不会执行。这意味着实际工作需要卸载到单独的进程中。满足这种要求的常见场景是网络请求或子流程。

需要能够以不阻止脚本执行的方式请求外部资源

例如,复杂的数据库查询不满足此要求,因为无法在查询运行时继续脚本并稍后收集结果。

Fiber类摘要

final class Fiber {
/* 方法 */
public __construct(callable $callback)
// 启动 fiber 的执行
public start(mixed ...$args): mixed
// 使用值恢复 fiber 的执行
public resume(mixed $value = null): mixed
// 用一个异常来恢复 fiber 的执行
public throw(Throwable $exception): mixed
// 获取 Fiber 的返回值
public getReturn(): mixed
// 确定 fiber 是否启动
public isStarted(): bool
// 确认 fiber 是否挂起
public isSuspended(): bool
// 确认 fiber 是否正在运行
public isRunning(): bool
// 确认 fiber 是否终止
public isTerminated(): bool
// 暂停当前 fiber 的执行
public static suspend(mixed $value = null): mixed
// 获取当前正在执行的 Fiber 实例
public static getCurrent(): ?Fiber
}

基础用法

编写一个使用基本的挂起、恢复、返回的程序。

<?php  
$fiber = new Fiber(function (): string { 
	//暂停协程, 调用 Fiber::start()、 Fiber::resume()、 Fiber::throw() 将执行切换到当前 fiber 时,提供给本方法的值,也将是这几个方法所返回的值, 
	$value = Fiber::suspend('fiber');  
	echo "Value used to resume fiber: ", $value, PHP_EOL;  
	return "Return Response";
});
// 启动协程
$value = $fiber->start();  // 调用构造函数的回调函数, 执行`suspend`暂停了协程,接受暂停函数提供的值。
echo "Value from fiber suspending: ", $value, PHP_EOL;  
// 恢复协程,在原来暂停的地方继续执行之后的代码。
$fiber->resume('test');  
// 返回协程执行完成后的值
echo $fiber->getReturn();
?>

以上示例会输出:

Value from fiber suspending: fiber
Value used to resume fiber: test
Return Response

多任务

Fiber主要目的就是并行处理多个任务,在不同任务之间中断恢复即可快速完成所有任务。

同时读取多目录文件

在没有fiber之前可能读取多个目录只能同步一个个读,像下面这样的。

function readDirFiles(string $dir)  
{  
    $files = [];  
    foreach (new DirectoryIterator($dir) as $item){  
        $files[] = $item->getFilename();  
    }  
    return $files;  
}  
var_dump(readDirFiles(__DIR__));  
var_dump(readDirFiles('.'));

如何使用fiber改造呢,把两次读取当成两个任务,一个任务相当于一个fiber。改造之后的代码。

function readDirFiles(string $dir)  
{  
    $files = [];  
    foreach (new DirectoryIterator($dir) as $item){  
        $files[] = $item->getFilename();  
        // 并行处理的核心关键处代码, 不加此处还是相当于同步
        Fiber::suspend();  
    }  
    return $files;  
}  
//var_dump(readDirFiles(__DIR__));  
//var_dump(readDirFiles('.'));  
$fibers = [];  
foreach ([__DIR__, '.'] as $dir) {  
    $fiber = new Fiber(readDirFiles(...));  
    $fiber->start($dir);  
    $fibers[] = $fiber;  
}  
$files = [];  
while ($fibers){  
    foreach ($fibers as $idx => $fiber){  
        if ($fiber->isTerminated()) {  
            $files[$idx] = $fiber->getReturn();  
            unset($fibers[$idx]);  
        } else {  
            $fiber->resume();  
        }  
    }  
}  
var_dump($files);

上面的示例可能看起来提升不是很明显,试想下如果换成远程http请求,ffmpeg视频转换呢。

发送多次Http请求

$https = [  
    'baidu' => 'https://www.baidu.com/',  
    'sina' => 'https://www.sina.com.cn/',  
    '163' => 'https://www.163.com/',  
    'toutiao' => 'https://www.toutiao.com/',  
    'qq' => 'https://www.qq.com/',  
    'news' => 'http://www.xinhuanet.com/',  
];

$fibers = [];  
foreach ($https as $key => $http) {  
    $fiber = new Fiber(function (string $url) {  
        Fiber::suspend();  
        return file_get_contents($url);  
    });  
    $fiber->start($http);  
    $fibers[] = $fiber;  
}  
  
while ($fibers){  
    foreach ($fibers as $idx => $fiber){  
        if ($fiber->isTerminated()) {  
            $files[$idx] = $fiber->getReturn();  
            unset($fibers[$idx]);  
        } else {  
            $fiber->resume();  
        }  
    }  
}

ffmpeg处理外部视频

下面是未使用Fiber处理6次视频的示例和处理的时间.

function createVideoClip(string $cmd) : string {  
    $stdout = fopen('php://temporary', 'w+');  
    $stderr = fopen('php://temporary', 'w+');  
    $streams = [  
        0 => ['pipe', 'r'],
        1 => $stdout,
        2 => $stderr
	];
    $proc = proc_open($cmd, $streams, $pipes);
    if (!$proc) {  
        throw new \RuntimeException('Unable to launch download process');  
    }  
  
    do {  
        usleep(1000); //Wait 1ms before checking  
        $status = proc_get_status($proc);  
    } while ($status['running']);  
  
    proc_close($proc);  
    fclose($stdout);  
    fclose($stderr);  
    $success = $status['exitcode'] === 0;  
    if ($success){  
        return "finish";  
    } else {  
        throw new \RuntimeException('Unable to perform conversion');  
    }  
}
$cmds = [  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test1.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test2.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test3.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test4.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test5.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test6.mp4',  
];  
  
$start = microtime(true);  
echo "before time to: ", $start, PHP_EOL;  
  
foreach ($cmds as $cmd) {  
    echo createVideoClip($cmd), PHP_EOL;  
}  
$end =  microtime(true);  
echo "end time to: ", $end - $start, PHP_EOL;
before time to: 1701239868.095
finish
finish
finish
finish
finish
finish
end time to: 59.067454099655

使用fiber之后,能让整成个程序速度提升30%。这是使用fiber后的代码。

function createVideoClip(string $cmd) : string {  
    $stdout = fopen('php://temporary', 'w+');  
    $stderr = fopen('php://temporary', 'w+');  
    $streams = [  
        0 => ['pipe', 'r'],
        1 => $stdout,
        2 => $stderr  
	];  
  
    $proc = proc_open($cmd, $streams, $pipes);  
    if (!$proc){  
        throw new \RuntimeException('Unable to launch download process');  
    }  
  
    do {  
        // Wait 1ms before checking  
 // usleep(1000);  Fiber::suspend();  
        $status = proc_get_status($proc);  
    } while ($status['running']);  
  
    proc_close($proc);  
    fclose($stdout);  
    fclose($stderr);  
    $success = $status['exitcode'] === 0;  
    if ($success){  
        return "finish";  
    } else {  
        throw new \RuntimeException('Unable to perform conversion');  
    }  
}

$cmds = [  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test1.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test2.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test3.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test4.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test5.mp4',  
    'ffmpeg -threads 1 -i /Users/meshell/Downloads/weibo.mp4 -t 30 -crf 26 -c:v h264 -c:a ac3 /Users/meshell/Downloads/test6.mp4',  
];  
  
$start = microtime(true);  
echo "before time to: ", $start, PHP_EOL;  
  
$fibers = [];  
foreach ($cmds as $cmd) {  
    $fiber = new Fiber(createVideoClip(...));  
    $fiber->start($cmd);  
    $fibers[] = $fiber;  
}  
  
while ($fibers){  
    foreach ($fibers as $idx => $fiber){  
        if ($fiber->isTerminated()) {  
            echo $fiber->getReturn(), PHP_EOL;  
            unset($fibers[$idx]);  
        } else {  
            $fiber->resume();  
        }  
    }  
}  
  
$end =  microtime(true);  
echo "end time to: ", $end - $start, PHP_EOL;

打印的结果.

before time to: 1701240690.3413
finish
finish
finish
finish
finish
finish
end time to: 41.346300125122

终结

相信你看完之后对phpfiber有个大概的了解和基本用法的使用以及它的使用场景。你可以从许多项目学习更深层的使用技巧。推荐你学习下Amp异步框架,你还可以学习事件循环框架revolt

相关程序解答推荐

留言