redis pt 4: presence & more cache invalidation
This commit is contained in:
@@ -12,6 +12,9 @@ REDIS_PORT=6379
|
|||||||
REDIS_REQUIRED=false
|
REDIS_REQUIRED=false
|
||||||
REDIS_CONNECT_TIMEOUT_MS=5000
|
REDIS_CONNECT_TIMEOUT_MS=5000
|
||||||
REDIS_STARTUP_RETRIES=10
|
REDIS_STARTUP_RETRIES=10
|
||||||
|
# Stale presence cleanup threshold and interval
|
||||||
|
PRESENCE_STALE_AGE_MS=604800000
|
||||||
|
PRESENCE_CLEANUP_INTERVAL_MS=300000
|
||||||
|
|
||||||
# Cache
|
# Cache
|
||||||
CACHE_KEY_PREFIX=friendolls
|
CACHE_KEY_PREFIX=friendolls
|
||||||
|
|||||||
@@ -89,6 +89,8 @@ function validateEnvironment(
|
|||||||
validateOptionalPositiveNumber(config, 'CACHE_MAX_TTL_SECONDS');
|
validateOptionalPositiveNumber(config, 'CACHE_MAX_TTL_SECONDS');
|
||||||
validateOptionalPositiveNumber(config, 'CACHE_METRICS_LOG_INTERVAL_MS');
|
validateOptionalPositiveNumber(config, 'CACHE_METRICS_LOG_INTERVAL_MS');
|
||||||
validateOptionalPositiveNumber(config, 'CACHE_TAG_MAX_ENTRIES');
|
validateOptionalPositiveNumber(config, 'CACHE_TAG_MAX_ENTRIES');
|
||||||
|
validateOptionalPositiveNumber(config, 'PRESENCE_STALE_AGE_MS');
|
||||||
|
validateOptionalPositiveNumber(config, 'PRESENCE_CLEANUP_INTERVAL_MS');
|
||||||
|
|
||||||
validateOptionalProvider(config, 'GOOGLE');
|
validateOptionalProvider(config, 'GOOGLE');
|
||||||
validateOptionalProvider(config, 'DISCORD');
|
validateOptionalProvider(config, 'DISCORD');
|
||||||
|
|||||||
@@ -292,6 +292,7 @@ export class AuthService {
|
|||||||
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, {
|
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, {
|
||||||
userId: user.id,
|
userId: user.id,
|
||||||
});
|
});
|
||||||
|
this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: user.id });
|
||||||
|
|
||||||
return user;
|
return user;
|
||||||
}
|
}
|
||||||
@@ -354,6 +355,7 @@ export class AuthService {
|
|||||||
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, {
|
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, {
|
||||||
userId: user.id,
|
userId: user.id,
|
||||||
});
|
});
|
||||||
|
this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: user.id });
|
||||||
|
|
||||||
return user;
|
return user;
|
||||||
}
|
}
|
||||||
|
|||||||
1
src/types/socket.d.ts
vendored
1
src/types/socket.d.ts
vendored
@@ -13,5 +13,6 @@ export type AuthenticatedSocket = BaseSocket<
|
|||||||
friends?: Set<string>; // Set of friend user IDs
|
friends?: Set<string>; // Set of friend user IDs
|
||||||
senderName?: string;
|
senderName?: string;
|
||||||
senderNameCachedAt?: number;
|
senderNameCachedAt?: number;
|
||||||
|
lastSeenAt?: number;
|
||||||
}
|
}
|
||||||
>;
|
>;
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { Doll } from '@prisma/client';
|
|||||||
export const UserEvents = {
|
export const UserEvents = {
|
||||||
ACTIVE_DOLL_CHANGED: 'user.active-doll.changed',
|
ACTIVE_DOLL_CHANGED: 'user.active-doll.changed',
|
||||||
SEARCH_INDEX_INVALIDATED: 'user.search-index.invalidated',
|
SEARCH_INDEX_INVALIDATED: 'user.search-index.invalidated',
|
||||||
|
PROFILE_UPDATED: 'user.profile.updated',
|
||||||
} as const;
|
} as const;
|
||||||
|
|
||||||
export interface UserActiveDollChangedEvent {
|
export interface UserActiveDollChangedEvent {
|
||||||
@@ -14,3 +15,7 @@ export interface UserActiveDollChangedEvent {
|
|||||||
export interface UserSearchIndexInvalidatedEvent {
|
export interface UserSearchIndexInvalidatedEvent {
|
||||||
userId?: string;
|
userId?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface UserProfileUpdatedEvent {
|
||||||
|
userId: string;
|
||||||
|
}
|
||||||
|
|||||||
@@ -104,6 +104,7 @@ export class UsersService {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: id });
|
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: id });
|
||||||
|
this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: id });
|
||||||
|
|
||||||
this.logger.log(`User ${id} profile update requested`);
|
this.logger.log(`User ${id} profile update requested`);
|
||||||
|
|
||||||
@@ -135,6 +136,7 @@ export class UsersService {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: id });
|
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, { userId: id });
|
||||||
|
this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: id });
|
||||||
|
|
||||||
this.logger.log(`User ${id} deleted their account`);
|
this.logger.log(`User ${id} deleted their account`);
|
||||||
}
|
}
|
||||||
@@ -318,6 +320,7 @@ export class UsersService {
|
|||||||
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, {
|
this.eventEmitter.emit(UserEvents.SEARCH_INDEX_INVALIDATED, {
|
||||||
userId: user.id,
|
userId: user.id,
|
||||||
});
|
});
|
||||||
|
this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId: user.id });
|
||||||
|
|
||||||
return user;
|
return user;
|
||||||
}
|
}
|
||||||
@@ -330,6 +333,8 @@ export class UsersService {
|
|||||||
where: { id: userId },
|
where: { id: userId },
|
||||||
data: { passwordHash } as unknown as Prisma.UserUpdateInput,
|
data: { passwordHash } as unknown as Prisma.UserUpdateInput,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.eventEmitter.emit(UserEvents.PROFILE_UPDATED, { userId });
|
||||||
}
|
}
|
||||||
|
|
||||||
async updateLastLogin(userId: string): Promise<void> {
|
async updateLastLogin(userId: string): Promise<void> {
|
||||||
|
|||||||
@@ -116,7 +116,9 @@ export class ConnectionHandler {
|
|||||||
|
|
||||||
// 3. Register socket mapping (Redis Write)
|
// 3. Register socket mapping (Redis Write)
|
||||||
await this.userSocketService.setSocket(userState.id, client.id);
|
await this.userSocketService.setSocket(userState.id, client.id);
|
||||||
|
await this.userSocketService.touchLastSeen(userState.id);
|
||||||
client.data.userId = userState.id;
|
client.data.userId = userState.id;
|
||||||
|
client.data.lastSeenAt = Date.now();
|
||||||
|
|
||||||
client.data.activeDollId = userState.activeDollId || null;
|
client.data.activeDollId = userState.activeDollId || null;
|
||||||
client.data.friends = new Set(friends.map((f) => f.friendId));
|
client.data.friends = new Set(friends.map((f) => f.friendId));
|
||||||
@@ -149,7 +151,8 @@ export class ConnectionHandler {
|
|||||||
// Check if this socket is still the active one for the user
|
// Check if this socket is still the active one for the user
|
||||||
const currentSocketId = await this.userSocketService.getSocket(userId);
|
const currentSocketId = await this.userSocketService.getSocket(userId);
|
||||||
if (currentSocketId === client.id) {
|
if (currentSocketId === client.id) {
|
||||||
await this.userSocketService.removeSocket(userId);
|
await this.userSocketService.removeSocket(userId, client.id);
|
||||||
|
await this.userSocketService.touchLastSeen(userId);
|
||||||
// Note: throttling remove is done in gateway
|
// Note: throttling remove is done in gateway
|
||||||
|
|
||||||
// Notify friends that this user has disconnected
|
// Notify friends that this user has disconnected
|
||||||
@@ -179,5 +182,7 @@ export class ConnectionHandler {
|
|||||||
this.logger.log(
|
this.logger.log(
|
||||||
`Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`,
|
`Client id: ${client.id} disconnected (user: ${user?.userId || 'unknown'})`,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
await this.userSocketService.removeSocketById(client.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ export class CursorHandler {
|
|||||||
// Broadcast to online friends
|
// Broadcast to online friends
|
||||||
const friends = client.data.friends;
|
const friends = client.data.friends;
|
||||||
if (friends) {
|
if (friends) {
|
||||||
|
await this.broadcaster.touchPresence(client);
|
||||||
const payload = {
|
const payload = {
|
||||||
userId: currentUserId,
|
userId: currentUserId,
|
||||||
position: data,
|
position: data,
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ export class InteractionHandler {
|
|||||||
client: AuthenticatedSocket,
|
client: AuthenticatedSocket,
|
||||||
data: SendInteractionDto,
|
data: SendInteractionDto,
|
||||||
) {
|
) {
|
||||||
|
await this.wsNotificationService.maybeTouchPresence(client);
|
||||||
|
|
||||||
const user = client.data.user;
|
const user = client.data.user;
|
||||||
const currentUserId = Validator.validateInitialized(client);
|
const currentUserId = Validator.validateInitialized(client);
|
||||||
|
|
||||||
|
|||||||
@@ -29,6 +29,11 @@ import { InteractionHandler } from './interaction/handler';
|
|||||||
import { RedisHandler } from './utils/redis-handler';
|
import { RedisHandler } from './utils/redis-handler';
|
||||||
import { Broadcaster } from './utils/broadcasting';
|
import { Broadcaster } from './utils/broadcasting';
|
||||||
import { Throttler } from './utils/throttling';
|
import { Throttler } from './utils/throttling';
|
||||||
|
import { ConfigService } from '@nestjs/config';
|
||||||
|
import { parsePositiveInteger } from '../../common/config/env.utils';
|
||||||
|
|
||||||
|
const DEFAULT_PRESENCE_STALE_AGE_MS = 7 * 24 * 60 * 60 * 1000;
|
||||||
|
const DEFAULT_PRESENCE_CLEANUP_INTERVAL_MS = 5 * 60 * 1000;
|
||||||
|
|
||||||
@WebSocketGateway()
|
@WebSocketGateway()
|
||||||
export class StateGateway
|
export class StateGateway
|
||||||
@@ -49,12 +54,16 @@ export class StateGateway
|
|||||||
private readonly cursorHandler: CursorHandler;
|
private readonly cursorHandler: CursorHandler;
|
||||||
private readonly statusHandler: StatusHandler;
|
private readonly statusHandler: StatusHandler;
|
||||||
private readonly interactionHandler: InteractionHandler;
|
private readonly interactionHandler: InteractionHandler;
|
||||||
|
private readonly presenceStaleAgeMs: number;
|
||||||
|
private readonly presenceCleanupIntervalMs: number;
|
||||||
|
private presenceCleanupTimer: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly jwtVerificationService: JwtVerificationService,
|
private readonly jwtVerificationService: JwtVerificationService,
|
||||||
private readonly prisma: PrismaService,
|
private readonly prisma: PrismaService,
|
||||||
private readonly userSocketService: UserSocketService,
|
private readonly userSocketService: UserSocketService,
|
||||||
private readonly wsNotificationService: WsNotificationService,
|
private readonly wsNotificationService: WsNotificationService,
|
||||||
|
private readonly configService: ConfigService,
|
||||||
@Inject(REDIS_CLIENT) private readonly redisClient: Redis | null,
|
@Inject(REDIS_CLIENT) private readonly redisClient: Redis | null,
|
||||||
@Inject(REDIS_SUBSCRIBER_CLIENT)
|
@Inject(REDIS_SUBSCRIBER_CLIENT)
|
||||||
private readonly redisSubscriber: Redis | null,
|
private readonly redisSubscriber: Redis | null,
|
||||||
@@ -78,6 +87,14 @@ export class StateGateway
|
|||||||
this.userSocketService,
|
this.userSocketService,
|
||||||
this.wsNotificationService,
|
this.wsNotificationService,
|
||||||
);
|
);
|
||||||
|
this.presenceStaleAgeMs = parsePositiveInteger(
|
||||||
|
this.configService.get<string>('PRESENCE_STALE_AGE_MS'),
|
||||||
|
DEFAULT_PRESENCE_STALE_AGE_MS,
|
||||||
|
);
|
||||||
|
this.presenceCleanupIntervalMs = parsePositiveInteger(
|
||||||
|
this.configService.get<string>('PRESENCE_CLEANUP_INTERVAL_MS'),
|
||||||
|
DEFAULT_PRESENCE_CLEANUP_INTERVAL_MS,
|
||||||
|
);
|
||||||
|
|
||||||
// Setup Redis subscription for cross-instance communication
|
// Setup Redis subscription for cross-instance communication
|
||||||
if (this.redisSubscriber) {
|
if (this.redisSubscriber) {
|
||||||
@@ -85,6 +102,7 @@ export class StateGateway
|
|||||||
.subscribe(
|
.subscribe(
|
||||||
REDIS_CHANNEL.ACTIVE_DOLL_UPDATE,
|
REDIS_CHANNEL.ACTIVE_DOLL_UPDATE,
|
||||||
REDIS_CHANNEL.FRIEND_CACHE_UPDATE,
|
REDIS_CHANNEL.FRIEND_CACHE_UPDATE,
|
||||||
|
REDIS_CHANNEL.USER_PROFILE_CACHE_INVALIDATE,
|
||||||
(err) => {
|
(err) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
this.logger.error(`Failed to subscribe to Redis channels`, err);
|
this.logger.error(`Failed to subscribe to Redis channels`, err);
|
||||||
@@ -104,6 +122,9 @@ export class StateGateway
|
|||||||
} else if (channel === REDIS_CHANNEL.FRIEND_CACHE_UPDATE) {
|
} else if (channel === REDIS_CHANNEL.FRIEND_CACHE_UPDATE) {
|
||||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||||
this.redisHandler.handleFriendCacheUpdateMessage(message);
|
this.redisHandler.handleFriendCacheUpdateMessage(message);
|
||||||
|
} else if (channel === REDIS_CHANNEL.USER_PROFILE_CACHE_INVALIDATE) {
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||||
|
this.redisHandler.handleUserProfileCacheInvalidateMessage(message);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -112,6 +133,11 @@ export class StateGateway
|
|||||||
afterInit() {
|
afterInit() {
|
||||||
this.logger.log('Initialized');
|
this.logger.log('Initialized');
|
||||||
this.wsNotificationService.setIo(this.io);
|
this.wsNotificationService.setIo(this.io);
|
||||||
|
|
||||||
|
this.presenceCleanupTimer = setInterval(() => {
|
||||||
|
void this.cleanupStalePresence();
|
||||||
|
}, this.presenceCleanupIntervalMs);
|
||||||
|
this.presenceCleanupTimer.unref();
|
||||||
}
|
}
|
||||||
|
|
||||||
handleConnection(client: AuthenticatedSocket) {
|
handleConnection(client: AuthenticatedSocket) {
|
||||||
@@ -164,5 +190,18 @@ export class StateGateway
|
|||||||
if (this.redisSubscriber) {
|
if (this.redisSubscriber) {
|
||||||
this.redisSubscriber.removeAllListeners('message');
|
this.redisSubscriber.removeAllListeners('message');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.presenceCleanupTimer) {
|
||||||
|
clearInterval(this.presenceCleanupTimer);
|
||||||
|
this.presenceCleanupTimer = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async cleanupStalePresence(): Promise<void> {
|
||||||
|
const cutoffMs = Date.now() - this.presenceStaleAgeMs;
|
||||||
|
const removed = await this.userSocketService.cleanupStalePresence(cutoffMs);
|
||||||
|
if (removed > 0) {
|
||||||
|
this.logger.debug(`Cleaned up ${removed} stale presence entries`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ export class StatusHandler {
|
|||||||
const friends = client.data.friends;
|
const friends = client.data.friends;
|
||||||
if (friends) {
|
if (friends) {
|
||||||
try {
|
try {
|
||||||
|
await this.broadcaster.touchPresence(client);
|
||||||
const payload = {
|
const payload = {
|
||||||
userId: currentUserId,
|
userId: currentUserId,
|
||||||
status: data,
|
status: data,
|
||||||
|
|||||||
@@ -2,12 +2,76 @@ import { Injectable, Inject, Logger } from '@nestjs/common';
|
|||||||
import { REDIS_CLIENT } from '../../database/redis.module';
|
import { REDIS_CLIENT } from '../../database/redis.module';
|
||||||
import Redis from 'ioredis';
|
import Redis from 'ioredis';
|
||||||
|
|
||||||
|
const SOCKET_KEY_PREFIX = 'socket:user:';
|
||||||
|
const SOCKET_REVERSE_KEY_PREFIX = 'socket:id:';
|
||||||
|
const LAST_SEEN_KEY_PREFIX = 'presence:last-seen:';
|
||||||
|
const PRESENCE_ZSET_KEY = 'presence:last-seen:zset';
|
||||||
|
|
||||||
|
const SET_SOCKET_MAPPING_SCRIPT = `
|
||||||
|
local userKey = KEYS[1]
|
||||||
|
local reverseKey = KEYS[2]
|
||||||
|
local userId = ARGV[1]
|
||||||
|
local socketId = ARGV[2]
|
||||||
|
local ttl = ARGV[3]
|
||||||
|
local reversePrefix = ARGV[4]
|
||||||
|
|
||||||
|
local previousSocketId = redis.call('GET', userKey)
|
||||||
|
redis.call('SET', userKey, socketId, 'EX', ttl)
|
||||||
|
redis.call('SET', reverseKey, userId, 'EX', ttl)
|
||||||
|
|
||||||
|
if previousSocketId and previousSocketId ~= socketId then
|
||||||
|
redis.call('DEL', reversePrefix .. previousSocketId)
|
||||||
|
end
|
||||||
|
|
||||||
|
return 1
|
||||||
|
`;
|
||||||
|
|
||||||
|
const REMOVE_SOCKET_MAPPING_SCRIPT = `
|
||||||
|
local userKey = KEYS[1]
|
||||||
|
local reversePrefix = ARGV[1]
|
||||||
|
local expectedSocketId = ARGV[2]
|
||||||
|
|
||||||
|
local currentSocketId = redis.call('GET', userKey)
|
||||||
|
if not currentSocketId then
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
|
||||||
|
if expectedSocketId ~= '' and currentSocketId ~= expectedSocketId then
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.call('DEL', userKey)
|
||||||
|
redis.call('DEL', reversePrefix .. currentSocketId)
|
||||||
|
return 1
|
||||||
|
`;
|
||||||
|
|
||||||
|
const REMOVE_BY_SOCKET_ID_SCRIPT = `
|
||||||
|
local reverseKey = KEYS[1]
|
||||||
|
local userPrefix = ARGV[1]
|
||||||
|
local socketId = ARGV[2]
|
||||||
|
|
||||||
|
local userId = redis.call('GET', reverseKey)
|
||||||
|
if not userId then
|
||||||
|
return 0
|
||||||
|
end
|
||||||
|
|
||||||
|
local userKey = userPrefix .. userId
|
||||||
|
local currentSocketId = redis.call('GET', userKey)
|
||||||
|
|
||||||
|
redis.call('DEL', reverseKey)
|
||||||
|
if currentSocketId == socketId then
|
||||||
|
redis.call('DEL', userKey)
|
||||||
|
end
|
||||||
|
|
||||||
|
return 1
|
||||||
|
`;
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class UserSocketService {
|
export class UserSocketService {
|
||||||
private readonly logger = new Logger(UserSocketService.name);
|
private readonly logger = new Logger(UserSocketService.name);
|
||||||
private localUserSocketMap: Map<string, string> = new Map();
|
private localUserSocketMap: Map<string, string> = new Map();
|
||||||
private readonly PREFIX = 'socket:user:';
|
|
||||||
private readonly TTL = 86400; // 24 hours
|
private readonly TTL = 86400; // 24 hours
|
||||||
|
private readonly LAST_SEEN_TTL_SECONDS = 604800; // 7 days
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@Inject(REDIS_CLIENT) private readonly redisClient: Redis | null,
|
@Inject(REDIS_CLIENT) private readonly redisClient: Redis | null,
|
||||||
@@ -16,11 +80,15 @@ export class UserSocketService {
|
|||||||
async setSocket(userId: string, socketId: string): Promise<void> {
|
async setSocket(userId: string, socketId: string): Promise<void> {
|
||||||
if (this.redisClient) {
|
if (this.redisClient) {
|
||||||
try {
|
try {
|
||||||
await this.redisClient.set(
|
await this.redisClient.eval(
|
||||||
`${this.PREFIX}${userId}`,
|
SET_SOCKET_MAPPING_SCRIPT,
|
||||||
|
2,
|
||||||
|
`${SOCKET_KEY_PREFIX}${userId}`,
|
||||||
|
`${SOCKET_REVERSE_KEY_PREFIX}${socketId}`,
|
||||||
|
userId,
|
||||||
socketId,
|
socketId,
|
||||||
'EX',
|
String(this.TTL),
|
||||||
this.TTL,
|
SOCKET_REVERSE_KEY_PREFIX,
|
||||||
);
|
);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
@@ -36,10 +104,16 @@ export class UserSocketService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeSocket(userId: string): Promise<void> {
|
async removeSocket(userId: string, expectedSocketId?: string): Promise<void> {
|
||||||
if (this.redisClient) {
|
if (this.redisClient) {
|
||||||
try {
|
try {
|
||||||
await this.redisClient.del(`${this.PREFIX}${userId}`);
|
await this.redisClient.eval(
|
||||||
|
REMOVE_SOCKET_MAPPING_SCRIPT,
|
||||||
|
1,
|
||||||
|
`${SOCKET_KEY_PREFIX}${userId}`,
|
||||||
|
SOCKET_REVERSE_KEY_PREFIX,
|
||||||
|
expectedSocketId || '',
|
||||||
|
);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
`Failed to remove socket for user ${userId} from Redis`,
|
`Failed to remove socket for user ${userId} from Redis`,
|
||||||
@@ -47,13 +121,23 @@ export class UserSocketService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.localUserSocketMap.delete(userId);
|
if (!expectedSocketId) {
|
||||||
|
this.localUserSocketMap.delete(userId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const currentLocalSocketId = this.localUserSocketMap.get(userId);
|
||||||
|
if (currentLocalSocketId === expectedSocketId) {
|
||||||
|
this.localUserSocketMap.delete(userId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async getSocket(userId: string): Promise<string | null> {
|
async getSocket(userId: string): Promise<string | null> {
|
||||||
if (this.redisClient) {
|
if (this.redisClient) {
|
||||||
try {
|
try {
|
||||||
const socketId = await this.redisClient.get(`${this.PREFIX}${userId}`);
|
const socketId = await this.redisClient.get(
|
||||||
|
`${SOCKET_KEY_PREFIX}${userId}`,
|
||||||
|
);
|
||||||
return socketId;
|
return socketId;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
@@ -82,7 +166,7 @@ export class UserSocketService {
|
|||||||
try {
|
try {
|
||||||
// Use pipeline for batch fetching
|
// Use pipeline for batch fetching
|
||||||
const pipeline = this.redisClient.pipeline();
|
const pipeline = this.redisClient.pipeline();
|
||||||
friendIds.forEach((id) => pipeline.get(`${this.PREFIX}${id}`));
|
friendIds.forEach((id) => pipeline.get(`${SOCKET_KEY_PREFIX}${id}`));
|
||||||
const results = await pipeline.exec();
|
const results = await pipeline.exec();
|
||||||
|
|
||||||
const sockets: { userId: string; socketId: string }[] = [];
|
const sockets: { userId: string; socketId: string }[] = [];
|
||||||
@@ -115,4 +199,79 @@ export class UserSocketService {
|
|||||||
}
|
}
|
||||||
return sockets;
|
return sockets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async touchLastSeen(userId: string): Promise<void> {
|
||||||
|
const now = Date.now();
|
||||||
|
if (this.redisClient) {
|
||||||
|
try {
|
||||||
|
const key = `${LAST_SEEN_KEY_PREFIX}${userId}`;
|
||||||
|
await this.redisClient.set(
|
||||||
|
key,
|
||||||
|
String(now),
|
||||||
|
'EX',
|
||||||
|
this.LAST_SEEN_TTL_SECONDS,
|
||||||
|
);
|
||||||
|
await this.redisClient.zadd(PRESENCE_ZSET_KEY, now, userId);
|
||||||
|
return;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to touch last-seen for user ${userId} in Redis`,
|
||||||
|
error as Error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeSocketById(socketId: string): Promise<void> {
|
||||||
|
if (!this.redisClient) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.redisClient.eval(
|
||||||
|
REMOVE_BY_SOCKET_ID_SCRIPT,
|
||||||
|
1,
|
||||||
|
`${SOCKET_REVERSE_KEY_PREFIX}${socketId}`,
|
||||||
|
SOCKET_KEY_PREFIX,
|
||||||
|
socketId,
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to remove socket mapping by socket id ${socketId}`,
|
||||||
|
error as Error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async cleanupStalePresence(cutoffMs: number): Promise<number> {
|
||||||
|
if (!this.redisClient) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const staleUserIds = await this.redisClient.zrangebyscore(
|
||||||
|
PRESENCE_ZSET_KEY,
|
||||||
|
'-inf',
|
||||||
|
cutoffMs,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (staleUserIds.length === 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
const pipeline = this.redisClient.pipeline();
|
||||||
|
staleUserIds.forEach((userId) => {
|
||||||
|
pipeline.del(`${LAST_SEEN_KEY_PREFIX}${userId}`);
|
||||||
|
});
|
||||||
|
pipeline.zremrangebyscore(PRESENCE_ZSET_KEY, '-inf', cutoffMs);
|
||||||
|
await pipeline.exec();
|
||||||
|
return staleUserIds.length;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
'Failed to cleanup stale presence entries',
|
||||||
|
error as Error,
|
||||||
|
);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { UserSocketService } from '../user-socket.service';
|
import { UserSocketService } from '../user-socket.service';
|
||||||
import { WsNotificationService } from '../ws-notification.service';
|
import { WsNotificationService } from '../ws-notification.service';
|
||||||
|
import type { AuthenticatedSocket } from '../../../types/socket';
|
||||||
|
|
||||||
export class Broadcaster {
|
export class Broadcaster {
|
||||||
constructor(
|
constructor(
|
||||||
@@ -7,6 +8,10 @@ export class Broadcaster {
|
|||||||
private readonly wsNotificationService: WsNotificationService,
|
private readonly wsNotificationService: WsNotificationService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
|
async touchPresence(client: AuthenticatedSocket) {
|
||||||
|
await this.wsNotificationService.maybeTouchPresence(client);
|
||||||
|
}
|
||||||
|
|
||||||
async broadcastToFriends(friends: Set<string>, event: string, payload: any) {
|
async broadcastToFriends(friends: Set<string>, event: string, payload: any) {
|
||||||
const friendIds = Array.from(friends);
|
const friendIds = Array.from(friends);
|
||||||
const friendSockets =
|
const friendSockets =
|
||||||
|
|||||||
@@ -36,4 +36,20 @@ export class RedisHandler {
|
|||||||
this.logger.error('Error handling friend cache update message', error);
|
this.logger.error('Error handling friend cache update message', error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async handleUserProfileCacheInvalidateMessage(
|
||||||
|
message: string,
|
||||||
|
): Promise<void> {
|
||||||
|
try {
|
||||||
|
const data = JSON.parse(message) as {
|
||||||
|
userId: string;
|
||||||
|
};
|
||||||
|
await this.wsNotificationService.clearSenderNameCache(data.userId);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(
|
||||||
|
'Error handling user profile cache invalidate message',
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,4 +22,5 @@ export const WS_EVENT = {
|
|||||||
export const REDIS_CHANNEL = {
|
export const REDIS_CHANNEL = {
|
||||||
ACTIVE_DOLL_UPDATE: 'active-doll-update',
|
ACTIVE_DOLL_UPDATE: 'active-doll-update',
|
||||||
FRIEND_CACHE_UPDATE: 'friend-cache-update',
|
FRIEND_CACHE_UPDATE: 'friend-cache-update',
|
||||||
|
USER_PROFILE_CACHE_INVALIDATE: 'user-profile-cache-invalidate',
|
||||||
} as const;
|
} as const;
|
||||||
|
|||||||
@@ -1,10 +1,14 @@
|
|||||||
import { Injectable, Logger, Inject } from '@nestjs/common';
|
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||||
|
import { OnEvent } from '@nestjs/event-emitter';
|
||||||
import Redis from 'ioredis';
|
import Redis from 'ioredis';
|
||||||
import { Server } from 'socket.io';
|
import { Server } from 'socket.io';
|
||||||
import { UserSocketService } from './user-socket.service';
|
import { UserEvents } from '../../users/events/user.events';
|
||||||
import type { AuthenticatedSocket } from '../../types/socket';
|
import type { AuthenticatedSocket } from '../../types/socket';
|
||||||
import { REDIS_CLIENT } from '../../database/redis.module';
|
import { REDIS_CLIENT } from '../../database/redis.module';
|
||||||
import { REDIS_CHANNEL } from './ws-events';
|
import { REDIS_CHANNEL } from './ws-events';
|
||||||
|
import { UserSocketService } from './user-socket.service';
|
||||||
|
|
||||||
|
const PRESENCE_UPDATE_THROTTLE_MS = 15_000;
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class WsNotificationService {
|
export class WsNotificationService {
|
||||||
@@ -42,6 +46,11 @@ export class WsNotificationService {
|
|||||||
this.io.to(socketId).emit(event, payload);
|
this.io.to(socketId).emit(event, payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@OnEvent(UserEvents.PROFILE_UPDATED)
|
||||||
|
async handleUserProfileUpdated(payload: { userId: string }) {
|
||||||
|
await this.publishUserProfileCacheInvalidate(payload.userId);
|
||||||
|
}
|
||||||
|
|
||||||
async updateFriendsCache(
|
async updateFriendsCache(
|
||||||
userId: string,
|
userId: string,
|
||||||
friendId: string,
|
friendId: string,
|
||||||
@@ -126,4 +135,63 @@ export class WsNotificationService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async publishUserProfileCacheInvalidate(userId: string) {
|
||||||
|
if (this.redisClient) {
|
||||||
|
try {
|
||||||
|
await this.redisClient.publish(
|
||||||
|
REDIS_CHANNEL.USER_PROFILE_CACHE_INVALIDATE,
|
||||||
|
JSON.stringify({ userId }),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
'Redis publish failed for user profile cache invalidate; applying local update only',
|
||||||
|
error as Error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.clearSenderNameCache(userId);
|
||||||
|
}
|
||||||
|
|
||||||
|
async clearSenderNameCache(userId: string) {
|
||||||
|
if (!this.io) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const socketId = await this.userSocketService.getSocket(userId);
|
||||||
|
if (!socketId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const socket = this.io.sockets.sockets.get(socketId) as
|
||||||
|
| AuthenticatedSocket
|
||||||
|
| undefined;
|
||||||
|
if (!socket?.data) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.data.senderName = undefined;
|
||||||
|
socket.data.senderNameCachedAt = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
async maybeTouchPresence(client: AuthenticatedSocket): Promise<void> {
|
||||||
|
const userId = client.data.userId;
|
||||||
|
if (!userId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = Date.now();
|
||||||
|
const lastSeenAt = client.data.lastSeenAt;
|
||||||
|
if (
|
||||||
|
typeof lastSeenAt === 'number' &&
|
||||||
|
now - lastSeenAt < PRESENCE_UPDATE_THROTTLE_MS
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
client.data.lastSeenAt = now;
|
||||||
|
await this.userSocketService.touchLastSeen(userId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user