diff --git a/src/ws/state/connection/handler.ts b/src/ws/state/connection/handler.ts new file mode 100644 index 0000000..43c3620 --- /dev/null +++ b/src/ws/state/connection/handler.ts @@ -0,0 +1,181 @@ +import { Logger } from '@nestjs/common'; +import { WsException } from '@nestjs/websockets'; +import type { AuthenticatedSocket } from '../../../types/socket'; +import { JwtVerificationService } from '../../../auth/services/jwt-verification.service'; +import { PrismaService } from '../../../database/prisma.service'; +import { UserSocketService } from '../user-socket.service'; +import { WsNotificationService } from '../ws-notification.service'; +import { WS_EVENT } from '../ws-events'; +import { UsersService } from '../../../users/users.service'; + +export class ConnectionHandler { + constructor( + private readonly jwtVerificationService: JwtVerificationService, + private readonly prisma: PrismaService, + private readonly usersService: UsersService, + private readonly userSocketService: UserSocketService, + private readonly wsNotificationService: WsNotificationService, + private readonly logger: Logger, + ) {} + + handleConnection(client: AuthenticatedSocket) { + try { + const token = this.jwtVerificationService.extractToken(client.handshake); + if (!token) { + this.logger.warn('WebSocket connection attempt without token'); + client.disconnect(); + return; + } + + const payload = this.jwtVerificationService.verifyToken(token); + + if (!payload.sub) { + throw new WsException('Invalid token: missing subject'); + } + + client.data.user = { + userId: payload.sub, + email: payload.email, + roles: payload.roles, + }; + + // Initialize defaults + client.data.activeDollId = null; + client.data.friends = new Set(); + // userId is not set yet, it will be set in handleClientInitialize + + this.logger.log(`WebSocket authenticated (Pending Init): ${payload.sub}`); + + this.logger.log( + `Client id: ${client.id} connected (user: ${payload.sub})`, + ); + } catch (error: unknown) { + const errorMessage = + error instanceof Error ? error.message : 'Unknown error'; + this.logger.error(`Connection error: ${errorMessage}`); + client.disconnect(); + } + } + + async handleClientInitialize(client: AuthenticatedSocket) { + try { + let userTokenData = client.data.user; + + if (!userTokenData) { + this.logger.warn( + 'No user data found during initialization - attempting handshake token verification', + ); + + const token = this.jwtVerificationService.extractToken( + client.handshake, + ); + if (!token) { + throw new WsException('Unauthorized: No user data found'); + } + + const payload = this.jwtVerificationService.verifyToken(token); + if (!payload.sub) { + throw new WsException('Invalid token: missing subject'); + } + + userTokenData = { + userId: payload.sub, + email: payload.email, + roles: payload.roles, + }; + client.data.user = userTokenData; + + // Ensure defaults exist if this path runs on reconnect + client.data.activeDollId = client.data.activeDollId ?? null; + client.data.friends = client.data.friends ?? new Set(); + + this.logger.log( + `WebSocket authenticated via initialize fallback (Pending Init): ${payload.sub}`, + ); + + this.logger.log( + `WebSocket authenticated via initialize fallback (Pending Init): ${payload.sub}`, + ); + } + + const user = await this.usersService.findOne(userTokenData.userId); + + // 2. Register socket mapping (Redis Write) + await this.userSocketService.setSocket(user.id, client.id); + client.data.userId = user.id; + + // 3. Fetch initial state (DB Read) + const [userWithDoll, friends] = await Promise.all([ + this.prisma.user.findUnique({ + where: { id: user.id }, + select: { activeDollId: true }, + }), + this.prisma.friendship.findMany({ + where: { userId: user.id }, + select: { friendId: true }, + }), + ]); + + client.data.activeDollId = userWithDoll?.activeDollId || null; + client.data.friends = new Set(friends.map((f) => f.friendId)); + + this.logger.log(`Client initialized: ${user.id} (${client.id})`); + + // 4. Notify client + client.emit(WS_EVENT.INITIALIZED, { + userId: user.id, + activeDollId: client.data.activeDollId, + }); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + this.logger.error(`Initialization error: ${errorMessage}`); + client.emit('auth-error', { message: errorMessage }); + client.disconnect(); + } + } + + async handleDisconnect(client: AuthenticatedSocket) { + const user = client.data.user; + + if (user) { + const userId = client.data.userId; + + if (userId) { + // Check if this socket is still the active one for the user + const currentSocketId = await this.userSocketService.getSocket(userId); + if (currentSocketId === client.id) { + await this.userSocketService.removeSocket(userId); + // Note: throttling remove is done in gateway + + // Notify friends that this user has disconnected + const friends = client.data.friends; + if (friends) { + const friendIds = Array.from(friends); + const friendSockets = + await this.userSocketService.getFriendsSockets(friendIds); + + for (const { socketId } of friendSockets) { + this.wsNotificationService.emitToSocket( + socketId, + WS_EVENT.FRIEND_DISCONNECTED, + { + userId: userId, + }, + ); + } + } + } + } + // If userId is undefined, client never initialized, so no cleanup needed + } + + this.logger.log( + `Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`, + ); + + this.logger.log( + `Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`, + ); + } +} diff --git a/src/ws/state/cursor/handler.ts b/src/ws/state/cursor/handler.ts new file mode 100644 index 0000000..bb24f41 --- /dev/null +++ b/src/ws/state/cursor/handler.ts @@ -0,0 +1,55 @@ +import { Logger } from '@nestjs/common'; +import type { AuthenticatedSocket } from '../../../types/socket'; +import { CursorPositionDto } from '../../dto/cursor-position.dto'; +import { WS_EVENT } from '../ws-events'; +import { Validator } from '../utils/validation'; +import { Throttler } from '../utils/throttling'; +import { Broadcaster } from '../utils/broadcasting'; + +const CURSOR_THROTTLE_MS = 100; + +export class CursorHandler { + private readonly logger = new Logger(CursorHandler.name); + + constructor( + private readonly broadcaster: Broadcaster, + private readonly throttler: Throttler, + ) {} + + async handleCursorReportPosition( + client: AuthenticatedSocket, + data: CursorPositionDto, + ) { + Validator.validateUser(client); + + const currentUserId = client.data.userId; + + if (!currentUserId) { + // User has not initialized yet + return; + } + + // Do not broadcast cursor position if user has no active doll + if (!client.data.activeDollId) { + return; + } + + if (this.throttler.isThrottled(currentUserId, CURSOR_THROTTLE_MS)) { + return; + } + + // Broadcast to online friends + const friends = client.data.friends; + if (friends) { + const payload = { + userId: currentUserId, + position: data, + }; + await this.broadcaster.broadcastToFriends( + friends, + WS_EVENT.FRIEND_CURSOR_POSITION, + payload, + ); + } + } +} diff --git a/src/ws/state/interaction/handler.ts b/src/ws/state/interaction/handler.ts new file mode 100644 index 0000000..1d0972f --- /dev/null +++ b/src/ws/state/interaction/handler.ts @@ -0,0 +1,76 @@ +import { Logger } from '@nestjs/common'; +import { WsException } from '@nestjs/websockets'; +import type { AuthenticatedSocket } from '../../../types/socket'; +import { SendInteractionDto } from '../../dto/send-interaction.dto'; +import { InteractionPayloadDto } from '../../dto/interaction-payload.dto'; +import { PrismaService } from '../../../database/prisma.service'; +import { UserSocketService } from '../user-socket.service'; +import { WsNotificationService } from '../ws-notification.service'; +import { WS_EVENT } from '../ws-events'; +import { Validator } from '../utils/validation'; + +export class InteractionHandler { + private readonly logger = new Logger(InteractionHandler.name); + + constructor( + private readonly prisma: PrismaService, + private readonly userSocketService: UserSocketService, + private readonly wsNotificationService: WsNotificationService, + ) {} + + async handleSendInteraction( + client: AuthenticatedSocket, + data: SendInteractionDto, + ) { + const user = client.data.user; + const currentUserId = Validator.validateInitialized(client); + + if (!user) { + throw new WsException('Unauthorized: User not initialized'); + } + + // 1. Verify recipient is a friend + const friends = client.data.friends; + if (!friends || !friends.has(data.recipientUserId)) { + client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, { + recipientUserId: data.recipientUserId, + reason: 'Recipient is not a friend', + }); + return; + } + + // 2. Check if recipient is online + const isOnline = await this.userSocketService.isUserOnline( + data.recipientUserId, + ); + if (!isOnline) { + client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, { + recipientUserId: data.recipientUserId, + reason: 'Recipient is offline', + }); + return; + } + + // 3. Construct payload + const sender = await this.prisma.user.findUnique({ + where: { id: currentUserId }, + select: { name: true, username: true }, + }); + const senderName = sender?.name || sender?.username || 'Unknown'; + + const payload: InteractionPayloadDto = { + senderUserId: currentUserId, + senderName, + content: data.content, + type: data.type, + timestamp: new Date().toISOString(), + }; + + // 4. Send to recipient + await this.wsNotificationService.emitToUser( + data.recipientUserId, + WS_EVENT.INTERACTION_RECEIVED, + payload, + ); + } +} diff --git a/src/ws/state/state.gateway.ts b/src/ws/state/state.gateway.ts index ddab467..102a890 100644 --- a/src/ws/state/state.gateway.ts +++ b/src/ws/state/state.gateway.ts @@ -6,7 +6,6 @@ import { SubscribeMessage, WebSocketGateway, WebSocketServer, - WsException, } from '@nestjs/websockets'; import Redis from 'ioredis'; import type { Server } from 'socket.io'; @@ -19,14 +18,18 @@ import { JwtVerificationService } from '../../auth/services/jwt-verification.ser import { CursorPositionDto } from '../dto/cursor-position.dto'; import { UserStatusDto } from '../dto/user-status.dto'; import { SendInteractionDto } from '../dto/send-interaction.dto'; -import { InteractionPayloadDto } from '../dto/interaction-payload.dto'; import { PrismaService } from '../../database/prisma.service'; import { UserSocketService } from './user-socket.service'; import { WsNotificationService } from './ws-notification.service'; import { WS_EVENT, REDIS_CHANNEL } from './ws-events'; import { UsersService } from '../../users/users.service'; - -const USER_STATUS_BROADCAST_THROTTLING_MS = 200; +import { ConnectionHandler } from './connection/handler'; +import { CursorHandler } from './cursor/handler'; +import { StatusHandler } from './status/handler'; +import { InteractionHandler } from './interaction/handler'; +import { RedisHandler } from './utils/redis-handler'; +import { Broadcaster } from './utils/broadcasting'; +import { Throttler } from './utils/throttling'; @WebSocketGateway({ cors: { @@ -38,10 +41,17 @@ export class StateGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect { private readonly logger = new Logger(StateGateway.name); - private lastBroadcastMap: Map = new Map(); @WebSocketServer() io: Server; + private readonly throttler = new Throttler(); + private readonly broadcaster: Broadcaster; + private readonly redisHandler: RedisHandler; + private readonly connectionHandler: ConnectionHandler; + private readonly cursorHandler: CursorHandler; + private readonly statusHandler: StatusHandler; + private readonly interactionHandler: InteractionHandler; + constructor( private readonly jwtVerificationService: JwtVerificationService, private readonly prisma: PrismaService, @@ -52,6 +62,27 @@ export class StateGateway @Inject(REDIS_SUBSCRIBER_CLIENT) private readonly redisSubscriber: Redis | null, ) { + this.broadcaster = new Broadcaster( + this.userSocketService, + this.wsNotificationService, + ); + this.redisHandler = new RedisHandler(this.wsNotificationService); + this.connectionHandler = new ConnectionHandler( + this.jwtVerificationService, + this.prisma, + this.usersService, + this.userSocketService, + this.wsNotificationService, + this.logger, + ); + this.cursorHandler = new CursorHandler(this.broadcaster, this.throttler); + this.statusHandler = new StatusHandler(this.broadcaster, this.throttler); + this.interactionHandler = new InteractionHandler( + this.prisma, + this.userSocketService, + this.wsNotificationService, + ); + // Setup Redis subscription for cross-instance communication if (this.redisSubscriber) { this.redisSubscriber @@ -73,10 +104,10 @@ export class StateGateway this.redisSubscriber.on('message', (channel, message) => { if (channel === REDIS_CHANNEL.ACTIVE_DOLL_UPDATE) { // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.handleActiveDollUpdateMessage(message); + this.redisHandler.handleActiveDollUpdateMessage(message); } else if (channel === REDIS_CHANNEL.FRIEND_CACHE_UPDATE) { // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.handleFriendCacheUpdateMessage(message); + this.redisHandler.handleFriendCacheUpdateMessage(message); } }); } @@ -87,199 +118,22 @@ export class StateGateway this.wsNotificationService.setIo(this.io); } - private async handleActiveDollUpdateMessage(message: string) { - try { - const data = JSON.parse(message) as { - userId: string; - dollId: string | null; - }; - const { userId, dollId } = data; - await this.wsNotificationService.updateActiveDollCache(userId, dollId); - } catch (error) { - this.logger.error('Error handling active doll update message', error); - } - } - - private async handleFriendCacheUpdateMessage(message: string) { - try { - const data = JSON.parse(message) as { - userId: string; - friendId: string; - action: 'add' | 'delete'; - }; - const { userId, friendId, action } = data; - await this.wsNotificationService.updateFriendsCacheLocal( - userId, - friendId, - action, - ); - } catch (error) { - this.logger.error('Error handling friend cache update message', error); - } - } - handleConnection(client: AuthenticatedSocket) { - try { - this.logger.debug( - `Connection attempt - handshake auth: ${JSON.stringify(client.handshake.auth)}`, - ); - this.logger.debug( - `Connection attempt - handshake headers: ${JSON.stringify(client.handshake.headers)}`, - ); - - const token = this.jwtVerificationService.extractToken(client.handshake); - - if (!token) { - this.logger.warn('WebSocket connection attempt without token'); - client.disconnect(); - return; - } - - const payload = this.jwtVerificationService.verifyToken(token); - - if (!payload.sub) { - throw new WsException('Invalid token: missing subject'); - } - - client.data.user = { - userId: payload.sub, - email: payload.email, - roles: payload.roles, - }; - - // Initialize defaults - client.data.activeDollId = null; - client.data.friends = new Set(); - // userId is not set yet, it will be set in handleClientInitialize - - this.logger.log(`WebSocket authenticated (Pending Init): ${payload.sub}`); - - const { sockets } = this.io.sockets; - this.logger.log( - `Client id: ${client.id} connected (user: ${payload.sub})`, - ); - this.logger.debug(`Number of connected clients: ${sockets.size}`); - } catch (error: unknown) { - const errorMessage = - error instanceof Error ? error.message : 'Unknown error'; - this.logger.error(`Connection error: ${errorMessage}`); - client.disconnect(); - } + this.connectionHandler.handleConnection(client); } @SubscribeMessage(WS_EVENT.CLIENT_INITIALIZE) async handleClientInitialize(client: AuthenticatedSocket) { - try { - let userTokenData = client.data.user; - - if (!userTokenData) { - this.logger.warn( - 'No user data found during initialization - attempting handshake token verification', - ); - - const token = this.jwtVerificationService.extractToken( - client.handshake, - ); - if (!token) { - throw new WsException('Unauthorized: No user data found'); - } - - const payload = this.jwtVerificationService.verifyToken(token); - if (!payload.sub) { - throw new WsException('Invalid token: missing subject'); - } - - userTokenData = { - userId: payload.sub, - email: payload.email, - roles: payload.roles, - }; - client.data.user = userTokenData; - - // Ensure defaults exist if this path runs on reconnect - client.data.activeDollId = client.data.activeDollId ?? null; - client.data.friends = client.data.friends ?? new Set(); - - this.logger.log( - `WebSocket authenticated via initialize fallback (Pending Init): ${payload.sub}`, - ); - } - - const user = await this.usersService.findOne(userTokenData.userId); - - // 2. Register socket mapping (Redis Write) - await this.userSocketService.setSocket(user.id, client.id); - client.data.userId = user.id; - - // 3. Fetch initial state (DB Read) - const [userWithDoll, friends] = await Promise.all([ - this.prisma.user.findUnique({ - where: { id: user.id }, - select: { activeDollId: true }, - }), - this.prisma.friendship.findMany({ - where: { userId: user.id }, - select: { friendId: true }, - }), - ]); - - client.data.activeDollId = userWithDoll?.activeDollId || null; - client.data.friends = new Set(friends.map((f) => f.friendId)); - - this.logger.log(`Client initialized: ${user.id} (${client.id})`); - - // 4. Notify client - client.emit(WS_EVENT.INITIALIZED, { - userId: user.id, - activeDollId: client.data.activeDollId, - }); - } catch (error) { - const errorMessage = - error instanceof Error ? error.message : String(error); - this.logger.error(`Initialization error: ${errorMessage}`); - client.emit('auth-error', { message: errorMessage }); - client.disconnect(); - } + await this.connectionHandler.handleClientInitialize(client); } async handleDisconnect(client: AuthenticatedSocket) { - const user = client.data.user; - - if (user) { - const userId = client.data.userId; - - if (userId) { - // Check if this socket is still the active one for the user - const currentSocketId = await this.userSocketService.getSocket(userId); - if (currentSocketId === client.id) { - await this.userSocketService.removeSocket(userId); - this.lastBroadcastMap.delete(userId); - - // Notify friends that this user has disconnected - const friends = client.data.friends; - if (friends) { - const friendIds = Array.from(friends); - const friendSockets = - await this.userSocketService.getFriendsSockets(friendIds); - - for (const { socketId } of friendSockets) { - this.wsNotificationService.emitToSocket( - socketId, - WS_EVENT.FRIEND_DISCONNECTED, - { - userId: userId, - }, - ); - } - } - } - } - // If userId is undefined, client never initialized, so no cleanup needed + await this.connectionHandler.handleDisconnect(client); + // Remove from throttler + const userId = client.data.userId; + if (userId) { + this.throttler.remove(userId); } - - this.logger.log( - `Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`, - ); } async isUserOnline(userId: string): Promise { @@ -291,50 +145,7 @@ export class StateGateway client: AuthenticatedSocket, data: CursorPositionDto, ) { - const user = client.data.user; - - if (!user) { - throw new WsException('Unauthorized'); - } - - const currentUserId = client.data.userId; - - if (!currentUserId) { - // User has not initialized yet - return; - } - - // Do not broadcast cursor position if user has no active doll - if (!client.data.activeDollId) { - return; - } - - const now = Date.now(); - const lastBroadcast = this.lastBroadcastMap.get(currentUserId) || 0; - if (now - lastBroadcast < 100) { - return; - } - this.lastBroadcastMap.set(currentUserId, now); - - // Broadcast to online friends - const friends = client.data.friends; - if (friends) { - const friendIds = Array.from(friends); - const friendSockets = - await this.userSocketService.getFriendsSockets(friendIds); - - for (const { socketId } of friendSockets) { - const payload = { - userId: currentUserId, - position: data, - }; - this.wsNotificationService.emitToSocket( - socketId, - WS_EVENT.FRIEND_CURSOR_POSITION, - payload, - ); - } - } + await this.cursorHandler.handleCursorReportPosition(client, data); } @SubscribeMessage(WS_EVENT.CLIENT_REPORT_USER_STATUS) @@ -342,60 +153,7 @@ export class StateGateway client: AuthenticatedSocket, data: UserStatusDto, ) { - const user = client.data.user; - - if (!user) { - throw new WsException('Unauthorized'); - } - - const currentUserId = client.data.userId; - - if (!currentUserId) { - // User has not initialized yet - return; - } - - // Do not broadcast user status if user has no active doll - if (!client.data.activeDollId) { - return; - } - - const now = Date.now(); - const lastBroadcast = this.lastBroadcastMap.get(currentUserId) || 0; - if (now - lastBroadcast < USER_STATUS_BROADCAST_THROTTLING_MS) { - return; - } - this.lastBroadcastMap.set(currentUserId, now); - - // Broadcast to online friends - const friends = client.data.friends; - if (friends) { - const friendIds = Array.from(friends); - try { - const friendSockets = - await this.userSocketService.getFriendsSockets(friendIds); - - for (const { socketId } of friendSockets) { - const payload = { - userId: currentUserId, - status: data, - }; - this.wsNotificationService.emitToSocket( - socketId, - WS_EVENT.FRIEND_USER_STATUS, - payload, - ); - } - this.logger.debug( - `Broadcasted user status to ${friendSockets.length} friends for user ${currentUserId}`, - ); - } catch (error) { - this.logger.error( - `Failed to broadcast user status for user ${currentUserId}: ${(error as Error).message}`, - (error as Error).stack, - ); - } - } + await this.statusHandler.handleClientReportUserStatus(client, data); } @SubscribeMessage(WS_EVENT.CLIENT_SEND_INTERACTION) @@ -403,55 +161,6 @@ export class StateGateway client: AuthenticatedSocket, data: SendInteractionDto, ) { - const user = client.data.user; - const currentUserId = client.data.userId; - - if (!user || !currentUserId) { - throw new WsException('Unauthorized: User not initialized'); - } - - // 1. Verify recipient is a friend - const friends = client.data.friends; - if (!friends || !friends.has(data.recipientUserId)) { - client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, { - recipientUserId: data.recipientUserId, - reason: 'Recipient is not a friend', - }); - return; - } - - // 2. Check if recipient is online - const isOnline = await this.userSocketService.isUserOnline( - data.recipientUserId, - ); - if (!isOnline) { - client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, { - recipientUserId: data.recipientUserId, - reason: 'Recipient is offline', - }); - return; - } - - // 3. Construct payload - const sender = await this.prisma.user.findUnique({ - where: { id: currentUserId }, - select: { name: true, username: true }, - }); - const senderName = sender?.name || sender?.username || 'Unknown'; - - const payload: InteractionPayloadDto = { - senderUserId: currentUserId, - senderName, - content: data.content, - type: data.type, - timestamp: new Date().toISOString(), - }; - - // 4. Send to recipient - await this.wsNotificationService.emitToUser( - data.recipientUserId, - WS_EVENT.INTERACTION_RECEIVED, - payload, - ); + await this.interactionHandler.handleSendInteraction(client, data); } } diff --git a/src/ws/state/status/handler.ts b/src/ws/state/status/handler.ts new file mode 100644 index 0000000..aff12e5 --- /dev/null +++ b/src/ws/state/status/handler.ts @@ -0,0 +1,70 @@ +import { Logger } from '@nestjs/common'; +import type { AuthenticatedSocket } from '../../../types/socket'; +import { UserStatusDto } from '../../dto/user-status.dto'; +import { WS_EVENT } from '../ws-events'; +import { Validator } from '../utils/validation'; +import { Throttler } from '../utils/throttling'; +import { Broadcaster } from '../utils/broadcasting'; + +const USER_STATUS_BROADCAST_THROTTLING_MS = 200; + +export class StatusHandler { + private readonly logger = new Logger(StatusHandler.name); + + constructor( + private readonly broadcaster: Broadcaster, + private readonly throttler: Throttler, + ) {} + + async handleClientReportUserStatus( + client: AuthenticatedSocket, + data: UserStatusDto, + ) { + Validator.validateUser(client); + + const currentUserId = client.data.userId; + + if (!currentUserId) { + // User has not initialized yet + return; + } + + // Do not broadcast user status if user has no active doll + if (!client.data.activeDollId) { + return; + } + + if ( + this.throttler.isThrottled( + currentUserId, + USER_STATUS_BROADCAST_THROTTLING_MS, + ) + ) { + return; + } + + // Broadcast to online friends + const friends = client.data.friends; + if (friends) { + try { + const payload = { + userId: currentUserId, + status: data, + }; + await this.broadcaster.broadcastToFriends( + friends, + WS_EVENT.FRIEND_USER_STATUS, + payload, + ); + this.logger.debug( + `Broadcasted user status to friends for user ${currentUserId}`, + ); + } catch (error) { + this.logger.error( + `Failed to broadcast user status for user ${currentUserId}: ${(error as Error).message}`, + (error as Error).stack, + ); + } + } + } +} diff --git a/src/ws/state/utils/broadcasting.ts b/src/ws/state/utils/broadcasting.ts new file mode 100644 index 0000000..1c579b4 --- /dev/null +++ b/src/ws/state/utils/broadcasting.ts @@ -0,0 +1,19 @@ +import { UserSocketService } from '../user-socket.service'; +import { WsNotificationService } from '../ws-notification.service'; + +export class Broadcaster { + constructor( + private readonly userSocketService: UserSocketService, + private readonly wsNotificationService: WsNotificationService, + ) {} + + async broadcastToFriends(friends: Set, event: string, payload: any) { + const friendIds = Array.from(friends); + const friendSockets = + await this.userSocketService.getFriendsSockets(friendIds); + + for (const { socketId } of friendSockets) { + this.wsNotificationService.emitToSocket(socketId, event, payload); + } + } +} diff --git a/src/ws/state/utils/redis-handler.ts b/src/ws/state/utils/redis-handler.ts new file mode 100644 index 0000000..8df961f --- /dev/null +++ b/src/ws/state/utils/redis-handler.ts @@ -0,0 +1,39 @@ +import { Logger } from '@nestjs/common'; +import { WsNotificationService } from '../ws-notification.service'; + +export class RedisHandler { + private readonly logger = new Logger(RedisHandler.name); + + constructor(private readonly wsNotificationService: WsNotificationService) {} + + async handleActiveDollUpdateMessage(message: string): Promise { + try { + const data = JSON.parse(message) as { + userId: string; + dollId: string | null; + }; + const { userId, dollId } = data; + await this.wsNotificationService.updateActiveDollCache(userId, dollId); + } catch (error) { + this.logger.error('Error handling active doll update message', error); + } + } + + async handleFriendCacheUpdateMessage(message: string): Promise { + try { + const data = JSON.parse(message) as { + userId: string; + friendId: string; + action: 'add' | 'delete'; + }; + const { userId, friendId, action } = data; + await this.wsNotificationService.updateFriendsCacheLocal( + userId, + friendId, + action, + ); + } catch (error) { + this.logger.error('Error handling friend cache update message', error); + } + } +} diff --git a/src/ws/state/utils/throttling.ts b/src/ws/state/utils/throttling.ts new file mode 100644 index 0000000..7b3c89c --- /dev/null +++ b/src/ws/state/utils/throttling.ts @@ -0,0 +1,17 @@ +export class Throttler { + private lastBroadcastMap: Map = new Map(); + + isThrottled(userId: string, throttleMs: number): boolean { + const now = Date.now(); + const lastBroadcast = this.lastBroadcastMap.get(userId) || 0; + if (now - lastBroadcast < throttleMs) { + return true; + } + this.lastBroadcastMap.set(userId, now); + return false; + } + + remove(userId: string) { + this.lastBroadcastMap.delete(userId); + } +} diff --git a/src/ws/state/utils/validation.ts b/src/ws/state/utils/validation.ts new file mode 100644 index 0000000..32fbbd4 --- /dev/null +++ b/src/ws/state/utils/validation.ts @@ -0,0 +1,24 @@ +import { WsException } from '@nestjs/websockets'; +import type { AuthenticatedSocket } from '../../../types/socket'; + +export class Validator { + static validateUser(client: AuthenticatedSocket): void { + if (!client.data.user) { + throw new WsException('Unauthorized'); + } + } + + static validateInitialized(client: AuthenticatedSocket): string { + const userId = client.data.userId; + if (!userId) { + throw new WsException('User not initialized'); + } + return userId; + } + + static validateActiveDoll(client: AuthenticatedSocket): void { + if (!client.data.activeDollId) { + throw new WsException('No active doll'); + } + } +}