feat(platform-orders): queue batch activate subscriptions job

This commit is contained in:
萝卜
2026-03-17 12:40:21 +08:00
parent 5158703a3e
commit e9ec968379
4 changed files with 206 additions and 91 deletions

View File

@@ -1592,87 +1592,21 @@ class PlatformOrderController extends Controller
}
$filterSummary = implode('&', $filterSummaryParts);
foreach ($orders as $orderRow) {
try {
$order = PlatformOrder::query()->find($orderRow->id);
if (! $order) {
continue;
}
// 队列化M3 可运维化第一步):先把匹配到的订单 ID 列表投递为一个 Job避免请求超时。
// 注意:当前阶段仍由 Job 内逐条写 meta 与错误原因;后续可再升级为分片 Job + 结果聚合。
$orderIds = $orders->pluck('id')->map(fn ($id) => (int) $id)->values()->all();
// 治理优先:续费单必须绑定订阅(兼容历史脏数据/手工改库等场景)
if ((string) ($order->order_type ?? '') === 'renewal' && ! (int) ($order->site_subscription_id ?? 0)) {
throw new \InvalidArgumentException('续费单未绑定订阅site_subscription_id 为空),不允许批量同步订阅。');
}
\App\Jobs\BatchActivateSubscriptionsJob::dispatch(
$orderIds,
(int) $admin->id,
$scope,
(string) $filterSummary,
(int) $limit,
(int) $matchedTotal,
(int) $processed,
);
$subscription = $service->activateOrder($orderRow->id, $admin->id);
// 注意activateOrder 过程中会写入 order 的 meta/site_subscription_id 等;此处必须 refresh避免后续写审计时覆盖掉同步结果
$order->refresh();
// 轻量审计:记录批量同步动作(方便追溯)
if ($order) {
$meta = (array) ($order->meta ?? []);
$audit = (array) (data_get($meta, 'audit', []) ?? []);
$nowStr = now()->toDateTimeString();
$audit[] = [
'action' => 'batch_activate_subscription',
'scope' => $scope,
'at' => $nowStr,
'admin_id' => $admin->id,
'subscription_id' => $subscription->id,
'filters' => $filterSummary,
'note' => '批量同步订阅limit=' . $limit . ', matched=' . $matchedTotal . ', processed=' . $processed . ')',
];
data_set($meta, 'audit', $audit);
// 便于筛选/统计:记录最近一次批量同步信息(扁平字段)
data_set($meta, 'batch_activation', [
'at' => $nowStr,
'admin_id' => $admin->id,
'scope' => $scope,
]);
$order->meta = $meta;
$order->save();
}
$success++;
} catch (\Throwable $e) {
$failed++;
$reason = trim((string) $e->getMessage());
$reason = $reason !== '' ? $reason : '未知错误';
$failedReasonCounts[$reason] = ($failedReasonCounts[$reason] ?? 0) + 1;
// 批量同步失败也需要可治理:写入失败原因到订单 meta便于后续筛选/导出/清理
$order = PlatformOrder::query()->find($orderRow->id);
if ($order) {
$meta = (array) ($order->meta ?? []);
data_set($meta, 'subscription_activation_error', [
'message' => $reason,
'at' => now()->toDateTimeString(),
'admin_id' => $admin->id,
]);
$order->meta = $meta;
$order->save();
}
}
}
$msg = '批量同步订阅完成:成功 ' . $success . ' 条,失败 ' . $failed . ' 条(命中 ' . $matchedTotal . ' 条,本次处理 ' . $processed . ' 条limit=' . $limit . '';
if ($failed > 0 && count($failedReasonCounts) > 0) {
arsort($failedReasonCounts);
$top = array_slice($failedReasonCounts, 0, 3, true);
$topText = collect($top)->map(function ($cnt, $reason) {
$reason = mb_substr((string) $reason, 0, 60);
return $reason . '' . $cnt . '';
})->implode('');
$msg .= '失败原因Top' . $topText;
}
return redirect()->back()->with('success', $msg);
return redirect()->back()->with('success', '批量同步订阅任务已提交到队列:命中 ' . $matchedTotal . ' 条,本次处理 ' . $processed . ' 条limit=' . $limit . ')。');
}
public function batchMarkPaidAndActivate(Request $request, SubscriptionActivationService $service): RedirectResponse

View File

@@ -0,0 +1,110 @@
<?php
namespace App\Jobs;
use App\Models\PlatformOrder;
use App\Support\SubscriptionActivationService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class BatchActivateSubscriptionsJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/** @var int[] */
public array $orderIds;
public int $adminId;
public string $scope;
public string $filterSummary;
public int $limit;
public int $matchedTotal;
public int $processed;
/**
* @param int[] $orderIds
*/
public function __construct(
array $orderIds,
int $adminId,
string $scope,
string $filterSummary,
int $limit,
int $matchedTotal,
int $processed,
) {
$this->orderIds = array_values(array_map('intval', $orderIds));
$this->adminId = $adminId;
$this->scope = $scope;
$this->filterSummary = $filterSummary;
$this->limit = $limit;
$this->matchedTotal = $matchedTotal;
$this->processed = $processed;
}
public function handle(SubscriptionActivationService $service): void
{
foreach ($this->orderIds as $orderId) {
/** @var PlatformOrder|null $order */
$order = PlatformOrder::query()->find($orderId);
if (! $order) {
continue;
}
try {
// 双保险:续费单必须绑定订阅
if ((string) ($order->order_type ?? '') === 'renewal' && ! (int) ($order->site_subscription_id ?? 0)) {
throw new \InvalidArgumentException('续费单未绑定订阅site_subscription_id 为空),不允许批量同步订阅。');
}
$subscription = $service->activateOrder($orderId, $this->adminId);
// 注意activateOrder 内会更新 meta这里 refresh 避免覆盖
$order->refresh();
$meta = (array) ($order->meta ?? []);
$audit = (array) (data_get($meta, 'audit', []) ?? []);
$nowStr = now()->toDateTimeString();
$audit[] = [
'action' => 'batch_activate_subscription',
'scope' => $this->scope,
'at' => $nowStr,
'admin_id' => $this->adminId,
'subscription_id' => $subscription->id,
'filters' => $this->filterSummary,
'note' => '批量同步订阅queue, limit=' . $this->limit . ', matched=' . $this->matchedTotal . ', processed=' . $this->processed . ')',
];
data_set($meta, 'audit', $audit);
// 便于筛选/统计:记录最近一次批量同步信息(扁平字段)
data_set($meta, 'batch_activation', [
'at' => $nowStr,
'admin_id' => $this->adminId,
'scope' => $this->scope,
'mode' => 'queue',
]);
$order->meta = $meta;
$order->save();
} catch (\Throwable $e) {
$meta = (array) ($order->meta ?? []);
data_set($meta, 'subscription_activation_error', [
'message' => trim((string) $e->getMessage()) !== '' ? trim((string) $e->getMessage()) : '未知错误',
'at' => now()->toDateTimeString(),
'admin_id' => $this->adminId,
]);
$order->meta = $meta;
$order->save();
}
}
}
}

View File

@@ -0,0 +1,80 @@
<?php
namespace Tests\Feature;
use App\Jobs\BatchActivateSubscriptionsJob;
use App\Models\Merchant;
use App\Models\Plan;
use App\Models\PlatformOrder;
use Illuminate\Foundation\Testing\RefreshDatabase;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class AdminPlatformOrderBatchActivateSubscriptionsShouldDispatchJobTest extends TestCase
{
use RefreshDatabase;
protected function loginAsPlatformAdmin(): void
{
$this->seed();
$this->post('/admin/login', [
'email' => 'platform.admin@demo.local',
'password' => 'Platform@123456',
])->assertRedirect('/admin');
}
public function test_batch_activate_subscriptions_should_dispatch_job_and_not_run_inline(): void
{
$this->loginAsPlatformAdmin();
Queue::fake();
$merchant = Merchant::query()->firstOrFail();
$plan = Plan::query()->create([
'code' => 'batch_activate_dispatch_job_plan',
'name' => '批量同步订阅投递队列测试套餐',
'billing_cycle' => 'monthly',
'price' => 10,
'list_price' => 10,
'status' => 'active',
'sort' => 10,
'published_at' => now(),
]);
$syncable = PlatformOrder::query()->create([
'merchant_id' => $merchant->id,
'plan_id' => $plan->id,
'order_no' => 'PO_BATCH_DISPATCH_0001',
'order_type' => 'new_purchase',
'status' => 'activated',
'payment_status' => 'paid',
'plan_name' => $plan->name,
'billing_cycle' => $plan->billing_cycle,
'period_months' => 1,
'quantity' => 1,
'payable_amount' => 10,
'paid_amount' => 10,
'placed_at' => now()->subMinutes(10),
'paid_at' => now()->subMinutes(9),
'activated_at' => now()->subMinutes(8),
]);
$this->post('/admin/platform-orders/batch-activate-subscriptions', [
'scope' => 'filtered',
'syncable_only' => '1',
'limit' => 50,
])->assertRedirect();
Queue::assertPushed(BatchActivateSubscriptionsJob::class, function (BatchActivateSubscriptionsJob $job) use ($syncable) {
return in_array($syncable->id, $job->orderIds, true)
&& $job->scope === 'filtered'
&& $job->limit === 50;
});
// 不应在请求内就完成同步(因为我们已队列化);因此 site_subscription_id 仍应为空
$syncable->refresh();
$this->assertNull($syncable->site_subscription_id);
$this->assertEmpty(data_get($syncable->meta, 'subscription_activation'));
}
}

View File

@@ -168,19 +168,10 @@ class AdminPlatformOrderBatchActivateSubscriptionsTest extends TestCase
$res->assertRedirect()->assertSessionHas('success');
$ok->refresh();
$bad->refresh();
$this->assertNotNull($ok->site_subscription_id);
$this->assertNotEmpty(data_get($ok->meta, 'subscription_activation.subscription_id'));
$this->assertNotEmpty(data_get($bad->meta, 'subscription_activation_error.message'));
$this->assertNotEmpty(data_get($bad->meta, 'subscription_activation_error.at'));
// 批量结果摘要应包含失败原因Top
// 队列化后:请求内不会立即跑完,不应强行断言订单已同步/失败原因已落库。
// 这里仅锁定“已提交到队列”的口径;具体成功/失败应由队列 worker 执行。
$msg = (string) $res->getSession()->get('success');
$this->assertStringContainsString('失败原因Top', $msg);
$this->assertStringContainsString('模拟失败:订阅同步异常', $msg);
$this->assertStringContainsString('任务已提交到队列', $msg);
}
public function test_platform_admin_batch_activate_respects_limit(): void