123 lines
4.2 KiB
PHP
123 lines
4.2 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Jobs;
|
|
|
|
use App\Services\Recommendations\UserInterestProfileService;
|
|
use Carbon\CarbonImmutable;
|
|
use Illuminate\Bus\Queueable;
|
|
use Illuminate\Contracts\Queue\ShouldQueue;
|
|
use Illuminate\Foundation\Bus\Dispatchable;
|
|
use Illuminate\Queue\InteractsWithQueue;
|
|
use Illuminate\Queue\SerializesModels;
|
|
use Illuminate\Support\Facades\DB;
|
|
use Illuminate\Support\Facades\Log;
|
|
use Illuminate\Support\Facades\Redis;
|
|
use Illuminate\Support\Facades\Schema;
|
|
|
|
/**
 * Queued job that persists a single user discovery event and forwards it to
 * the user interest profile service.
 *
 * Processing is de-duplicated through a Redis marker keyed by the event id.
 * When Redis cannot be reached the job logs a warning and proceeds without
 * that marker. The row is written with insertOrIgnore(); presumably the table
 * has a unique constraint on event_id that makes this safe — TODO confirm
 * against the migration.
 */
final class IngestUserDiscoveryEventJob implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;
    use SerializesModels;

    /** Lifetime of the Redis idempotency marker, in seconds (two days). */
    private const IDEMPOTENCY_TTL_SECONDS = 86400 * 2;

    /** Table the discovery events are written to. */
    private const EVENTS_TABLE = 'user_discovery_events';

    public int $tries = 3;

    /** @var array<int, int> */
    public array $backoff = [5, 30, 120];

    /**
     * @param string               $eventId     Identifier used for the idempotency key and the stored row.
     * @param int                  $userId      User the event belongs to.
     * @param int                  $artworkId   Artwork the event refers to.
     * @param string               $eventType   Event type; also the lookup key into `discovery.weights`.
     * @param string               $algoVersion Algorithm version stored with the event.
     * @param string               $occurredAt  Timestamp string, parsed with CarbonImmutable::parse() in handle().
     * @param array<string, mixed> $meta        Extra payload stored as JSON when a meta/metadata column exists.
     */
    public function __construct(
        public readonly string $eventId,
        public readonly int $userId,
        public readonly int $artworkId,
        public readonly string $eventType,
        public readonly string $algoVersion,
        public readonly string $occurredAt,
        public readonly array $meta = [],
    ) {
    }

    /**
     * Store the event and apply it to the user's interest profile.
     *
     * Returns without doing anything when the Redis marker for this event id
     * already exists. On any failure the marker created by this attempt is
     * released, the error is logged, and the exception is rethrown so the
     * queue can retry according to $tries / $backoff.
     *
     * @throws \Throwable Any error raised while parsing, persisting or applying the event.
     */
    public function handle(UserInterestProfileService $profileService): void
    {
        $idempotencyKey = sprintf('discovery:event:processed:%s', $this->eventId);

        // True only when THIS attempt created the marker in Redis; it then owns
        // the responsibility of removing it again if processing fails.
        $markerOwned = false;

        try {
            try {
                // Single SET with NX + EX so the marker can never exist without a TTL
                // (a separate SETNX followed by EXPIRE can be interrupted in between).
                $markerOwned = (bool) Redis::set($idempotencyKey, 1, 'EX', self::IDEMPOTENCY_TTL_SECONDS, 'NX');

                if (! $markerOwned) {
                    // Another attempt already holds the marker for this event id.
                    return;
                }
            } catch (\Throwable $e) {
                // Best effort: keep ingesting even when Redis is down.
                Log::warning('Redis unavailable for discovery ingestion; proceeding without redis dedupe', [
                    'event_id' => $this->eventId,
                    'error' => $e->getMessage(),
                ]);
            }

            $occurredAt = CarbonImmutable::parse($this->occurredAt);
            $eventVersion = (string) config('discovery.event_version', 'event-v1');

            // The default must be applied BEFORE the cast: `(float) $a[$k] ?? 1.0`
            // casts first, so the fallback is unreachable and a missing key warns.
            $weights = (array) config('discovery.weights', []);
            $eventWeight = (float) ($weights[$this->eventType] ?? 1.0);

            // Lowest category id attached to the artwork, or null when it has none.
            $rawCategoryId = DB::table('artwork_category')
                ->where('artwork_id', $this->artworkId)
                ->orderBy('category_id')
                ->value('category_id');
            $categoryId = $rawCategoryId !== null ? (int) $rawCategoryId : null;

            // One timestamp so created_at and updated_at are guaranteed to match.
            $now = now();

            $insertPayload = [
                'event_id' => $this->eventId,
                'user_id' => $this->userId,
                'artwork_id' => $this->artworkId,
                'category_id' => $categoryId,
                'event_type' => $this->eventType,
                'event_version' => $eventVersion,
                'algo_version' => $this->algoVersion,
                'weight' => $eventWeight,
                'event_date' => $occurredAt->toDateString(),
                'occurred_at' => $occurredAt->toDateTimeString(),
                'created_at' => $now,
                'updated_at' => $now,
            ];

            $metaColumn = $this->resolveMetaColumn();
            if ($metaColumn !== null) {
                // The query builder cannot bind a PHP array, so the payload is
                // JSON-encoded regardless of which column name the schema uses.
                $insertPayload[$metaColumn] = json_encode($this->meta, JSON_UNESCAPED_SLASHES);
            }

            DB::table(self::EVENTS_TABLE)->insertOrIgnore($insertPayload);

            $profileService->applyEvent(
                userId: $this->userId,
                eventType: $this->eventType,
                artworkId: $this->artworkId,
                categoryId: $categoryId,
                occurredAt: $occurredAt,
                eventId: $this->eventId,
                algoVersion: $this->algoVersion,
                eventMeta: $this->meta,
            );
        } catch (\Throwable $e) {
            if ($markerOwned) {
                // Without this, every retry would see the marker, return early,
                // and the failed event would never be processed.
                $this->releaseIdempotencyMarker($idempotencyKey);
            }

            Log::error('IngestUserDiscoveryEventJob failed', [
                'event_id' => $this->eventId,
                'user_id' => $this->userId,
                'artwork_id' => $this->artworkId,
                'event_type' => $this->eventType,
                'error' => $e->getMessage(),
            ]);

            throw $e;
        }
    }

    /**
     * Name of the column that stores the event's meta payload: `meta` if it
     * exists, otherwise `metadata`, otherwise null when neither column exists.
     */
    private function resolveMetaColumn(): ?string
    {
        foreach (['meta', 'metadata'] as $column) {
            if (Schema::hasColumn(self::EVENTS_TABLE, $column)) {
                return $column;
            }
        }

        return null;
    }

    /**
     * Best-effort removal of the idempotency marker after a failed attempt.
     * A failure here is only logged so it cannot mask the original exception.
     */
    private function releaseIdempotencyMarker(string $idempotencyKey): void
    {
        try {
            Redis::del($idempotencyKey);
        } catch (\Throwable $e) {
            Log::warning('Failed to release discovery idempotency key after job failure', [
                'event_id' => $this->eventId,
                'error' => $e->getMessage(),
            ]);
        }
    }
}
|