Job 链、批处理与失败处理
High Contrast
Dark Mode
Light Mode
Sepia
Forest
1 min read182 words

Job 链、批处理与失败处理

单个 Job 解决了基本异步问题。Job 链(Chain)让多步骤按顺序执行,Job 批处理(Batch)让多个 Job 并发执行并追踪整体进度。失败处理策略则决定了你的系统有多健壮。


Job 链(顺序执行)

use Illuminate\Support\Facades\Bus;
// 顺序执行:前一个完成后才执行下一个
// 如果其中一个失败,后续的 Job 不会执行
Bus::chain([
new ValidatePayment($order),
new ChargeCustomer($order),
new SendOrderConfirmation($order),
new NotifyWarehouse($order),
])->dispatch();
// 链中的 Job 失败时的处理
Bus::chain([
new ImportData($file),
new ProcessData(),
new GenerateReport(),
])->catch(function (Throwable $e) {
// 链中任何 Job 失败时执行
logger()->error('Import chain failed: ' . $e->getMessage());
Notification::route('slack', config('services.slack.webhook'))
->notify(new ImportFailedAlert($e));
})->dispatch();
// 也可以用静态方法:
ValidatePayment::withChain([
new ChargeCustomer($order),
new SendOrderConfirmation($order),
])->dispatch($order);

Job 批处理(并发 + 进度追踪)

use Illuminate\Support\Facades\Bus;
use Illuminate\Bus\Batch;
// 并发执行多个 Job,追踪整体进度
// 需要创建 job_batches 表:
// php artisan queue:batches-table
// php artisan migrate
// 批处理示例:批量导入用户
$batch = Bus::batch(
collect($csvRows)->map(fn($row) => new ImportUserRow($row))
)->then(function (Batch $batch) {
// 所有 Job 成功后执行
logger()->info("Imported {$batch->totalJobs} users successfully");
event(new ImportCompleted($batch->id));
})->catch(function (Batch $batch, Throwable $e) {
// 任意 Job 失败时执行
logger()->error("Import batch failed: {$e->getMessage()}");
})->finally(function (Batch $batch) {
// 无论成功失败都执行(清理临时文件等)
Storage::delete($this->tempFile);
})->allowFailures()   // 允许部分失败(不取消整个批次)
->onQueue('imports')
->dispatch();
// 获取批次 ID(存储后可以查询进度)
$batchId = $batch->id;
// 查询批次进度(轮询接口)
// GET /api/imports/{batchId}/progress
public function progress(string $batchId)
{
$batch = Bus::findBatch($batchId);
if (!$batch) abort(404);
return response()->json([
'total'      => $batch->totalJobs,
'processed'  => $batch->processedJobs(),
'failed'     => $batch->failedJobs,
'pending'    => $batch->pendingJobs,
'progress'   => $batch->progress(),  // 0-100
'finished'   => $batch->finished(),
'cancelled'  => $batch->cancelled(),
]);
}

失败处理策略

失败任务的存储与重试

# 创建 failed_jobs 表
php artisan queue:failed-table
php artisan migrate
# 查看失败的任务
php artisan queue:failed
# +-----------+-----------------------------------+---------------------+
# | ID        | Connection  Queue  Class          | Failed At           |
# +-----------+-----------------------------------+---------------------+
# | 1         | redis       emails SendWelcome... | 2024-03-22 10:30:00 |
# 重试单个失败任务
php artisan queue:retry 1
# 重试所有失败任务
php artisan queue:retry all
# 删除失败任务
php artisan queue:forget 1
php artisan queue:flush   # 清空所有失败任务

在 Job 中处理失败

class ProcessPayment implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $tries = 3;          // 最多重试 3 次
// 指数退避重试(第1次重试等30秒,第2次等60秒,第3次等120秒)
public function backoff(): array
{
return [30, 60, 120];
}
// 唯一 Job(防止重复处理同一订单)
public function uniqueId(): string
{
return 'payment_' . $this->order->id;
}
public function handle(): void
{
// 检查是否已处理(幂等性检查)
if ($this->order->payment_status === 'paid') {
return;  // 已支付,跳过
}
try {
$result = $this->paymentGateway->charge($this->order);
$this->order->update(['payment_status' => 'paid']);
} catch (\App\Exceptions\PaymentDeclinedException $e) {
// 不重试的错误(如卡被拒绝)
$this->fail($e);  // 立即标记为失败,不再重试
} catch (\App\Exceptions\NetworkException $e) {
// 网络错误:重试
throw $e;
}
}
public function failed(\Throwable $exception): void
{
// 最终失败(重试耗尽后调用)
$this->order->update(['payment_status' => 'failed']);
$this->order->user->notify(new PaymentFailedNotification($this->order));
logger()->critical("Payment permanently failed for order {$this->order->id}");
}
}

定时任务(Schedule + Queue)

// routes/console.php(Laravel 11,原来在 app/Console/Kernel.php)
use Illuminate\Support\Facades\Schedule;
Schedule::job(new SendDailyDigest())->dailyAt('08:00');
Schedule::job(new CleanupExpiredTokens())->daily();
Schedule::job(new GenerateWeeklyReport())->weekly()->mondays()->at('09:00');
// 非 Job 的定时任务
Schedule::command('db:prune --model=PasswordResetToken')->daily();
Schedule::call(fn() => cache()->forget('leaderboard'))->hourly();
// Cron 格式
Schedule::job(new SyncExternalData())->cron('*/15 * * * *');  // 每15分钟
// 防止任务重叠(长任务)
Schedule::job(new LongRunningJob())->daily()->withoutOverlapping();
# 启动调度器(服务器 crontab 只需一行)
# crontab -e:
* * * * * cd /var/www/taskflow && php artisan schedule:run >> /dev/null 2>&1
# 本地测试调度器
php artisan schedule:work   # 持续运行,每分钟触发一次

下一节Laravel Horizon 监控仪表盘——Job 失败了你要第一时间知道。Horizon 提供实时的队列监控、失败任务告警和处理速率统计。