feat: add Reverb realtime messaging

This commit is contained in:
2026-03-21 12:51:59 +01:00
parent 60f78e8235
commit e8b5edf5d2
45 changed files with 3609 additions and 339 deletions

View File

@@ -0,0 +1,98 @@
<?php
namespace App\Services\Messaging;
use App\Events\ConversationUpdated;
use App\Events\MessageRead;
use App\Models\Conversation;
use App\Models\ConversationParticipant;
use App\Models\Message;
use App\Models\User;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
class ConversationStateService
{
public function activeParticipantIds(Conversation|int $conversation): array
{
$conversationId = $conversation instanceof Conversation ? $conversation->id : $conversation;
return ConversationParticipant::query()
->where('conversation_id', $conversationId)
->whereNull('left_at')
->pluck('user_id')
->map(fn ($id) => (int) $id)
->all();
}
public function touchConversationCachesForUsers(array $userIds): void
{
foreach (array_unique($userIds) as $userId) {
if (! $userId) {
continue;
}
$versionKey = "messages:conversations:version:{$userId}";
Cache::add($versionKey, 1, now()->addDay());
Cache::increment($versionKey);
}
}
public function markConversationRead(Conversation $conversation, User $user, ?int $messageId = null): ConversationParticipant
{
/** @var ConversationParticipant $participant */
$participant = ConversationParticipant::query()
->where('conversation_id', $conversation->id)
->where('user_id', $user->id)
->whereNull('left_at')
->firstOrFail();
$lastReadableMessage = Message::query()
->where('conversation_id', $conversation->id)
->whereNull('deleted_at')
->where('sender_id', '!=', $user->id)
->when($messageId, fn ($query) => $query->where('id', '<=', $messageId))
->orderByDesc('id')
->first();
$readAt = now();
$participant->update([
'last_read_at' => $readAt,
'last_read_message_id' => $lastReadableMessage?->id,
]);
if ($lastReadableMessage) {
$messageReads = Message::query()
->select(['id'])
->where('conversation_id', $conversation->id)
->whereNull('deleted_at')
->where('sender_id', '!=', $user->id)
->where('id', '<=', $lastReadableMessage->id)
->get()
->map(fn (Message $message) => [
'message_id' => $message->id,
'user_id' => $user->id,
'read_at' => $readAt,
])
->all();
if (! empty($messageReads)) {
DB::table('message_reads')->upsert($messageReads, ['message_id', 'user_id'], ['read_at']);
}
}
$participantIds = $this->activeParticipantIds($conversation);
$this->touchConversationCachesForUsers($participantIds);
DB::afterCommit(function () use ($conversation, $participant, $user): void {
event(new MessageRead($conversation, $participant, $user));
foreach ($this->activeParticipantIds($conversation) as $participantId) {
event(new ConversationUpdated($participantId, $conversation, 'message.read'));
}
});
return $participant->fresh(['user']);
}
}

View File

@@ -0,0 +1,152 @@
<?php
namespace App\Services\Messaging;
use App\Models\Conversation;
use App\Models\ConversationParticipant;
use App\Models\Message;
use App\Models\MessageAttachment;
use App\Models\User;
class MessagingPayloadFactory
{
public function message(Message $message, ?int $viewerId = null): array
{
$message->loadMissing([
'sender:id,username,name',
'attachments',
'reactions',
]);
return [
'id' => (int) $message->id,
'uuid' => (string) $message->uuid,
'client_temp_id' => $message->client_temp_id,
'conversation_id' => (int) $message->conversation_id,
'sender_id' => (int) $message->sender_id,
'sender' => $this->userSummary($message->sender),
'message_type' => (string) ($message->message_type ?? 'text'),
'body' => (string) ($message->body ?? ''),
'reply_to_message_id' => $message->reply_to_message_id ? (int) $message->reply_to_message_id : null,
'attachments' => $message->attachments->map(fn (MessageAttachment $attachment) => $this->attachment($attachment))->values()->all(),
'reaction_summary' => $this->reactionSummary($message, $viewerId),
'edited_at' => optional($message->edited_at)?->toIso8601String(),
'deleted_at' => optional($message->deleted_at)?->toIso8601String(),
'created_at' => optional($message->created_at)?->toIso8601String(),
'updated_at' => optional($message->updated_at)?->toIso8601String(),
];
}
public function conversationSummary(Conversation $conversation, int $viewerId): array
{
$conversation->loadMissing([
'allParticipants.user:id,username,name',
'latestMessage.sender:id,username,name',
'latestMessage.attachments',
'latestMessage.reactions',
]);
/** @var ConversationParticipant|null $myParticipant */
$myParticipant = $conversation->allParticipants->firstWhere('user_id', $viewerId);
return [
'id' => (int) $conversation->id,
'uuid' => (string) $conversation->uuid,
'type' => (string) $conversation->type,
'title' => $conversation->title,
'is_active' => (bool) ($conversation->is_active ?? true),
'last_message_at' => optional($conversation->last_message_at)?->toIso8601String(),
'unread_count' => $conversation->unreadCountFor($viewerId),
'my_participant' => $myParticipant ? $this->participant($myParticipant) : null,
'all_participants' => $conversation->allParticipants
->whereNull('left_at')
->map(fn (ConversationParticipant $participant) => $this->participant($participant))
->values()
->all(),
'latest_message' => $conversation->latestMessage
? $this->message($conversation->latestMessage, $viewerId)
: null,
];
}
public function presenceUser(User $user): array
{
return [
'id' => (int) $user->id,
'username' => (string) $user->username,
'display_name' => (string) ($user->name ?: $user->username),
'avatar_thumb_url' => null,
];
}
public function userSummary(?User $user): array
{
if (! $user) {
return [
'id' => null,
'username' => null,
'display_name' => null,
'avatar_thumb_url' => null,
];
}
return [
'id' => (int) $user->id,
'username' => (string) $user->username,
'display_name' => (string) ($user->name ?: $user->username),
'avatar_thumb_url' => null,
];
}
private function participant(ConversationParticipant $participant): array
{
return [
'id' => (int) $participant->id,
'user_id' => (int) $participant->user_id,
'role' => (string) $participant->role,
'last_read_at' => optional($participant->last_read_at)?->toIso8601String(),
'last_read_message_id' => $participant->last_read_message_id ? (int) $participant->last_read_message_id : null,
'is_muted' => (bool) $participant->is_muted,
'is_archived' => (bool) $participant->is_archived,
'is_pinned' => (bool) $participant->is_pinned,
'is_hidden' => (bool) ($participant->is_hidden ?? false),
'pinned_at' => optional($participant->pinned_at)?->toIso8601String(),
'joined_at' => optional($participant->joined_at)?->toIso8601String(),
'left_at' => optional($participant->left_at)?->toIso8601String(),
'user' => $this->userSummary($participant->user),
];
}
private function attachment(MessageAttachment $attachment): array
{
return [
'id' => (int) $attachment->id,
'disk' => (string) ($attachment->disk ?: config('messaging.attachments.disk', 'local')),
'type' => (string) $attachment->type,
'mime' => (string) $attachment->mime,
'size_bytes' => (int) $attachment->size_bytes,
'width' => $attachment->width ? (int) $attachment->width : null,
'height' => $attachment->height ? (int) $attachment->height : null,
'original_name' => (string) $attachment->original_name,
];
}
private function reactionSummary(Message $message, ?int $viewerId = null): array
{
$counts = [];
$mine = [];
foreach ($message->reactions as $reaction) {
$emoji = (string) $reaction->reaction;
$counts[$emoji] = ($counts[$emoji] ?? 0) + 1;
if ($viewerId !== null && (int) $reaction->user_id === $viewerId) {
$mine[] = $emoji;
}
}
$counts['me'] = array_values(array_unique($mine));
return $counts;
}
}

View File

@@ -0,0 +1,126 @@
<?php
namespace App\Services\Messaging;
use App\Events\ConversationUpdated;
use App\Events\MessageCreated;
use App\Models\Conversation;
use App\Models\Message;
use App\Models\MessageAttachment;
use App\Models\User;
use App\Services\Messaging\ConversationStateService;
use Illuminate\Http\UploadedFile;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Storage;
class SendMessageAction
{
public function __construct(
private readonly ConversationStateService $conversationState,
private readonly MessageNotificationService $notifications,
private readonly MessageSearchIndexer $searchIndexer,
) {}
public function execute(Conversation $conversation, User $sender, array $payload): Message
{
$body = trim((string) ($payload['body'] ?? ''));
$files = $payload['attachments'] ?? [];
/** @var Message $message */
$message = DB::transaction(function () use ($conversation, $sender, $payload, $body, $files) {
$message = Message::query()->create([
'conversation_id' => $conversation->id,
'sender_id' => $sender->id,
'client_temp_id' => $payload['client_temp_id'] ?? null,
'message_type' => empty($files) ? 'text' : ($body === '' ? 'attachment' : 'text'),
'body' => $body,
'reply_to_message_id' => $payload['reply_to_message_id'] ?? null,
]);
foreach ($files as $file) {
if ($file instanceof UploadedFile) {
$this->storeAttachment($file, $message, $sender->id);
}
}
$conversation->forceFill([
'last_message_id' => $message->id,
'last_message_at' => $message->created_at,
])->save();
return $message;
});
$participantIds = $this->conversationState->activeParticipantIds($conversation);
$this->conversationState->touchConversationCachesForUsers($participantIds);
DB::afterCommit(function () use ($conversation, $message, $sender, $participantIds): void {
$this->notifications->notifyNewMessage($conversation, $message, $sender);
$this->searchIndexer->indexMessage($message);
event(new MessageCreated($conversation, $message, $sender->id));
foreach ($participantIds as $participantId) {
event(new ConversationUpdated($participantId, $conversation, 'message.created'));
}
});
return $message->fresh(['sender:id,username,name', 'attachments', 'reactions']);
}
private function storeAttachment(UploadedFile $file, Message $message, int $userId): void
{
$mime = (string) $file->getMimeType();
$finfo = finfo_open(FILEINFO_MIME_TYPE);
$finfoMime = $finfo ? (string) finfo_file($finfo, $file->getPathname()) : '';
if ($finfo) {
finfo_close($finfo);
}
$detectedMime = $finfoMime !== '' ? $finfoMime : $mime;
$allowedImage = (array) config('messaging.attachments.allowed_image_mimes', []);
$allowedFile = (array) config('messaging.attachments.allowed_file_mimes', []);
$type = in_array($detectedMime, $allowedImage, true) ? 'image' : 'file';
$allowed = $type === 'image' ? $allowedImage : $allowedFile;
abort_unless(in_array($detectedMime, $allowed, true), 422, 'Unsupported attachment type.');
$maxBytes = $type === 'image'
? ((int) config('messaging.attachments.max_image_kb', 10240) * 1024)
: ((int) config('messaging.attachments.max_file_kb', 25600) * 1024);
abort_if($file->getSize() > $maxBytes, 422, 'Attachment exceeds allowed size.');
$year = now()->format('Y');
$month = now()->format('m');
$ext = strtolower($file->getClientOriginalExtension() ?: $file->extension() ?: 'bin');
$path = "messages/{$message->conversation_id}/{$year}/{$month}/" . uniqid('att_', true) . ".{$ext}";
$diskName = (string) config('messaging.attachments.disk', 'local');
Storage::disk($diskName)->put($path, file_get_contents($file->getPathname()));
$width = null;
$height = null;
if ($type === 'image') {
$dimensions = @getimagesize($file->getPathname());
$width = isset($dimensions[0]) ? (int) $dimensions[0] : null;
$height = isset($dimensions[1]) ? (int) $dimensions[1] : null;
}
MessageAttachment::query()->create([
'message_id' => $message->id,
'disk' => $diskName,
'user_id' => $userId,
'type' => $type,
'mime' => $detectedMime,
'size_bytes' => (int) $file->getSize(),
'width' => $width,
'height' => $height,
'sha256' => hash_file('sha256', $file->getPathname()),
'original_name' => substr((string) $file->getClientOriginalName(), 0, 255),
'storage_path' => $path,
'created_at' => now(),
]);
}
}