From 4151984b5cb89feb05f56d8b80b9a79b7428c919 Mon Sep 17 00:00:00 2001 From: Wind-Explorer Date: Mon, 30 Mar 2026 23:48:18 +0800 Subject: [PATCH] redis pt 4: presence & more cache invalidation --- .env.example | 3 + src/app.module.ts | 2 + src/auth/auth.service.ts | 2 + src/types/socket.d.ts | 1 + src/users/events/user.events.ts | 5 + src/users/users.service.ts | 5 + src/ws/state/connection/handler.ts | 7 +- src/ws/state/cursor/handler.ts | 1 + src/ws/state/interaction/handler.ts | 2 + src/ws/state/state.gateway.ts | 39 ++++++ src/ws/state/status/handler.ts | 1 + src/ws/state/user-socket.service.ts | 179 ++++++++++++++++++++++-- src/ws/state/utils/broadcasting.ts | 5 + src/ws/state/utils/redis-handler.ts | 16 +++ src/ws/state/ws-events.ts | 1 + src/ws/state/ws-notification.service.ts | 72 +++++++++- 16 files changed, 328 insertions(+), 13 deletions(-) diff --git a/.env.example b/.env.example index 1eb4a58..882e48a 100644 --- a/.env.example +++ b/.env.example @@ -12,6 +12,9 @@ REDIS_PORT=6379 REDIS_REQUIRED=false REDIS_CONNECT_TIMEOUT_MS=5000 REDIS_STARTUP_RETRIES=10 +# Stale presence cleanup threshold and interval +PRESENCE_STALE_AGE_MS=604800000 +PRESENCE_CLEANUP_INTERVAL_MS=300000 # Cache CACHE_KEY_PREFIX=friendolls diff --git a/src/app.module.ts b/src/app.module.ts index c450f00..8b14609 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -89,6 +89,8 @@ function validateEnvironment( validateOptionalPositiveNumber(config, 'CACHE_MAX_TTL_SECONDS'); validateOptionalPositiveNumber(config, 'CACHE_METRICS_LOG_INTERVAL_MS'); validateOptionalPositiveNumber(config, 'CACHE_TAG_MAX_ENTRIES'); + validateOptionalPositiveNumber(config, 'PRESENCE_STALE_AGE_MS'); + validateOptionalPositiveNumber(config, 'PRESENCE_CLEANUP_INTERVAL_MS'); validateOptionalProvider(config, 'GOOGLE'); validateOptionalProvider(config, 'DISCORD'); diff --git a/src/auth/auth.service.ts b/src/auth/auth.service.ts index b76e45f..711f5e3 100644 --- a/src/auth/auth.service.ts +++ b/src/auth/auth.service.ts @@ -292,6 +292,7 @@ export class AuthService { this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: user.id, }); + this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: user.id }); return user; } @@ -354,6 +355,7 @@ export class AuthService { this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: user.id, }); + this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: user.id }); return user; } diff --git a/src/types/socket.d.ts b/src/types/socket.d.ts index 8d028c7..69c138c 100644 --- a/src/types/socket.d.ts +++ b/src/types/socket.d.ts @@ -13,5 +13,6 @@ export type AuthenticatedSocket = BaseSocket< friends?: Set; // Set of friend user IDs senderName?: string; senderNameCachedAt?: number; + lastSeenAt?: number; } >; diff --git a/src/users/events/user.events.ts b/src/users/events/user.events.ts index 0e7835c..0b45f09 100644 --- a/src/users/events/user.events.ts +++ b/src/users/events/user.events.ts @@ -3,6 +3,7 @@ import { Doll } from '@prisma/client'; export const UserEvents = { ACTIVE_DOLL_CHANGED: 'user.active-doll.changed', SEARCH_INDEX_INVALIDATED: 'user.search-index.invalidated', + PROFILE_UPDATED: 'user.profile.updated', } as const; export interface UserActiveDollChangedEvent { @@ -14,3 +15,7 @@ export interface UserActiveDollChangedEvent { export interface UserSearchIndexInvalidatedEvent { userId?: string; } + +export interface UserProfileUpdatedEvent { + userId: string; +} diff --git a/src/users/users.service.ts b/src/users/users.service.ts index b59ad71..ff2d690 100644 --- a/src/users/users.service.ts +++ b/src/users/users.service.ts @@ -104,6 +104,7 @@ export class UsersService { }); this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: id }); + this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: id }); this.logger.log(`User ${id} profile update requested`); @@ -135,6 +136,7 @@ export class UsersService { }); this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: id }); + this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: id }); this.logger.log(`User ${id} deleted their account`); } @@ -318,6 +320,7 @@ export class UsersService { this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: user.id, }); + this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: user.id }); return user; } @@ -330,6 +333,8 @@ export class UsersService { where: { id: userId }, data: { passwordHash } as unknown as Prisma.UserUpdateInput, }); + + this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId }); } async updateLastLogin(userId: string): Promise { diff --git a/src/ws/state/connection/handler.ts b/src/ws/state/connection/handler.ts index 706759b..237cbbe 100644 --- a/src/ws/state/connection/handler.ts +++ b/src/ws/state/connection/handler.ts @@ -116,7 +116,9 @@ export class ConnectionHandler { // 3. Register socket mapping (Redis Write) await this.userSocketService.setSocket(userState.id, client.id); + await this.userSocketService.touchLastSeen(userState.id); client.data.userId = userState.id; + client.data.lastSeenAt = Date.now(); client.data.activeDollId = userState.activeDollId || null; client.data.friends = new Set(friends.map((f) => f.friendId)); @@ -149,7 +151,8 @@ export class ConnectionHandler { // Check if this socket is still the active one for the user const currentSocketId = await this.userSocketService.getSocket(userId); if (currentSocketId === client.id) { - await this.userSocketService.removeSocket(userId); + await this.userSocketService.removeSocket(userId, client.id); + await this.userSocketService.touchLastSeen(userId); // Note: throttling remove is done in gateway // Notify friends that this user has disconnected @@ -179,5 +182,7 @@ export class ConnectionHandler { this.logger.log( `Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`, ); + + await this.userSocketService.removeSocketById(client.id); } } diff --git a/src/ws/state/cursor/handler.ts b/src/ws/state/cursor/handler.ts index bb24f41..46c3d07 100644 --- a/src/ws/state/cursor/handler.ts +++ b/src/ws/state/cursor/handler.ts @@ -41,6 +41,7 @@ export class CursorHandler { // Broadcast to online friends const friends = client.data.friends; if (friends) { + await this.broadcaster.touchPresence(client); const payload = { userId: currentUserId, position: data, diff --git a/src/ws/state/interaction/handler.ts b/src/ws/state/interaction/handler.ts index 66942c9..a67e844 100644 --- a/src/ws/state/interaction/handler.ts +++ b/src/ws/state/interaction/handler.ts @@ -50,6 +50,8 @@ export class InteractionHandler { client: AuthenticatedSocket, data: SendInteractionDto, ) { + await this.wsNotificationService.maybeTouchPresence(client); + const user = client.data.user; const currentUserId = Validator.validateInitialized(client); diff --git a/src/ws/state/state.gateway.ts b/src/ws/state/state.gateway.ts index 7c74aaf..5311280 100644 --- a/src/ws/state/state.gateway.ts +++ b/src/ws/state/state.gateway.ts @@ -29,6 +29,11 @@ import { InteractionHandler } from './interaction/handler'; import { RedisHandler } from './utils/redis-handler'; import { Broadcaster } from './utils/broadcasting'; import { Throttler } from './utils/throttling'; +import { ConfigService } from '@nestjs/config'; +import { parsePositiveInteger } from '../../common/config/env.utils'; + +const DEFAULT_PRESENCE_STALE_AGE_MS = 7 * 24 * 60 * 60 * 1000; +const DEFAULT_PRESENCE_CLEANUP_INTERVAL_MS = 5 * 60 * 1000; @WebSocketGateway() export class StateGateway @@ -49,12 +54,16 @@ export class StateGateway private readonly cursorHandler: CursorHandler; private readonly statusHandler: StatusHandler; private readonly interactionHandler: InteractionHandler; + private readonly presenceStaleAgeMs: number; + private readonly presenceCleanupIntervalMs: number; + private presenceCleanupTimer: NodeJS.Timeout | null = null; constructor( private readonly jwtVerificationService: JwtVerificationService, private readonly prisma: PrismaService, private readonly userSocketService: UserSocketService, private readonly wsNotificationService: WsNotificationService, + private readonly configService: ConfigService, @Inject(REDIS_CLIENT) private readonly redisClient: Redis | null, @Inject(REDIS_SUBSCRIBER_CLIENT) private readonly redisSubscriber: Redis | null, @@ -78,6 +87,14 @@ export class StateGateway this.userSocketService, this.wsNotificationService, ); + this.presenceStaleAgeMs = parsePositiveInteger( + this.configService.get('PRESENCE_STALE_AGE_MS'), + DEFAULT_PRESENCE_STALE_AGE_MS, + ); + this.presenceCleanupIntervalMs = parsePositiveInteger( + this.configService.get('PRESENCE_CLEANUP_INTERVAL_MS'), + DEFAULT_PRESENCE_CLEANUP_INTERVAL_MS, + ); // Setup Redis subscription for cross-instance communication if (this.redisSubscriber) { @@ -85,6 +102,7 @@ export class StateGateway .subscribe( REDIS_CHANNEL.ACTIVE_DOLL_UPDATE, REDIS_CHANNEL.FRIEND_CACHE_UPDATE, + REDIS_CHANNEL.USER_PROFILE_CACHE_INVALIDATE, (err) => { if (err) { this.logger.error(`Failed to subscribe to Redis channels`, err); @@ -104,6 +122,9 @@ export class StateGateway } else if (channel === REDIS_CHANNEL.FRIEND_CACHE_UPDATE) { // eslint-disable-next-line @typescript-eslint/no-floating-promises this.redisHandler.handleFriendCacheUpdateMessage(message); + } else if (channel === REDIS_CHANNEL.USER_PROFILE_CACHE_INVALIDATE) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.redisHandler.handleUserProfileCacheInvalidateMessage(message); } }); } @@ -112,6 +133,11 @@ export class StateGateway afterInit() { this.logger.log('Initialized'); this.wsNotificationService.setIo(this.io); + + this.presenceCleanupTimer = setInterval(() => { + void this.cleanupStalePresence(); + }, this.presenceCleanupIntervalMs); + this.presenceCleanupTimer.unref(); } handleConnection(client: AuthenticatedSocket) { @@ -164,5 +190,18 @@ export class StateGateway if (this.redisSubscriber) { this.redisSubscriber.removeAllListeners('message'); } + + if (this.presenceCleanupTimer) { + clearInterval(this.presenceCleanupTimer); + this.presenceCleanupTimer = null; + } + } + + private async cleanupStalePresence(): Promise { + const cutoffMs = Date.now() - this.presenceStaleAgeMs; + const removed = await this.userSocketService.cleanupStalePresence(cutoffMs); + if (removed > 0) { + this.logger.debug(`Cleaned up ${removed} stale presence entries`); + } } } diff --git a/src/ws/state/status/handler.ts b/src/ws/state/status/handler.ts index aff12e5..b2ca029 100644 --- a/src/ws/state/status/handler.ts +++ b/src/ws/state/status/handler.ts @@ -47,6 +47,7 @@ export class StatusHandler { const friends = client.data.friends; if (friends) { try { + await this.broadcaster.touchPresence(client); const payload = { userId: currentUserId, status: data, diff --git a/src/ws/state/user-socket.service.ts b/src/ws/state/user-socket.service.ts index 020ab3c..7aba938 100644 --- a/src/ws/state/user-socket.service.ts +++ b/src/ws/state/user-socket.service.ts @@ -2,12 +2,76 @@ import { Injectable, Inject, Logger } from '@nestjs/common'; import { REDIS_CLIENT } from '../../database/redis.module'; import Redis from 'ioredis'; +const SOCKET_KEY_PREFIX = 'socket:user:'; +const SOCKET_REVERSE_KEY_PREFIX = 'socket:id:'; +const LAST_SEEN_KEY_PREFIX = 'presence:last-seen:'; +const PRESENCE_ZSET_KEY = 'presence:last-seen:zset'; + +const SET_SOCKET_MAPPING_SCRIPT = ` +local userKey = KEYS[1] +local reverseKey = KEYS[2] +local userId = ARGV[1] +local socketId = ARGV[2] +local ttl = ARGV[3] +local reversePrefix = ARGV[4] + +local previousSocketId = redis.call('GET', userKey) +redis.call('SET', userKey, socketId, 'EX', ttl) +redis.call('SET', reverseKey, userId, 'EX', ttl) + +if previousSocketId and previousSocketId ~= socketId then + redis.call('DEL', reversePrefix .. previousSocketId) +end + +return 1 +`; + +const REMOVE_SOCKET_MAPPING_SCRIPT = ` +local userKey = KEYS[1] +local reversePrefix = ARGV[1] +local expectedSocketId = ARGV[2] + +local currentSocketId = redis.call('GET', userKey) +if not currentSocketId then + return 0 +end + +if expectedSocketId ~= '' and currentSocketId ~= expectedSocketId then + return 0 +end + +redis.call('DEL', userKey) +redis.call('DEL', reversePrefix .. currentSocketId) +return 1 +`; + +const REMOVE_BY_SOCKET_ID_SCRIPT = ` +local reverseKey = KEYS[1] +local userPrefix = ARGV[1] +local socketId = ARGV[2] + +local userId = redis.call('GET', reverseKey) +if not userId then + return 0 +end + +local userKey = userPrefix .. userId +local currentSocketId = redis.call('GET', userKey) + +redis.call('DEL', reverseKey) +if currentSocketId == socketId then + redis.call('DEL', userKey) +end + +return 1 +`; + @Injectable() export class UserSocketService { private readonly logger = new Logger(UserSocketService.name); private localUserSocketMap: Map = new Map(); - private readonly PREFIX = 'socket:user:'; private readonly TTL = 86400; // 24 hours + private readonly LAST_SEEN_TTL_SECONDS = 604800; // 7 days constructor( @Inject(REDIS_CLIENT) private readonly redisClient: Redis | null, @@ -16,11 +80,15 @@ export class UserSocketService { async setSocket(userId: string, socketId: string): Promise { if (this.redisClient) { try { - await this.redisClient.set( - `${this.PREFIX}${userId}`, + await this.redisClient.eval( + SET_SOCKET_MAPPING_SCRIPT, + 2, + `${SOCKET_KEY_PREFIX}${userId}`, + `${SOCKET_REVERSE_KEY_PREFIX}${socketId}`, + userId, socketId, - 'EX', - this.TTL, + String(this.TTL), + SOCKET_REVERSE_KEY_PREFIX, ); } catch (error) { this.logger.error( @@ -36,10 +104,16 @@ export class UserSocketService { } } - async removeSocket(userId: string): Promise { + async removeSocket(userId: string, expectedSocketId?: string): Promise { if (this.redisClient) { try { - await this.redisClient.del(`${this.PREFIX}${userId}`); + await this.redisClient.eval( + REMOVE_SOCKET_MAPPING_SCRIPT, + 1, + `${SOCKET_KEY_PREFIX}${userId}`, + SOCKET_REVERSE_KEY_PREFIX, + expectedSocketId || '', + ); } catch (error) { this.logger.error( `Failed to remove socket for user ${userId} from Redis`, @@ -47,13 +121,23 @@ export class UserSocketService { ); } } - this.localUserSocketMap.delete(userId); + if (!expectedSocketId) { + this.localUserSocketMap.delete(userId); + return; + } + + const currentLocalSocketId = this.localUserSocketMap.get(userId); + if (currentLocalSocketId === expectedSocketId) { + this.localUserSocketMap.delete(userId); + } } async getSocket(userId: string): Promise { if (this.redisClient) { try { - const socketId = await this.redisClient.get(`${this.PREFIX}${userId}`); + const socketId = await this.redisClient.get( + `${SOCKET_KEY_PREFIX}${userId}`, + ); return socketId; } catch (error) { this.logger.error( @@ -82,7 +166,7 @@ export class UserSocketService { try { // Use pipeline for batch fetching const pipeline = this.redisClient.pipeline(); - friendIds.forEach((id) => pipeline.get(`${this.PREFIX}${id}`)); + friendIds.forEach((id) => pipeline.get(`${SOCKET_KEY_PREFIX}${id}`)); const results = await pipeline.exec(); const sockets: { userId: string; socketId: string }[] = []; @@ -115,4 +199,79 @@ export class UserSocketService { } return sockets; } + + async touchLastSeen(userId: string): Promise { + const now = Date.now(); + if (this.redisClient) { + try { + const key = `${LAST_SEEN_KEY_PREFIX}${userId}`; + await this.redisClient.set( + key, + String(now), + 'EX', + this.LAST_SEEN_TTL_SECONDS, + ); + await this.redisClient.zadd(PRESENCE_ZSET_KEY, now, userId); + return; + } catch (error) { + this.logger.warn( + `Failed to touch last-seen for user ${userId} in Redis`, + error as Error, + ); + } + } + } + + async removeSocketById(socketId: string): Promise { + if (!this.redisClient) { + return; + } + + try { + await this.redisClient.eval( + REMOVE_BY_SOCKET_ID_SCRIPT, + 1, + `${SOCKET_REVERSE_KEY_PREFIX}${socketId}`, + SOCKET_KEY_PREFIX, + socketId, + ); + } catch (error) { + this.logger.warn( + `Failed to remove socket mapping by socket id ${socketId}`, + error as Error, + ); + } + } + + async cleanupStalePresence(cutoffMs: number): Promise { + if (!this.redisClient) { + return 0; + } + + try { + const staleUserIds = await this.redisClient.zrangebyscore( + PRESENCE_ZSET_KEY, + '-inf', + cutoffMs, + ); + + if (staleUserIds.length === 0) { + return 0; + } + + const pipeline = this.redisClient.pipeline(); + staleUserIds.forEach((userId) => { + pipeline.del(`${LAST_SEEN_KEY_PREFIX}${userId}`); + }); + pipeline.zremrangebyscore(PRESENCE_ZSET_KEY, '-inf', cutoffMs); + await pipeline.exec(); + return staleUserIds.length; + } catch (error) { + this.logger.warn( + 'Failed to cleanup stale presence entries', + error as Error, + ); + return 0; + } + } } diff --git a/src/ws/state/utils/broadcasting.ts b/src/ws/state/utils/broadcasting.ts index 1c579b4..469c587 100644 --- a/src/ws/state/utils/broadcasting.ts +++ b/src/ws/state/utils/broadcasting.ts @@ -1,5 +1,6 @@ import { UserSocketService } from '../user-socket.service'; import { WsNotificationService } from '../ws-notification.service'; +import type { AuthenticatedSocket } from '../../../types/socket'; export class Broadcaster { constructor( @@ -7,6 +8,10 @@ export class Broadcaster { private readonly wsNotificationService: WsNotificationService, ) {} + async touchPresence(client: AuthenticatedSocket) { + await this.wsNotificationService.maybeTouchPresence(client); + } + async broadcastToFriends(friends: Set, event: string, payload: any) { const friendIds = Array.from(friends); const friendSockets = diff --git a/src/ws/state/utils/redis-handler.ts b/src/ws/state/utils/redis-handler.ts index 8df961f..77759cf 100644 --- a/src/ws/state/utils/redis-handler.ts +++ b/src/ws/state/utils/redis-handler.ts @@ -36,4 +36,20 @@ export class RedisHandler { this.logger.error('Error handling friend cache update message', error); } } + + async handleUserProfileCacheInvalidateMessage( + message: string, + ): Promise { + try { + const data = JSON.parse(message) as { + userId: string; + }; + await this.wsNotificationService.clearSenderNameCache(data.userId); + } catch (error) { + this.logger.error( + 'Error handling user profile cache invalidate message', + error, + ); + } + } } diff --git a/src/ws/state/ws-events.ts b/src/ws/state/ws-events.ts index a09e5b9..696d96d 100644 --- a/src/ws/state/ws-events.ts +++ b/src/ws/state/ws-events.ts @@ -22,4 +22,5 @@ export const WS_EVENT = { export const REDIS_CHANNEL = { ACTIVE_DOLL_UPDATE: 'active-doll-update', FRIEND_CACHE_UPDATE: 'friend-cache-update', + USER_PROFILE_CACHE_INVALIDATE: 'user-profile-cache-invalidate', } as const; diff --git a/src/ws/state/ws-notification.service.ts b/src/ws/state/ws-notification.service.ts index 34a3ad4..8e8d736 100644 --- a/src/ws/state/ws-notification.service.ts +++ b/src/ws/state/ws-notification.service.ts @@ -1,10 +1,14 @@ -import { Injectable, Logger, Inject } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; import Redis from 'ioredis'; import { Server } from 'socket.io'; -import { UserSocketService } from './user-socket.service'; +import { UserEvents } from '../../users/events/user.events'; import type { AuthenticatedSocket } from '../../types/socket'; import { REDIS_CLIENT } from '../../database/redis.module'; import { REDIS_CHANNEL } from './ws-events'; +import { UserSocketService } from './user-socket.service'; + +const PRESENCE_UPDATE_THROTTLE_MS = 15_000; @Injectable() export class WsNotificationService { @@ -42,6 +46,11 @@ export class WsNotificationService { this.io.to(socketId).emit(event, payload); } + @OnEvent(UserEvents.PROFILE_UPDATED) + async handleUserProfileUpdated(payload: { userId: string }) { + await this.publishUserProfileCacheInvalidate(payload.userId); + } + async updateFriendsCache( userId: string, friendId: string, @@ -126,4 +135,63 @@ export class WsNotificationService { ); } } + + async publishUserProfileCacheInvalidate(userId: string) { + if (this.redisClient) { + try { + await this.redisClient.publish( + REDIS_CHANNEL.USER_PROFILE_CACHE_INVALIDATE, + JSON.stringify({ userId }), + ); + return; + } catch (error) { + this.logger.warn( + 'Redis publish failed for user profile cache invalidate; applying local update only', + error as Error, + ); + } + } + + await this.clearSenderNameCache(userId); + } + + async clearSenderNameCache(userId: string) { + if (!this.io) { + return; + } + + const socketId = await this.userSocketService.getSocket(userId); + if (!socketId) { + return; + } + + const socket = this.io.sockets.sockets.get(socketId) as + | AuthenticatedSocket + | undefined; + if (!socket?.data) { + return; + } + + socket.data.senderName = undefined; + socket.data.senderNameCachedAt = undefined; + } + + async maybeTouchPresence(client: AuthenticatedSocket): Promise { + const userId = client.data.userId; + if (!userId) { + return; + } + + const now = Date.now(); + const lastSeenAt = client.data.lastSeenAt; + if ( + typeof lastSeenAt === 'number' && + now - lastSeenAt < PRESENCE_UPDATE_THROTTLE_MS + ) { + return; + } + + client.data.lastSeenAt = now; + await this.userSocketService.touchLastSeen(userId); + } }