Files
SkinbaseNova/app/Jobs/IngestUserDiscoveryEventJob.php
2026-03-28 19:15:39 +01:00

133 lines
4.6 KiB
PHP

<?php
declare(strict_types=1);
namespace App\Jobs;
use App\Services\Recommendations\UserInterestProfileService;
use App\Services\Recommendations\SessionRecoService;
use Carbon\CarbonImmutable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Schema;
/**
 * Queued job that ingests one user discovery event.
 *
 * For each event the job:
 *  1. claims a Redis idempotency marker keyed by the event id (skipped if Redis is unavailable),
 *  2. inserts a row into `user_discovery_events` (duplicates are ignored by the insert),
 *  3. forwards the event to the user interest profile service and the session recommendation service.
 *
 * If processing fails after the marker was claimed, the marker is released again so that the
 * retries configured through $tries / $backoff can actually re-process the event.
 */
final class IngestUserDiscoveryEventJob implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    /** Lifetime of the Redis idempotency marker in seconds (two days). */
    private const IDEMPOTENCY_TTL_SECONDS = 86400 * 2;

    /** Maximum number of attempts for this job. */
    public int $tries = 3;

    /** @var array<int, int> Delay in seconds before each successive retry. */
    public array $backoff = [5, 30, 120];

    /**
     * @param string               $eventId     Unique id of the event; used as the idempotency key.
     * @param int                  $userId      Id of the user the event belongs to.
     * @param int                  $artworkId   Id of the artwork the event refers to.
     * @param string               $eventType   Event type; also the lookup key into `discovery.weights`.
     * @param string               $algoVersion Algorithm version stored with the event.
     * @param string               $occurredAt  Date/time string parseable by CarbonImmutable::parse().
     * @param array<string, mixed> $meta        Free-form event metadata.
     */
    public function __construct(
        public readonly string $eventId,
        public readonly int $userId,
        public readonly int $artworkId,
        public readonly string $eventType,
        public readonly string $algoVersion,
        public readonly string $occurredAt,
        public readonly array $meta = []
    ) {
    }

    /**
     * Persist the event and feed it to the recommendation services.
     *
     * @throws \Throwable Any failure is logged and rethrown so the queue worker can retry the job.
     */
    public function handle(UserInterestProfileService $profileService, SessionRecoService $sessionRecoService): void
    {
        $idempotencyKey = sprintf('discovery:event:processed:%s', $this->eventId);

        // True only when THIS attempt created the Redis marker; decides whether we may delete it on failure.
        $markerClaimed = false;

        try {
            try {
                $markerClaimed = (bool) Redis::setnx($idempotencyKey, 1);
                if ($markerClaimed) {
                    Redis::expire($idempotencyKey, self::IDEMPOTENCY_TTL_SECONDS);
                }
                $shouldProcess = $markerClaimed;
            } catch (\Throwable $e) {
                Log::warning('Redis unavailable for discovery ingestion; proceeding without redis dedupe', [
                    'event_id' => $this->eventId,
                    'error' => $e->getMessage(),
                ]);
                // Best effort: without Redis we process anyway and rely on insertOrIgnore for the row.
                $shouldProcess = true;
            }

            if (! $shouldProcess) {
                // Another attempt already claimed this event id.
                return;
            }

            $occurredAt = CarbonImmutable::parse($this->occurredAt);
            $eventVersion = (string) config('discovery.event_version', 'event-v1');

            // The parentheses matter: a cast binds tighter than `??`, so without them the
            // 1.0 default would never apply and an unconfigured event type would raise an
            // "Undefined array key" warning.
            $weights = (array) config('discovery.weights', []);
            $eventWeight = (float) ($weights[$this->eventType] ?? 1.0);

            // Lowest category id attached to the artwork, or null when it has none.
            $rawCategoryId = DB::table('artwork_category')
                ->where('artwork_id', $this->artworkId)
                ->orderBy('category_id')
                ->value('category_id');
            $categoryId = $rawCategoryId !== null ? (int) $rawCategoryId : null;

            $now = now();
            $insertPayload = [
                'event_id' => $this->eventId,
                'user_id' => $this->userId,
                'artwork_id' => $this->artworkId,
                'category_id' => $categoryId,
                'event_type' => $this->eventType,
                'event_version' => $eventVersion,
                'algo_version' => $this->algoVersion,
                'weight' => $eventWeight,
                'event_date' => $occurredAt->toDateString(),
                'occurred_at' => $occurredAt->toDateTimeString(),
                'created_at' => $now,
                'updated_at' => $now,
            ];

            // The metadata column is named either `meta` or `metadata` depending on the schema.
            // The query builder applies no model casts, so the array must be JSON-encoded
            // before it can be bound as a query parameter.
            if (Schema::hasColumn('user_discovery_events', 'meta')) {
                $insertPayload['meta'] = json_encode($this->meta, JSON_UNESCAPED_SLASHES);
            } elseif (Schema::hasColumn('user_discovery_events', 'metadata')) {
                $insertPayload['metadata'] = json_encode($this->meta, JSON_UNESCAPED_SLASHES);
            }

            DB::table('user_discovery_events')->insertOrIgnore($insertPayload);

            $profileService->applyEvent(
                userId: $this->userId,
                eventType: $this->eventType,
                artworkId: $this->artworkId,
                categoryId: $categoryId,
                occurredAt: $occurredAt,
                eventId: $this->eventId,
                algoVersion: $this->algoVersion,
                eventMeta: $this->meta
            );

            $sessionRecoService->applyEvent(
                userId: $this->userId,
                eventType: $this->eventType,
                artworkId: $this->artworkId,
                categoryId: $categoryId,
                occurredAt: $occurredAt->toIso8601String(),
                meta: $this->meta,
            );
        } catch (\Throwable $e) {
            Log::error('IngestUserDiscoveryEventJob failed', [
                'event_id' => $this->eventId,
                'user_id' => $this->userId,
                'artwork_id' => $this->artworkId,
                'event_type' => $this->eventType,
                'error' => $e->getMessage(),
            ]);

            // Release the marker we claimed; otherwise every retry would see it and return
            // early, silently dropping the event despite $tries / $backoff.
            if ($markerClaimed) {
                try {
                    Redis::del($idempotencyKey);
                } catch (\Throwable $releaseError) {
                    // Never let a cleanup failure mask the original exception.
                    Log::warning('Failed to release discovery idempotency key after job failure', [
                        'event_id' => $this->eventId,
                        'error' => $releaseError->getMessage(),
                    ]);
                }
            }

            throw $e;
        }
    }
}