broke down ws

This commit is contained in:
2026-02-11 17:57:57 +08:00
parent 641f82ec49
commit c252e184a7
9 changed files with 529 additions and 339 deletions

View File

@@ -0,0 +1,181 @@
import { Logger } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
import type { AuthenticatedSocket } from '../../../types/socket';
import { JwtVerificationService } from '../../../auth/services/jwt-verification.service';
import { PrismaService } from '../../../database/prisma.service';
import { UserSocketService } from '../user-socket.service';
import { WsNotificationService } from '../ws-notification.service';
import { WS_EVENT } from '../ws-events';
import { UsersService } from '../../../users/users.service';
export class ConnectionHandler {
constructor(
private readonly jwtVerificationService: JwtVerificationService,
private readonly prisma: PrismaService,
private readonly usersService: UsersService,
private readonly userSocketService: UserSocketService,
private readonly wsNotificationService: WsNotificationService,
private readonly logger: Logger,
) {}
handleConnection(client: AuthenticatedSocket) {
try {
const token = this.jwtVerificationService.extractToken(client.handshake);
if (!token) {
this.logger.warn('WebSocket connection attempt without token');
client.disconnect();
return;
}
const payload = this.jwtVerificationService.verifyToken(token);
if (!payload.sub) {
throw new WsException('Invalid token: missing subject');
}
client.data.user = {
userId: payload.sub,
email: payload.email,
roles: payload.roles,
};
// Initialize defaults
client.data.activeDollId = null;
client.data.friends = new Set();
// userId is not set yet, it will be set in handleClientInitialize
this.logger.log(`WebSocket authenticated (Pending Init): ${payload.sub}`);
this.logger.log(
`Client id: ${client.id} connected (user: ${payload.sub})`,
);
} catch (error: unknown) {
const errorMessage =
error instanceof Error ? error.message : 'Unknown error';
this.logger.error(`Connection error: ${errorMessage}`);
client.disconnect();
}
}
async handleClientInitialize(client: AuthenticatedSocket) {
try {
let userTokenData = client.data.user;
if (!userTokenData) {
this.logger.warn(
'No user data found during initialization - attempting handshake token verification',
);
const token = this.jwtVerificationService.extractToken(
client.handshake,
);
if (!token) {
throw new WsException('Unauthorized: No user data found');
}
const payload = this.jwtVerificationService.verifyToken(token);
if (!payload.sub) {
throw new WsException('Invalid token: missing subject');
}
userTokenData = {
userId: payload.sub,
email: payload.email,
roles: payload.roles,
};
client.data.user = userTokenData;
// Ensure defaults exist if this path runs on reconnect
client.data.activeDollId = client.data.activeDollId ?? null;
client.data.friends = client.data.friends ?? new Set();
this.logger.log(
`WebSocket authenticated via initialize fallback (Pending Init): ${payload.sub}`,
);
this.logger.log(
`WebSocket authenticated via initialize fallback (Pending Init): ${payload.sub}`,
);
}
const user = await this.usersService.findOne(userTokenData.userId);
// 2. Register socket mapping (Redis Write)
await this.userSocketService.setSocket(user.id, client.id);
client.data.userId = user.id;
// 3. Fetch initial state (DB Read)
const [userWithDoll, friends] = await Promise.all([
this.prisma.user.findUnique({
where: { id: user.id },
select: { activeDollId: true },
}),
this.prisma.friendship.findMany({
where: { userId: user.id },
select: { friendId: true },
}),
]);
client.data.activeDollId = userWithDoll?.activeDollId || null;
client.data.friends = new Set(friends.map((f) => f.friendId));
this.logger.log(`Client initialized: ${user.id} (${client.id})`);
// 4. Notify client
client.emit(WS_EVENT.INITIALIZED, {
userId: user.id,
activeDollId: client.data.activeDollId,
});
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : String(error);
this.logger.error(`Initialization error: ${errorMessage}`);
client.emit('auth-error', { message: errorMessage });
client.disconnect();
}
}
async handleDisconnect(client: AuthenticatedSocket) {
const user = client.data.user;
if (user) {
const userId = client.data.userId;
if (userId) {
// Check if this socket is still the active one for the user
const currentSocketId = await this.userSocketService.getSocket(userId);
if (currentSocketId === client.id) {
await this.userSocketService.removeSocket(userId);
// Note: throttling remove is done in gateway
// Notify friends that this user has disconnected
const friends = client.data.friends;
if (friends) {
const friendIds = Array.from(friends);
const friendSockets =
await this.userSocketService.getFriendsSockets(friendIds);
for (const { socketId } of friendSockets) {
this.wsNotificationService.emitToSocket(
socketId,
WS_EVENT.FRIEND_DISCONNECTED,
{
userId: userId,
},
);
}
}
}
}
// If userId is undefined, client never initialized, so no cleanup needed
}
this.logger.log(
`Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`,
);
this.logger.log(
`Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`,
);
}
}

View File

@@ -0,0 +1,55 @@
import { Logger } from '@nestjs/common';
import type { AuthenticatedSocket } from '../../../types/socket';
import { CursorPositionDto } from '../../dto/cursor-position.dto';
import { WS_EVENT } from '../ws-events';
import { Validator } from '../utils/validation';
import { Throttler } from '../utils/throttling';
import { Broadcaster } from '../utils/broadcasting';
const CURSOR_THROTTLE_MS = 100;
export class CursorHandler {
private readonly logger = new Logger(CursorHandler.name);
constructor(
private readonly broadcaster: Broadcaster,
private readonly throttler: Throttler,
) {}
async handleCursorReportPosition(
client: AuthenticatedSocket,
data: CursorPositionDto,
) {
Validator.validateUser(client);
const currentUserId = client.data.userId;
if (!currentUserId) {
// User has not initialized yet
return;
}
// Do not broadcast cursor position if user has no active doll
if (!client.data.activeDollId) {
return;
}
if (this.throttler.isThrottled(currentUserId, CURSOR_THROTTLE_MS)) {
return;
}
// Broadcast to online friends
const friends = client.data.friends;
if (friends) {
const payload = {
userId: currentUserId,
position: data,
};
await this.broadcaster.broadcastToFriends(
friends,
WS_EVENT.FRIEND_CURSOR_POSITION,
payload,
);
}
}
}

View File

@@ -0,0 +1,76 @@
import { Logger } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
import type { AuthenticatedSocket } from '../../../types/socket';
import { SendInteractionDto } from '../../dto/send-interaction.dto';
import { InteractionPayloadDto } from '../../dto/interaction-payload.dto';
import { PrismaService } from '../../../database/prisma.service';
import { UserSocketService } from '../user-socket.service';
import { WsNotificationService } from '../ws-notification.service';
import { WS_EVENT } from '../ws-events';
import { Validator } from '../utils/validation';
export class InteractionHandler {
private readonly logger = new Logger(InteractionHandler.name);
constructor(
private readonly prisma: PrismaService,
private readonly userSocketService: UserSocketService,
private readonly wsNotificationService: WsNotificationService,
) {}
async handleSendInteraction(
client: AuthenticatedSocket,
data: SendInteractionDto,
) {
const user = client.data.user;
const currentUserId = Validator.validateInitialized(client);
if (!user) {
throw new WsException('Unauthorized: User not initialized');
}
// 1. Verify recipient is a friend
const friends = client.data.friends;
if (!friends || !friends.has(data.recipientUserId)) {
client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, {
recipientUserId: data.recipientUserId,
reason: 'Recipient is not a friend',
});
return;
}
// 2. Check if recipient is online
const isOnline = await this.userSocketService.isUserOnline(
data.recipientUserId,
);
if (!isOnline) {
client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, {
recipientUserId: data.recipientUserId,
reason: 'Recipient is offline',
});
return;
}
// 3. Construct payload
const sender = await this.prisma.user.findUnique({
where: { id: currentUserId },
select: { name: true, username: true },
});
const senderName = sender?.name || sender?.username || 'Unknown';
const payload: InteractionPayloadDto = {
senderUserId: currentUserId,
senderName,
content: data.content,
type: data.type,
timestamp: new Date().toISOString(),
};
// 4. Send to recipient
await this.wsNotificationService.emitToUser(
data.recipientUserId,
WS_EVENT.INTERACTION_RECEIVED,
payload,
);
}
}

View File

@@ -6,7 +6,6 @@ import {
SubscribeMessage, SubscribeMessage,
WebSocketGateway, WebSocketGateway,
WebSocketServer, WebSocketServer,
WsException,
} from '@nestjs/websockets'; } from '@nestjs/websockets';
import Redis from 'ioredis'; import Redis from 'ioredis';
import type { Server } from 'socket.io'; import type { Server } from 'socket.io';
@@ -19,14 +18,18 @@ import { JwtVerificationService } from '../../auth/services/jwt-verification.ser
import { CursorPositionDto } from '../dto/cursor-position.dto'; import { CursorPositionDto } from '../dto/cursor-position.dto';
import { UserStatusDto } from '../dto/user-status.dto'; import { UserStatusDto } from '../dto/user-status.dto';
import { SendInteractionDto } from '../dto/send-interaction.dto'; import { SendInteractionDto } from '../dto/send-interaction.dto';
import { InteractionPayloadDto } from '../dto/interaction-payload.dto';
import { PrismaService } from '../../database/prisma.service'; import { PrismaService } from '../../database/prisma.service';
import { UserSocketService } from './user-socket.service'; import { UserSocketService } from './user-socket.service';
import { WsNotificationService } from './ws-notification.service'; import { WsNotificationService } from './ws-notification.service';
import { WS_EVENT, REDIS_CHANNEL } from './ws-events'; import { WS_EVENT, REDIS_CHANNEL } from './ws-events';
import { UsersService } from '../../users/users.service'; import { UsersService } from '../../users/users.service';
import { ConnectionHandler } from './connection/handler';
const USER_STATUS_BROADCAST_THROTTLING_MS = 200; import { CursorHandler } from './cursor/handler';
import { StatusHandler } from './status/handler';
import { InteractionHandler } from './interaction/handler';
import { RedisHandler } from './utils/redis-handler';
import { Broadcaster } from './utils/broadcasting';
import { Throttler } from './utils/throttling';
@WebSocketGateway({ @WebSocketGateway({
cors: { cors: {
@@ -38,10 +41,17 @@ export class StateGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{ {
private readonly logger = new Logger(StateGateway.name); private readonly logger = new Logger(StateGateway.name);
private lastBroadcastMap: Map<string, number> = new Map();
@WebSocketServer() io: Server; @WebSocketServer() io: Server;
private readonly throttler = new Throttler();
private readonly broadcaster: Broadcaster;
private readonly redisHandler: RedisHandler;
private readonly connectionHandler: ConnectionHandler;
private readonly cursorHandler: CursorHandler;
private readonly statusHandler: StatusHandler;
private readonly interactionHandler: InteractionHandler;
constructor( constructor(
private readonly jwtVerificationService: JwtVerificationService, private readonly jwtVerificationService: JwtVerificationService,
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
@@ -52,6 +62,27 @@ export class StateGateway
@Inject(REDIS_SUBSCRIBER_CLIENT) @Inject(REDIS_SUBSCRIBER_CLIENT)
private readonly redisSubscriber: Redis | null, private readonly redisSubscriber: Redis | null,
) { ) {
this.broadcaster = new Broadcaster(
this.userSocketService,
this.wsNotificationService,
);
this.redisHandler = new RedisHandler(this.wsNotificationService);
this.connectionHandler = new ConnectionHandler(
this.jwtVerificationService,
this.prisma,
this.usersService,
this.userSocketService,
this.wsNotificationService,
this.logger,
);
this.cursorHandler = new CursorHandler(this.broadcaster, this.throttler);
this.statusHandler = new StatusHandler(this.broadcaster, this.throttler);
this.interactionHandler = new InteractionHandler(
this.prisma,
this.userSocketService,
this.wsNotificationService,
);
// Setup Redis subscription for cross-instance communication // Setup Redis subscription for cross-instance communication
if (this.redisSubscriber) { if (this.redisSubscriber) {
this.redisSubscriber this.redisSubscriber
@@ -73,10 +104,10 @@ export class StateGateway
this.redisSubscriber.on('message', (channel, message) => { this.redisSubscriber.on('message', (channel, message) => {
if (channel === REDIS_CHANNEL.ACTIVE_DOLL_UPDATE) { if (channel === REDIS_CHANNEL.ACTIVE_DOLL_UPDATE) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises // eslint-disable-next-line @typescript-eslint/no-floating-promises
this.handleActiveDollUpdateMessage(message); this.redisHandler.handleActiveDollUpdateMessage(message);
} else if (channel === REDIS_CHANNEL.FRIEND_CACHE_UPDATE) { } else if (channel === REDIS_CHANNEL.FRIEND_CACHE_UPDATE) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises // eslint-disable-next-line @typescript-eslint/no-floating-promises
this.handleFriendCacheUpdateMessage(message); this.redisHandler.handleFriendCacheUpdateMessage(message);
} }
}); });
} }
@@ -87,199 +118,22 @@ export class StateGateway
this.wsNotificationService.setIo(this.io); this.wsNotificationService.setIo(this.io);
} }
private async handleActiveDollUpdateMessage(message: string) {
try {
const data = JSON.parse(message) as {
userId: string;
dollId: string | null;
};
const { userId, dollId } = data;
await this.wsNotificationService.updateActiveDollCache(userId, dollId);
} catch (error) {
this.logger.error('Error handling active doll update message', error);
}
}
private async handleFriendCacheUpdateMessage(message: string) {
try {
const data = JSON.parse(message) as {
userId: string;
friendId: string;
action: 'add' | 'delete';
};
const { userId, friendId, action } = data;
await this.wsNotificationService.updateFriendsCacheLocal(
userId,
friendId,
action,
);
} catch (error) {
this.logger.error('Error handling friend cache update message', error);
}
}
handleConnection(client: AuthenticatedSocket) { handleConnection(client: AuthenticatedSocket) {
try { this.connectionHandler.handleConnection(client);
this.logger.debug(
`Connection attempt - handshake auth: ${JSON.stringify(client.handshake.auth)}`,
);
this.logger.debug(
`Connection attempt - handshake headers: ${JSON.stringify(client.handshake.headers)}`,
);
const token = this.jwtVerificationService.extractToken(client.handshake);
if (!token) {
this.logger.warn('WebSocket connection attempt without token');
client.disconnect();
return;
}
const payload = this.jwtVerificationService.verifyToken(token);
if (!payload.sub) {
throw new WsException('Invalid token: missing subject');
}
client.data.user = {
userId: payload.sub,
email: payload.email,
roles: payload.roles,
};
// Initialize defaults
client.data.activeDollId = null;
client.data.friends = new Set();
// userId is not set yet, it will be set in handleClientInitialize
this.logger.log(`WebSocket authenticated (Pending Init): ${payload.sub}`);
const { sockets } = this.io.sockets;
this.logger.log(
`Client id: ${client.id} connected (user: ${payload.sub})`,
);
this.logger.debug(`Number of connected clients: ${sockets.size}`);
} catch (error: unknown) {
const errorMessage =
error instanceof Error ? error.message : 'Unknown error';
this.logger.error(`Connection error: ${errorMessage}`);
client.disconnect();
}
} }
@SubscribeMessage(WS_EVENT.CLIENT_INITIALIZE) @SubscribeMessage(WS_EVENT.CLIENT_INITIALIZE)
async handleClientInitialize(client: AuthenticatedSocket) { async handleClientInitialize(client: AuthenticatedSocket) {
try { await this.connectionHandler.handleClientInitialize(client);
let userTokenData = client.data.user;
if (!userTokenData) {
this.logger.warn(
'No user data found during initialization - attempting handshake token verification',
);
const token = this.jwtVerificationService.extractToken(
client.handshake,
);
if (!token) {
throw new WsException('Unauthorized: No user data found');
}
const payload = this.jwtVerificationService.verifyToken(token);
if (!payload.sub) {
throw new WsException('Invalid token: missing subject');
}
userTokenData = {
userId: payload.sub,
email: payload.email,
roles: payload.roles,
};
client.data.user = userTokenData;
// Ensure defaults exist if this path runs on reconnect
client.data.activeDollId = client.data.activeDollId ?? null;
client.data.friends = client.data.friends ?? new Set();
this.logger.log(
`WebSocket authenticated via initialize fallback (Pending Init): ${payload.sub}`,
);
}
const user = await this.usersService.findOne(userTokenData.userId);
// 2. Register socket mapping (Redis Write)
await this.userSocketService.setSocket(user.id, client.id);
client.data.userId = user.id;
// 3. Fetch initial state (DB Read)
const [userWithDoll, friends] = await Promise.all([
this.prisma.user.findUnique({
where: { id: user.id },
select: { activeDollId: true },
}),
this.prisma.friendship.findMany({
where: { userId: user.id },
select: { friendId: true },
}),
]);
client.data.activeDollId = userWithDoll?.activeDollId || null;
client.data.friends = new Set(friends.map((f) => f.friendId));
this.logger.log(`Client initialized: ${user.id} (${client.id})`);
// 4. Notify client
client.emit(WS_EVENT.INITIALIZED, {
userId: user.id,
activeDollId: client.data.activeDollId,
});
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : String(error);
this.logger.error(`Initialization error: ${errorMessage}`);
client.emit('auth-error', { message: errorMessage });
client.disconnect();
}
} }
async handleDisconnect(client: AuthenticatedSocket) { async handleDisconnect(client: AuthenticatedSocket) {
const user = client.data.user; await this.connectionHandler.handleDisconnect(client);
// Remove from throttler
if (user) { const userId = client.data.userId;
const userId = client.data.userId; if (userId) {
this.throttler.remove(userId);
if (userId) {
// Check if this socket is still the active one for the user
const currentSocketId = await this.userSocketService.getSocket(userId);
if (currentSocketId === client.id) {
await this.userSocketService.removeSocket(userId);
this.lastBroadcastMap.delete(userId);
// Notify friends that this user has disconnected
const friends = client.data.friends;
if (friends) {
const friendIds = Array.from(friends);
const friendSockets =
await this.userSocketService.getFriendsSockets(friendIds);
for (const { socketId } of friendSockets) {
this.wsNotificationService.emitToSocket(
socketId,
WS_EVENT.FRIEND_DISCONNECTED,
{
userId: userId,
},
);
}
}
}
}
// If userId is undefined, client never initialized, so no cleanup needed
} }
this.logger.log(
`Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`,
);
} }
async isUserOnline(userId: string): Promise<boolean> { async isUserOnline(userId: string): Promise<boolean> {
@@ -291,50 +145,7 @@ export class StateGateway
client: AuthenticatedSocket, client: AuthenticatedSocket,
data: CursorPositionDto, data: CursorPositionDto,
) { ) {
const user = client.data.user; await this.cursorHandler.handleCursorReportPosition(client, data);
if (!user) {
throw new WsException('Unauthorized');
}
const currentUserId = client.data.userId;
if (!currentUserId) {
// User has not initialized yet
return;
}
// Do not broadcast cursor position if user has no active doll
if (!client.data.activeDollId) {
return;
}
const now = Date.now();
const lastBroadcast = this.lastBroadcastMap.get(currentUserId) || 0;
if (now - lastBroadcast < 100) {
return;
}
this.lastBroadcastMap.set(currentUserId, now);
// Broadcast to online friends
const friends = client.data.friends;
if (friends) {
const friendIds = Array.from(friends);
const friendSockets =
await this.userSocketService.getFriendsSockets(friendIds);
for (const { socketId } of friendSockets) {
const payload = {
userId: currentUserId,
position: data,
};
this.wsNotificationService.emitToSocket(
socketId,
WS_EVENT.FRIEND_CURSOR_POSITION,
payload,
);
}
}
} }
@SubscribeMessage(WS_EVENT.CLIENT_REPORT_USER_STATUS) @SubscribeMessage(WS_EVENT.CLIENT_REPORT_USER_STATUS)
@@ -342,60 +153,7 @@ export class StateGateway
client: AuthenticatedSocket, client: AuthenticatedSocket,
data: UserStatusDto, data: UserStatusDto,
) { ) {
const user = client.data.user; await this.statusHandler.handleClientReportUserStatus(client, data);
if (!user) {
throw new WsException('Unauthorized');
}
const currentUserId = client.data.userId;
if (!currentUserId) {
// User has not initialized yet
return;
}
// Do not broadcast user status if user has no active doll
if (!client.data.activeDollId) {
return;
}
const now = Date.now();
const lastBroadcast = this.lastBroadcastMap.get(currentUserId) || 0;
if (now - lastBroadcast < USER_STATUS_BROADCAST_THROTTLING_MS) {
return;
}
this.lastBroadcastMap.set(currentUserId, now);
// Broadcast to online friends
const friends = client.data.friends;
if (friends) {
const friendIds = Array.from(friends);
try {
const friendSockets =
await this.userSocketService.getFriendsSockets(friendIds);
for (const { socketId } of friendSockets) {
const payload = {
userId: currentUserId,
status: data,
};
this.wsNotificationService.emitToSocket(
socketId,
WS_EVENT.FRIEND_USER_STATUS,
payload,
);
}
this.logger.debug(
`Broadcasted user status to ${friendSockets.length} friends for user ${currentUserId}`,
);
} catch (error) {
this.logger.error(
`Failed to broadcast user status for user ${currentUserId}: ${(error as Error).message}`,
(error as Error).stack,
);
}
}
} }
@SubscribeMessage(WS_EVENT.CLIENT_SEND_INTERACTION) @SubscribeMessage(WS_EVENT.CLIENT_SEND_INTERACTION)
@@ -403,55 +161,6 @@ export class StateGateway
client: AuthenticatedSocket, client: AuthenticatedSocket,
data: SendInteractionDto, data: SendInteractionDto,
) { ) {
const user = client.data.user; await this.interactionHandler.handleSendInteraction(client, data);
const currentUserId = client.data.userId;
if (!user || !currentUserId) {
throw new WsException('Unauthorized: User not initialized');
}
// 1. Verify recipient is a friend
const friends = client.data.friends;
if (!friends || !friends.has(data.recipientUserId)) {
client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, {
recipientUserId: data.recipientUserId,
reason: 'Recipient is not a friend',
});
return;
}
// 2. Check if recipient is online
const isOnline = await this.userSocketService.isUserOnline(
data.recipientUserId,
);
if (!isOnline) {
client.emit(WS_EVENT.INTERACTION_DELIVERY_FAILED, {
recipientUserId: data.recipientUserId,
reason: 'Recipient is offline',
});
return;
}
// 3. Construct payload
const sender = await this.prisma.user.findUnique({
where: { id: currentUserId },
select: { name: true, username: true },
});
const senderName = sender?.name || sender?.username || 'Unknown';
const payload: InteractionPayloadDto = {
senderUserId: currentUserId,
senderName,
content: data.content,
type: data.type,
timestamp: new Date().toISOString(),
};
// 4. Send to recipient
await this.wsNotificationService.emitToUser(
data.recipientUserId,
WS_EVENT.INTERACTION_RECEIVED,
payload,
);
} }
} }

View File

@@ -0,0 +1,70 @@
import { Logger } from '@nestjs/common';
import type { AuthenticatedSocket } from '../../../types/socket';
import { UserStatusDto } from '../../dto/user-status.dto';
import { WS_EVENT } from '../ws-events';
import { Validator } from '../utils/validation';
import { Throttler } from '../utils/throttling';
import { Broadcaster } from '../utils/broadcasting';
const USER_STATUS_BROADCAST_THROTTLING_MS = 200;
export class StatusHandler {
private readonly logger = new Logger(StatusHandler.name);
constructor(
private readonly broadcaster: Broadcaster,
private readonly throttler: Throttler,
) {}
async handleClientReportUserStatus(
client: AuthenticatedSocket,
data: UserStatusDto,
) {
Validator.validateUser(client);
const currentUserId = client.data.userId;
if (!currentUserId) {
// User has not initialized yet
return;
}
// Do not broadcast user status if user has no active doll
if (!client.data.activeDollId) {
return;
}
if (
this.throttler.isThrottled(
currentUserId,
USER_STATUS_BROADCAST_THROTTLING_MS,
)
) {
return;
}
// Broadcast to online friends
const friends = client.data.friends;
if (friends) {
try {
const payload = {
userId: currentUserId,
status: data,
};
await this.broadcaster.broadcastToFriends(
friends,
WS_EVENT.FRIEND_USER_STATUS,
payload,
);
this.logger.debug(
`Broadcasted user status to friends for user ${currentUserId}`,
);
} catch (error) {
this.logger.error(
`Failed to broadcast user status for user ${currentUserId}: ${(error as Error).message}`,
(error as Error).stack,
);
}
}
}
}

View File

@@ -0,0 +1,19 @@
import { UserSocketService } from '../user-socket.service';
import { WsNotificationService } from '../ws-notification.service';
export class Broadcaster {
constructor(
private readonly userSocketService: UserSocketService,
private readonly wsNotificationService: WsNotificationService,
) {}
async broadcastToFriends(friends: Set<string>, event: string, payload: any) {
const friendIds = Array.from(friends);
const friendSockets =
await this.userSocketService.getFriendsSockets(friendIds);
for (const { socketId } of friendSockets) {
this.wsNotificationService.emitToSocket(socketId, event, payload);
}
}
}

View File

@@ -0,0 +1,39 @@
import { Logger } from '@nestjs/common';
import { WsNotificationService } from '../ws-notification.service';
export class RedisHandler {
private readonly logger = new Logger(RedisHandler.name);
constructor(private readonly wsNotificationService: WsNotificationService) {}
async handleActiveDollUpdateMessage(message: string): Promise<void> {
try {
const data = JSON.parse(message) as {
userId: string;
dollId: string | null;
};
const { userId, dollId } = data;
await this.wsNotificationService.updateActiveDollCache(userId, dollId);
} catch (error) {
this.logger.error('Error handling active doll update message', error);
}
}
async handleFriendCacheUpdateMessage(message: string): Promise<void> {
try {
const data = JSON.parse(message) as {
userId: string;
friendId: string;
action: 'add' | 'delete';
};
const { userId, friendId, action } = data;
await this.wsNotificationService.updateFriendsCacheLocal(
userId,
friendId,
action,
);
} catch (error) {
this.logger.error('Error handling friend cache update message', error);
}
}
}

View File

@@ -0,0 +1,17 @@
export class Throttler {
private lastBroadcastMap: Map<string, number> = new Map();
isThrottled(userId: string, throttleMs: number): boolean {
const now = Date.now();
const lastBroadcast = this.lastBroadcastMap.get(userId) || 0;
if (now - lastBroadcast < throttleMs) {
return true;
}
this.lastBroadcastMap.set(userId, now);
return false;
}
remove(userId: string) {
this.lastBroadcastMap.delete(userId);
}
}

View File

@@ -0,0 +1,24 @@
import { WsException } from '@nestjs/websockets';
import type { AuthenticatedSocket } from '../../../types/socket';
export class Validator {
static validateUser(client: AuthenticatedSocket): void {
if (!client.data.user) {
throw new WsException('Unauthorized');
}
}
static validateInitialized(client: AuthenticatedSocket): string {
const userId = client.data.userId;
if (!userId) {
throw new WsException('User not initialized');
}
return userId;
}
static validateActiveDoll(client: AuthenticatedSocket): void {
if (!client.data.activeDollId) {
throw new WsException('No active doll');
}
}
}