Queue Driver 与 Job 基础
队列把耗时操作从 HTTP 请求中分离出去——用户提交表单后立即收到响应,后台 Worker 异步处理发邮件、生成报告、调用第三方 API 等任务。这是从"Demo 应用"到"生产级应用"的关键跃迁。
为什么需要队列
没有队列(同步):
用户提交注册 → 发验证邮件(3秒)→ 同步第三方服务(2秒)→ 5秒后响应 ❌
有队列(异步):
用户提交注册 → 把任务推入队列 → 立即响应(<100ms)✅
后台 Worker → 发验证邮件 → 同步第三方服务(异步执行)
Queue Driver 对比
| Driver | 特点 | 适用场景 |
|---|---|---|
sync | 同步执行(不异步) | 测试、本地开发 |
database | 任务存到数据库 | 简单场景,无需 Redis |
redis | Redis 列表实现 | 推荐,高性能 |
sqs | AWS SQS | 云原生,托管可靠 |
beanstalkd | Beanstalkd | 传统方案 |
# .env
QUEUE_CONNECTION=redis # 生产环境推荐
QUEUE_CONNECTION=sync # 测试环境(立即同步执行)
QUEUE_CONNECTION=database # 没有 Redis 时的替代方案
# database driver 需要创建 jobs 表:
# php artisan queue:table
# php artisan migrate
创建第一个 Job
php artisan make:job SendWelcomeEmail
php artisan make:job ProcessTaskExport
<?php
// app/Jobs/SendWelcomeEmail.php
namespace App\Jobs;
use App\Models\User;
use App\Mail\WelcomeMail;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Mail;
class SendWelcomeEmail implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
// 失败前最大尝试次数
public int $tries = 3;
// 任务超时时间(秒)
public int $timeout = 60;
// 重试前等待时间(秒)
public int $backoff = 30;
// 构造函数中注入数据(模型会被自动序列化/反序列化)
public function __construct(
private User $user
) {}
public function handle(): void
{
Mail::to($this->user->email)
->send(new WelcomeMail($this->user));
}
// 任务失败时调用
public function failed(\Throwable $exception): void
{
logger()->error("WelcomeEmail failed for user {$this->user->id}", [
'error' => $exception->getMessage(),
]);
// 可以在这里发告警、退款等补偿操作
}
}
分发 Job
// 立即推入队列
SendWelcomeEmail::dispatch($user);
// 延迟执行(5 分钟后)
SendWelcomeEmail::dispatch($user)->delay(now()->addMinutes(5));
// 指定队列名称(优先级队列)
SendWelcomeEmail::dispatch($user)->onQueue('emails');
ProcessVideoTranscode::dispatch($video)->onQueue('heavy');
// 条件分发
SendWelcomeEmail::dispatchIf($user->wantsEmail, $user);
// 同步执行(不进入队列,用于调试)
SendWelcomeEmail::dispatchSync($user);
// 在注册控制器中使用
class RegisterController extends Controller
{
public function __invoke(StoreUserRequest $request)
{
$user = User::create($request->validated());
$user->assignRole('member');
// 推入队列,立即返回响应
SendWelcomeEmail::dispatch($user);
SyncUserToExternalCRM::dispatch($user)->delay(now()->addSeconds(10));
return response()->json(['user' => new UserResource($user)], 201);
}
}
启动 Queue Worker
# 启动 Worker(开发环境)
php artisan queue:work
# 指定队列(多队列优先级)
php artisan queue:work --queue=emails,default,heavy
# Worker 优先处理 emails 队列,再处理 default,最后处理 heavy
# 指定 Driver
php artisan queue:work redis
# 监听模式(代码变更后自动重启,开发用)
php artisan queue:listen
# 处理单个任务后退出(Supervisor 监控时适合)
php artisan queue:work --once
# 查看失败任务
php artisan queue:failed
# 重试失败任务
php artisan queue:retry all
php artisan queue:retry 5 # 重试 ID=5 的任务
# 清空队列
php artisan queue:clear
php artisan queue:clear --queue=emails
生产环境:Supervisor 管理 Worker
; /etc/supervisor/conf.d/taskflow-worker.conf
[program:taskflow-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/taskflow/artisan queue:work redis --queue=emails,default --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=4 ; 启动 4 个 Worker 进程
redirect_stderr=true
stdout_logfile=/var/www/taskflow/storage/logs/worker.log
stopwaitsecs=3600
# 启动 Supervisor
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start taskflow-worker:*
# 代码部署后优雅重启 Worker
php artisan queue:restart
# Worker 处理完当前任务后自动退出,Supervisor 自动重启新进程
下一节:Job 链、批处理与失败处理——单个 Job 解决了基本异步问题,Job 链和 Batch 解决更复杂的场景:多个步骤顺序执行、并发执行多任务后汇总结果。