Minor refinements & tuning to system structure

This commit is contained in:
2025-12-23 15:57:05 +08:00
parent 6c63f2d803
commit d06a58cf93
8 changed files with 394 additions and 87 deletions

View File

@@ -1,4 +1,4 @@
import { Logger } from '@nestjs/common';
import { Logger, Inject } from '@nestjs/common';
import {
OnGatewayConnection,
OnGatewayDisconnect,
@@ -9,8 +9,12 @@ import {
WsException,
} from '@nestjs/websockets';
import { OnEvent } from '@nestjs/event-emitter';
import Redis from 'ioredis';
import type { Server } from 'socket.io';
import {
REDIS_CLIENT,
REDIS_SUBSCRIBER_CLIENT,
} from '../../database/redis.module';
import type { AuthenticatedSocket } from '../../types/socket';
import { AuthService } from '../../auth/auth.service';
import { JwtVerificationService } from '../../auth/services/jwt-verification.service';
@@ -19,6 +23,7 @@ import { PrismaService } from '../../database/prisma.service';
import { UserSocketService } from './user-socket.service';
import { FriendEvents } from '../../friends/events/friend.events';
import type {
FriendRequestReceivedEvent,
FriendRequestAcceptedEvent,
@@ -37,6 +42,8 @@ import { UserEvents } from '../../users/events/user.events';
import type { UserActiveDollChangedEvent } from '../../users/events/user.events';
const WS_EVENT = {
CLIENT_INITIALIZE: 'client-initialize',
INITIALIZED: 'initialized',
CURSOR_REPORT_POSITION: 'cursor-report-position',
FRIEND_REQUEST_RECEIVED: 'friend-request-received',
FRIEND_REQUEST_ACCEPTED: 'friend-request-accepted',
@@ -50,6 +57,10 @@ const WS_EVENT = {
FRIEND_ACTIVE_DOLL_CHANGED: 'friend-active-doll-changed',
} as const;
const REDIS_CHANNEL = {
ACTIVE_DOLL_UPDATE: 'active-doll-update',
} as const;
@WebSocketGateway({
cors: {
origin: true,
@@ -69,12 +80,78 @@ export class StateGateway
private readonly jwtVerificationService: JwtVerificationService,
private readonly prisma: PrismaService,
private readonly userSocketService: UserSocketService,
) {}
@Inject(REDIS_CLIENT) private readonly redisClient: Redis | null,
@Inject(REDIS_SUBSCRIBER_CLIENT)
private readonly redisSubscriber: Redis | null,
) {
// Setup Redis subscription for cross-instance communication
if (this.redisSubscriber) {
this.redisSubscriber
.subscribe(REDIS_CHANNEL.ACTIVE_DOLL_UPDATE, (err) => {
if (err) {
this.logger.error(
`Failed to subscribe to ${REDIS_CHANNEL.ACTIVE_DOLL_UPDATE}`,
err,
);
} else {
this.logger.log(
`Subscribed to ${REDIS_CHANNEL.ACTIVE_DOLL_UPDATE} channel`,
);
}
})
.catch((err) => {
this.logger.error(
`Error subscribing to ${REDIS_CHANNEL.ACTIVE_DOLL_UPDATE}`,
err,
);
});
this.redisSubscriber.on('message', (channel, message) => {
if (channel === REDIS_CHANNEL.ACTIVE_DOLL_UPDATE) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.handleActiveDollUpdateMessage(message);
}
});
}
}
afterInit() {
this.logger.log('Initialized');
}
private async handleActiveDollUpdateMessage(message: string) {
try {
const data = JSON.parse(message) as {
userId: string;
dollId: string | null;
};
const { userId, dollId } = data;
// Check if the user is connected to THIS instance
// Note: We need a local way to check if we hold the socket connection.
// io.sockets.sockets is a Map of all connected sockets on this server instance.
// We first get the socket ID from the shared store (UserSocketService)
// to see which socket ID belongs to the user.
const socketId = await this.userSocketService.getSocket(userId);
if (socketId) {
// Now check if we actually have this socket locally
const localSocket = this.io.sockets.sockets.get(socketId);
if (localSocket) {
// We own this connection! Update the local state.
const authSocket = localSocket as AuthenticatedSocket;
authSocket.data.activeDollId = dollId;
this.logger.debug(
`Updated activeDollId locally for user ${userId} to ${dollId}`,
);
}
}
} catch (error) {
this.logger.error('Error handling redis message', error);
}
}
async handleConnection(client: AuthenticatedSocket) {
try {
this.logger.debug(
@@ -106,25 +183,12 @@ export class StateGateway
picture: payload.picture,
};
this.logger.log(`WebSocket authenticated: ${payload.sub}`);
// Initialize defaults
client.data.activeDollId = null;
client.data.friends = new Set();
// userId is not set yet, it will be set in handleClientInitialize
const user = await this.authService.syncUserFromToken(client.data.user);
await this.userSocketService.setSocket(user.id, client.id);
client.data.userId = user.id;
// Sync active doll state to socket
const userWithDoll = await this.prisma.user.findUnique({
where: { id: user.id },
select: { activeDollId: true },
});
client.data.activeDollId = userWithDoll?.activeDollId || null;
// Initialize friends cache using Prisma directly
const friends = await this.prisma.friendship.findMany({
where: { userId: user.id },
select: { friendId: true },
});
client.data.friends = new Set(friends.map((f) => f.friendId));
this.logger.log(`WebSocket authenticated (Pending Init): ${payload.sub}`);
const { sockets } = this.io.sockets;
this.logger.log(
@@ -139,6 +203,49 @@ export class StateGateway
}
}
@SubscribeMessage(WS_EVENT.CLIENT_INITIALIZE)
async handleClientInitialize(client: AuthenticatedSocket) {
try {
const userTokenData = client.data.user;
if (!userTokenData) {
throw new WsException('Unauthorized: No user data found');
}
// 1. Sync user from token (DB Write/Read)
const user = await this.authService.syncUserFromToken(userTokenData);
// 2. Register socket mapping (Redis Write)
await this.userSocketService.setSocket(user.id, client.id);
client.data.userId = user.id;
// 3. Fetch initial state (DB Read)
const [userWithDoll, friends] = await Promise.all([
this.prisma.user.findUnique({
where: { id: user.id },
select: { activeDollId: true },
}),
this.prisma.friendship.findMany({
where: { userId: user.id },
select: { friendId: true },
}),
]);
client.data.activeDollId = userWithDoll?.activeDollId || null;
client.data.friends = new Set(friends.map((f) => f.friendId));
this.logger.log(`Client initialized: ${user.id} (${client.id})`);
// 4. Notify client
client.emit(WS_EVENT.INITIALIZED, {
userId: user.id,
activeDollId: client.data.activeDollId,
});
} catch (error) {
this.logger.error(`Initialization error: ${error}`);
client.disconnect();
}
}
async handleDisconnect(client: AuthenticatedSocket) {
const user = client.data.user;
@@ -167,9 +274,7 @@ export class StateGateway
}
}
}
// Note: We can't iterate over Redis keys easily to find socketId match without userId
// The previous fallback loop over map entries is not efficient with Redis.
// We rely on client.data.userId being set correctly during connection.
// If userId is undefined, client never initialized, so no cleanup needed
}
this.logger.log(
@@ -194,13 +299,13 @@ export class StateGateway
const currentUserId = client.data.userId;
// Do not broadcast cursor position if user has no active doll
if (!client.data.activeDollId) {
if (!currentUserId) {
// User has not initialized yet
return;
}
if (!currentUserId) {
this.logger.warn(`Could not find user ID for client ${client.id}`);
// Do not broadcast cursor position if user has no active doll
if (!client.data.activeDollId) {
return;
}
@@ -408,14 +513,23 @@ export class StateGateway
async handleActiveDollChanged(payload: UserActiveDollChangedEvent) {
const { userId, dollId, doll } = payload;
// 1. Update the user's socket data to reflect the change
const socketId = await this.userSocketService.getSocket(userId);
if (socketId) {
const userSocket = this.io.sockets.sockets.get(
socketId,
) as AuthenticatedSocket;
if (userSocket) {
userSocket.data.activeDollId = dollId;
// 1. Publish update to all instances via Redis so they can update local socket state
if (this.redisClient) {
await this.redisClient.publish(
REDIS_CHANNEL.ACTIVE_DOLL_UPDATE,
JSON.stringify({ userId, dollId }),
);
} else {
// Fallback for single instance (no redis) - update locally directly
// This mimics what handleActiveDollUpdateMessage does
const socketId = await this.userSocketService.getSocket(userId);
if (socketId) {
const userSocket = this.io.sockets.sockets.get(
socketId,
) as AuthenticatedSocket;
if (userSocket) {
userSocket.data.activeDollId = dollId;
}
}
}