feat(bmpa): queue batch mark paid and activate via job

This commit is contained in:
萝卜
2026-03-17 14:55:48 +08:00
parent c692e099aa
commit 95a52d3f49
3 changed files with 363 additions and 207 deletions

View File

@@ -1705,10 +1705,6 @@ class PlatformOrderController extends Controller
// 批次号:用于把一次批量执行关联起来,便于后续追溯/筛选/可观测。
$runId = 'BMPA' . now()->format('YmdHis') . str_pad((string) random_int(1, 9999), 4, '0', STR_PAD_LEFT);
$success = 0;
$failed = 0;
$failedReasonCounts = [];
// 筛选摘要:用于审计记录(便于追溯本次批量处理口径)
$filterSummaryParts = [];
foreach ($filters as $k => $v) {
@@ -1718,211 +1714,21 @@ class PlatformOrderController extends Controller
}
$filterSummary = implode('&', $filterSummaryParts);
$now = now();
$nowStr = $now->toDateTimeString();
// 队列化M3 可运维化第二步):把批量 BMPA 投递为一个 Job避免请求超时并对齐 BAS 的批次可观测模式。
$orderIds = $orders->pluck('id')->map(fn ($id) => (int) $id)->values()->all();
foreach ($orders as $row) {
$order = PlatformOrder::query()->find($row->id);
if (! $order) {
continue;
}
\App\Jobs\BatchMarkPaidAndActivateJob::dispatch(
$orderIds,
(int) $admin->id,
$scope,
(string) $filterSummary,
(int) $limit,
(int) $matchedTotal,
(int) $processed,
(string) $runId,
);
// 再次防御:仅推进 pending+unpaid
if ($order->status !== 'pending' || $order->payment_status !== 'unpaid') {
continue;
}
try {
// 治理优先:续费单必须绑定订阅(兼容历史脏数据/手工改库等场景)
if ((string) ($order->order_type ?? '') === 'renewal' && ! (int) ($order->site_subscription_id ?? 0)) {
throw new \InvalidArgumentException('续费单未绑定订阅site_subscription_id 为空),不允许批量标记支付并生效。');
}
// 治理优先:若该订单已有退款轨迹,则不允许推进
if ((float) $order->refundTotal() > 0) {
throw new \InvalidArgumentException('订单存在退款轨迹,不允许批量标记支付并生效,请先完成退款治理。');
}
// 治理优先:若该订单已有回执证据,但回执总额与应付金额不一致,则不允许推进
$receiptTotal = (float) $order->receiptTotal();
$hasReceiptEvidence = (data_get($order->meta, 'payment_summary.total_amount') !== null)
|| (data_get($order->meta, 'payment_receipts.0.amount') !== null);
if ($hasReceiptEvidence) {
$expectedPaid = (float) ($order->payable_amount ?? 0);
$receiptCents = (int) round($receiptTotal * 100);
$expectedCents = (int) round($expectedPaid * 100);
$tol = (float) config('saasshop.amounts.tolerance', 0.01);
$tolCents = (int) round($tol * 100);
$tolCents = max(1, $tolCents);
if (abs($receiptCents - $expectedCents) >= $tolCents) {
throw new \InvalidArgumentException('订单回执总额与应付金额不一致,不允许批量推进,请先修正回执/金额后再处理。');
}
}
// 最小状态推进:标记为已支付 + 已生效,并补齐时间与金额字段
$order->payment_status = 'paid';
$order->status = 'activated';
$order->paid_at = $order->paid_at ?: $now;
$order->activated_at = $order->activated_at ?: $now;
$order->paid_amount = (float) (($order->payable_amount ?? 0) > 0 ? $order->payable_amount : ($order->paid_amount ?? 0));
// 若尚无回执数组,则补一条(可治理留痕,便于后续对账)
$meta = (array) ($order->meta ?? []);
$receipts = (array) (data_get($meta, 'payment_receipts', []) ?? []);
if (count($receipts) === 0) {
$receipts[] = [
'type' => 'batch_mark_paid_and_activate',
'channel' => (string) ($order->payment_channel ?? ''),
'amount' => (float) ($order->paid_amount ?? 0),
'paid_at' => $order->paid_at ? $order->paid_at->format('Y-m-d H:i:s') : $nowStr,
'note' => '由【批量标记支付并生效】自动补记(可治理)',
'created_at' => $nowStr,
'admin_id' => $admin->id,
];
data_set($meta, 'payment_receipts', $receipts);
$totalPaid = 0.0;
foreach ($receipts as $r) {
$totalPaid += (float) (data_get($r, 'amount') ?? 0);
}
$latest = count($receipts) > 0 ? end($receipts) : null;
data_set($meta, 'payment_summary', [
'count' => count($receipts),
'total_amount' => $totalPaid,
'last_at' => (string) (data_get($latest, 'paid_at') ?? ''),
'last_amount' => (float) (data_get($latest, 'amount') ?? 0),
'last_channel' => (string) (data_get($latest, 'channel') ?? ''),
]);
}
// 清理历史错误
data_forget($meta, 'subscription_activation_error');
data_forget($meta, 'batch_mark_paid_and_activate_error');
$order->meta = $meta;
$order->save();
// 同步订阅
$subscription = $service->activateOrder($order->id, $admin->id);
// 审计:记录批量推进(包含订阅同步)
$order->refresh();
$meta2 = (array) ($order->meta ?? []);
$audit = (array) (data_get($meta2, 'audit', []) ?? []);
$audit[] = [
'action' => 'batch_mark_paid_and_activate',
'scope' => $scope,
'at' => $nowStr,
'admin_id' => $admin->id,
'subscription_id' => $subscription->id,
'filters' => $filterSummary,
'note' => '批量标记支付并生效含订阅同步limit=' . $limit . ', matched=' . $matchedTotal . ', processed=' . $processed . ')',
];
data_set($meta2, 'audit', $audit);
// 便于追踪:记录最近一次批量推进信息(扁平字段)
data_set($meta2, 'batch_mark_paid_and_activate', [
'at' => $nowStr,
'admin_id' => $admin->id,
'scope' => $scope,
'mode' => 'sync',
'run_id' => $runId,
]);
$order->meta = $meta2;
$order->save();
$success++;
} catch (\Throwable $e) {
$failed++;
$reason = trim((string) $e->getMessage());
$reason = $reason !== '' ? $reason : '未知错误';
$meta = (array) ($order->meta ?? []);
$failedReasonCounts[$reason] = ($failedReasonCounts[$reason] ?? 0) + 1;
data_set($meta, 'batch_mark_paid_and_activate_error', [
'message' => $reason,
'at' => $nowStr,
'admin_id' => $admin->id,
'scope' => $scope,
'run_id' => $runId,
'filters' => $filterSummary,
]);
// 即使失败也写入 batch_mark_paid_and_activate包含 run_id确保本批次可追溯。
data_set($meta, 'batch_mark_paid_and_activate', [
'at' => $nowStr,
'admin_id' => $admin->id,
'scope' => $scope,
'mode' => 'sync',
'run_id' => $runId,
]);
$order->meta = $meta;
$order->save();
}
}
// 写入最小结果汇总(冗余写入到每条订单的 meta.batch_mark_paid_and_activate.last_result便于运营在列表页直接看到“本次批量 BMPA 的执行结果”。
// 注意:当前阶段不引入新表;后续若 BMPA Job 化,可复用 BAS 的 last_result 模式。
$topReasons = [];
if ($failed > 0 && count($failedReasonCounts) > 0) {
arsort($failedReasonCounts);
$top = array_slice($failedReasonCounts, 0, 3, true);
foreach ($top as $reason => $cnt) {
$topReasons[] = [
'reason' => mb_substr((string) $reason, 0, 80),
'count' => (int) $cnt,
];
}
}
$summary = [
'run_id' => $runId,
'success' => $success,
'failed' => $failed,
'matched' => (int) $matchedTotal,
'processed' => (int) $processed,
'top_reasons' => $topReasons,
'at' => now()->toDateTimeString(),
];
foreach ($orders as $row) {
$order = PlatformOrder::query()->find($row->id);
if (! $order) {
continue;
}
$meta = (array) ($order->meta ?? []);
$bmpa = (array) (data_get($meta, 'batch_mark_paid_and_activate', []) ?? []);
if ((string) (data_get($bmpa, 'run_id') ?? '') !== $runId) {
continue;
}
data_set($bmpa, 'last_result', $summary);
data_set($meta, 'batch_mark_paid_and_activate', $bmpa);
$order->meta = $meta;
$order->save();
}
$msg = '批量标记支付并生效完成:成功 ' . $success . ' 条,失败 ' . $failed . ' 条(命中 ' . $matchedTotal . ' 条,本次处理 ' . $processed . ' 条limit=' . $limit . 'run_id=' . $runId . '';
if ($failed > 0 && count($failedReasonCounts) > 0) {
$topText = collect($failedReasonCounts)->sortDesc()->take(3)->map(function ($cnt, $reason) {
$reason = mb_substr((string) $reason, 0, 60);
return $reason . '' . (int) $cnt . '';
})->implode('');
$msg .= '失败原因Top' . $topText;
}
return redirect()->back()->with('success', $msg);
return redirect()->back()->with('success', '批量 BMPA 任务已提交到队列:命中 ' . $matchedTotal . ' 条,本次处理 ' . $processed . ' 条limit=' . $limit . 'run_id=' . $runId . ')。');
}
public function batchMarkActivated(Request $request): RedirectResponse

View File

@@ -0,0 +1,270 @@
<?php
namespace App\Jobs;
use App\Models\PlatformOrder;
use App\Support\SubscriptionActivationService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class BatchMarkPaidAndActivateJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/** @var int[] */
public array $orderIds;
public int $adminId;
public string $scope;
public string $filterSummary;
public int $limit;
public int $matchedTotal;
public int $processed;
public string $runId;
/**
* @param int[] $orderIds
*/
public function __construct(
array $orderIds,
int $adminId,
string $scope,
string $filterSummary,
int $limit,
int $matchedTotal,
int $processed,
string $runId,
) {
$this->orderIds = array_values(array_map('intval', $orderIds));
$this->adminId = $adminId;
$this->scope = $scope;
$this->filterSummary = $filterSummary;
$this->limit = $limit;
$this->matchedTotal = $matchedTotal;
$this->processed = $processed;
$this->runId = $runId;
}
public function handle(SubscriptionActivationService $service): void
{
$success = 0;
$failed = 0;
$failedReasonCounts = [];
$now = now();
$nowStr = $now->toDateTimeString();
foreach ($this->orderIds as $orderId) {
/** @var PlatformOrder|null $order */
$order = PlatformOrder::query()->find($orderId);
if (! $order) {
continue;
}
try {
// 双保险:仅推进 pending+unpaid
if ((string) $order->status !== 'pending' || (string) $order->payment_status !== 'unpaid') {
throw new \InvalidArgumentException('订单不是待处理+未支付,不允许批量 BMPA。');
}
// 治理优先:续费单必须绑定订阅
if ((string) ($order->order_type ?? '') === 'renewal' && ! (int) ($order->site_subscription_id ?? 0)) {
throw new \InvalidArgumentException('续费单未绑定订阅site_subscription_id 为空),不允许批量标记支付并生效。');
}
// 治理优先:若该订单已有退款轨迹,则不允许推进
if ((float) $order->refundTotal() > 0) {
throw new \InvalidArgumentException('订单存在退款轨迹,不允许批量标记支付并生效,请先完成退款治理。');
}
// 治理优先:若该订单已有回执证据,但回执总额与应付金额不一致,则不允许推进
$receiptTotal = (float) $order->receiptTotal();
$hasReceiptEvidence = (data_get($order->meta, 'payment_summary.total_amount') !== null)
|| (data_get($order->meta, 'payment_receipts.0.amount') !== null);
if ($hasReceiptEvidence) {
$expectedPaid = (float) ($order->payable_amount ?? 0);
$receiptCents = (int) round($receiptTotal * 100);
$expectedCents = (int) round($expectedPaid * 100);
$tol = (float) config('saasshop.amounts.tolerance', 0.01);
$tolCents = (int) round($tol * 100);
$tolCents = max(1, $tolCents);
if (abs($receiptCents - $expectedCents) >= $tolCents) {
throw new \InvalidArgumentException('订单回执总额与应付金额不一致,不允许批量推进,请先修正回执/金额后再处理。');
}
}
// 最小状态推进:标记为已支付 + 已生效,并补齐时间与金额字段
$order->payment_status = 'paid';
$order->status = 'activated';
$order->paid_at = $order->paid_at ?: $now;
$order->activated_at = $order->activated_at ?: $now;
$order->paid_amount = (float) (($order->payable_amount ?? 0) > 0 ? $order->payable_amount : ($order->paid_amount ?? 0));
// 若尚无回执数组,则补一条(可治理留痕,便于后续对账)
$meta = (array) ($order->meta ?? []);
$receipts = (array) (data_get($meta, 'payment_receipts', []) ?? []);
if (count($receipts) === 0) {
$receipts[] = [
'type' => 'batch_mark_paid_and_activate',
'channel' => (string) ($order->payment_channel ?? ''),
'amount' => (float) ($order->paid_amount ?? 0),
'paid_at' => $order->paid_at ? $order->paid_at->format('Y-m-d H:i:s') : $nowStr,
'note' => '由【批量标记支付并生效】自动补记(可治理)',
'created_at' => $nowStr,
'admin_id' => $this->adminId,
];
data_set($meta, 'payment_receipts', $receipts);
$totalPaid = 0.0;
foreach ($receipts as $r) {
$totalPaid += (float) (data_get($r, 'amount') ?? 0);
}
$latest = count($receipts) > 0 ? end($receipts) : null;
data_set($meta, 'payment_summary', [
'count' => count($receipts),
'total_amount' => $totalPaid,
'last_at' => (string) (data_get($latest, 'paid_at') ?? ''),
'last_amount' => (float) (data_get($latest, 'amount') ?? 0),
'last_channel' => (string) (data_get($latest, 'channel') ?? ''),
]);
}
// 清理历史错误
data_forget($meta, 'subscription_activation_error');
data_forget($meta, 'batch_mark_paid_and_activate_error');
// 便于追踪:记录最近一次批量推进信息(扁平字段)
data_set($meta, 'batch_mark_paid_and_activate', [
'at' => $nowStr,
'admin_id' => $this->adminId,
'scope' => $this->scope,
'mode' => 'queue',
'run_id' => $this->runId,
]);
$order->meta = $meta;
$order->save();
// 同步订阅
$subscription = $service->activateOrder($order->id, $this->adminId);
// 审计:记录批量推进(包含订阅同步)
$order->refresh();
$meta2 = (array) ($order->meta ?? []);
$audit = (array) (data_get($meta2, 'audit', []) ?? []);
$audit[] = [
'action' => 'batch_mark_paid_and_activate',
'scope' => $this->scope,
'at' => $nowStr,
'admin_id' => $this->adminId,
'subscription_id' => $subscription->id,
'filters' => $this->filterSummary,
'run_id' => $this->runId,
'note' => '批量标记支付并生效queue, run_id=' . $this->runId . ', limit=' . $this->limit . ', matched=' . $this->matchedTotal . ', processed=' . $this->processed . ')',
];
data_set($meta2, 'audit', $audit);
// 再次写入扁平字段activateOrder 内会更新 meta避免被覆盖
$bmpa2 = (array) (data_get($meta2, 'batch_mark_paid_and_activate', []) ?? []);
$bmpa2 = array_merge($bmpa2, [
'at' => $nowStr,
'admin_id' => $this->adminId,
'scope' => $this->scope,
'mode' => 'queue',
'run_id' => $this->runId,
]);
data_set($meta2, 'batch_mark_paid_and_activate', $bmpa2);
$order->meta = $meta2;
$order->save();
$success++;
} catch (\Throwable $e) {
$failed++;
$reason = trim((string) $e->getMessage());
$reason = $reason !== '' ? $reason : '未知错误';
$failedReasonCounts[$reason] = ($failedReasonCounts[$reason] ?? 0) + 1;
$meta = (array) ($order->meta ?? []);
data_set($meta, 'batch_mark_paid_and_activate_error', [
'message' => $reason,
'at' => $nowStr,
'admin_id' => $this->adminId,
'scope' => $this->scope,
'run_id' => $this->runId,
'filters' => $this->filterSummary,
]);
// 即使失败也写入 batch_mark_paid_and_activate包含 run_id确保本批次可追溯。
data_set($meta, 'batch_mark_paid_and_activate', [
'at' => $nowStr,
'admin_id' => $this->adminId,
'scope' => $this->scope,
'mode' => 'queue',
'run_id' => $this->runId,
]);
$order->meta = $meta;
$order->save();
}
}
// 最小结果汇总(写入到每个订单的 meta.batch_mark_paid_and_activate.last_result便于运营在列表页直接看到“本次队列批量 BMPA 的执行结果”。
$topReasons = [];
if ($failed > 0 && count($failedReasonCounts) > 0) {
arsort($failedReasonCounts);
$top = array_slice($failedReasonCounts, 0, 3, true);
foreach ($top as $reason => $cnt) {
$topReasons[] = [
'reason' => mb_substr((string) $reason, 0, 80),
'count' => (int) $cnt,
];
}
}
$summary = [
'run_id' => $this->runId,
'success' => $success,
'failed' => $failed,
'matched' => (int) $this->matchedTotal,
'processed' => (int) $this->processed,
'top_reasons' => $topReasons,
'at' => now()->toDateTimeString(),
];
foreach ($this->orderIds as $orderId) {
$order = PlatformOrder::query()->find($orderId);
if (! $order) {
continue;
}
$meta = (array) ($order->meta ?? []);
$bmpa = (array) (data_get($meta, 'batch_mark_paid_and_activate', []) ?? []);
// 仅当 run_id 与当前 job 一致时才回写 last_result避免并发覆盖。
if ((string) (data_get($bmpa, 'run_id') ?? '') !== $this->runId) {
continue;
}
data_set($bmpa, 'last_result', $summary);
data_set($meta, 'batch_mark_paid_and_activate', $bmpa);
$order->meta = $meta;
$order->save();
}
}
}