fixed connection reconnection logic
This commit is contained in:
@@ -9,6 +9,7 @@ use crate::{
|
|||||||
services::{
|
services::{
|
||||||
close_all_windows,
|
close_all_windows,
|
||||||
health_manager::open_health_manager_window,
|
health_manager::open_health_manager_window,
|
||||||
|
health_monitor::{start_health_monitor, stop_health_monitor},
|
||||||
scene::open_scene_window,
|
scene::open_scene_window,
|
||||||
ws::client::{clear_ws_client, establish_websocket_connection},
|
ws::client::{clear_ws_client, establish_websocket_connection},
|
||||||
},
|
},
|
||||||
@@ -39,13 +40,15 @@ async fn connect_user_profile() {
|
|||||||
init_app_data_scoped(AppDataRefreshScope::All).await;
|
init_app_data_scoped(AppDataRefreshScope::All).await;
|
||||||
establish_websocket_connection().await;
|
establish_websocket_connection().await;
|
||||||
start_background_token_refresh().await;
|
start_background_token_refresh().await;
|
||||||
|
start_health_monitor().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clears the user profile and WebSocket connection.
|
/// Clears the user profile and WebSocket connection.
|
||||||
async fn disconnect_user_profile() {
|
async fn disconnect_user_profile() {
|
||||||
|
stop_health_monitor();
|
||||||
|
stop_background_token_refresh();
|
||||||
clear_app_data();
|
clear_app_data();
|
||||||
clear_ws_client().await;
|
clear_ws_client().await;
|
||||||
stop_background_token_refresh();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Destructs the user session and show health manager window
|
/// Destructs the user session and show health manager window
|
||||||
|
|||||||
72
src-tauri/src/services/health_monitor.rs
Normal file
72
src-tauri/src/services/health_monitor.rs
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
use crate::{
|
||||||
|
init::lifecycle::{handle_disasterous_failure, validate_server_health},
|
||||||
|
lock_w,
|
||||||
|
services::ws::client::establish_websocket_connection,
|
||||||
|
state::FDOLL,
|
||||||
|
};
|
||||||
|
use tokio::time::{self, Duration};
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use tracing::{info, warn};
|
||||||
|
|
||||||
|
/// Starts a periodic health monitor that validates server connectivity
|
||||||
|
/// and attempts to recover WebSocket connection if health checks fail.
|
||||||
|
pub async fn start_health_monitor() {
|
||||||
|
stop_health_monitor();
|
||||||
|
|
||||||
|
let cancel_token = CancellationToken::new();
|
||||||
|
{
|
||||||
|
let mut guard = lock_w!(FDOLL);
|
||||||
|
guard.network.health_monitor_token = Some(cancel_token.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = time::interval(Duration::from_secs(30)); // Check every 30 seconds
|
||||||
|
let mut consecutive_failures = 0u8;
|
||||||
|
const MAX_FAILURES: u8 = 3;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel_token.cancelled() => {
|
||||||
|
info!("Health monitor stopped");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ = interval.tick() => {
|
||||||
|
match validate_server_health().await {
|
||||||
|
Ok(_) => {
|
||||||
|
consecutive_failures = 0;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
consecutive_failures = consecutive_failures.saturating_add(1);
|
||||||
|
warn!(
|
||||||
|
"Health check failed ({}/{}): {}",
|
||||||
|
consecutive_failures, MAX_FAILURES, e
|
||||||
|
);
|
||||||
|
|
||||||
|
if consecutive_failures >= MAX_FAILURES {
|
||||||
|
info!("Server appears unreachable after {} attempts, triggering recovery", MAX_FAILURES);
|
||||||
|
handle_disasterous_failure(Some(format!(
|
||||||
|
"Lost connection to server: {}",
|
||||||
|
e
|
||||||
|
)))
|
||||||
|
.await;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// Try to re-establish WebSocket connection
|
||||||
|
info!("Attempting to re-establish WebSocket connection");
|
||||||
|
establish_websocket_connection().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stops the health monitor loop.
|
||||||
|
pub fn stop_health_monitor() {
|
||||||
|
let mut guard = lock_w!(FDOLL);
|
||||||
|
if let Some(token) = guard.network.health_monitor_token.take() {
|
||||||
|
token.cancel();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -9,6 +9,7 @@ pub mod client_config_manager;
|
|||||||
pub mod cursor;
|
pub mod cursor;
|
||||||
pub mod doll_editor;
|
pub mod doll_editor;
|
||||||
pub mod health_manager;
|
pub mod health_manager;
|
||||||
|
pub mod health_monitor;
|
||||||
pub mod interaction;
|
pub mod interaction;
|
||||||
pub mod scene;
|
pub mod scene;
|
||||||
pub mod sprite_recolor;
|
pub mod sprite_recolor;
|
||||||
|
|||||||
@@ -19,15 +19,27 @@ pub async fn establish_websocket_connection() {
|
|||||||
|
|
||||||
for _attempt in 1..=MAX_ATTEMPTS {
|
for _attempt in 1..=MAX_ATTEMPTS {
|
||||||
if get_access_token().await.is_some() {
|
if get_access_token().await.is_some() {
|
||||||
init_ws_client().await;
|
if init_ws_client().await {
|
||||||
return;
|
return; // Success
|
||||||
|
} else {
|
||||||
|
// Connection failed, trigger disaster recovery
|
||||||
|
crate::init::lifecycle::handle_disasterous_failure(
|
||||||
|
Some("WebSocket connection failed. Please check your network and try again.".to_string())
|
||||||
|
).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep(BACKOFF).await;
|
sleep(BACKOFF).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we exhausted retries without valid token
|
||||||
|
crate::init::lifecycle::handle_disasterous_failure(
|
||||||
|
Some("Failed to authenticate. Please restart and sign in again.".to_string())
|
||||||
|
).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn init_ws_client() {
|
pub async fn init_ws_client() -> bool {
|
||||||
let app_config = {
|
let app_config = {
|
||||||
let guard = lock_r!(FDOLL);
|
let guard = lock_r!(FDOLL);
|
||||||
guard.app_config.clone()
|
guard.app_config.clone()
|
||||||
@@ -40,10 +52,12 @@ pub async fn init_ws_client() {
|
|||||||
clients.ws_client = Some(ws_client);
|
clients.ws_client = Some(ws_client);
|
||||||
clients.is_ws_initialized = false; // wait for initialized event
|
clients.is_ws_initialized = false; // wait for initialized event
|
||||||
}
|
}
|
||||||
|
true
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to initialize WebSocket client: {}", e);
|
error!("Failed to initialize WebSocket client: {}", e);
|
||||||
clear_ws_client().await;
|
clear_ws_client().await;
|
||||||
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -53,6 +67,7 @@ pub async fn clear_ws_client() {
|
|||||||
if let Some(clients) = guard.network.clients.as_mut() {
|
if let Some(clients) = guard.network.clients.as_mut() {
|
||||||
clients.ws_client = None;
|
clients.ws_client = None;
|
||||||
clients.is_ws_initialized = false;
|
clients.is_ws_initialized = false;
|
||||||
|
clients.ws_emit_failures = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,10 +2,7 @@ use rust_socketio::{Payload, RawClient};
|
|||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
lock_w,
|
init::lifecycle::construct_user_session, lock_w, services::health_manager::close_health_manager_window, state::FDOLL
|
||||||
services::health_manager::close_health_manager_window,
|
|
||||||
services::scene::open_scene_window,
|
|
||||||
state::{init_app_data_scoped, AppDataRefreshScope, FDOLL},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{types::WS_EVENT, utils};
|
use super::{types::WS_EVENT, utils};
|
||||||
@@ -26,33 +23,30 @@ pub fn on_connected(_payload: Payload, socket: RawClient) {
|
|||||||
/// Handler for initialized event
|
/// Handler for initialized event
|
||||||
pub fn on_initialized(payload: Payload, _socket: RawClient) {
|
pub fn on_initialized(payload: Payload, _socket: RawClient) {
|
||||||
if utils::extract_text_value(payload, "initialized").is_ok() {
|
if utils::extract_text_value(payload, "initialized").is_ok() {
|
||||||
let needs_data_refresh = check_and_mark_initialized();
|
let is_reconnection = mark_ws_initialized();
|
||||||
restore_connection_ui();
|
|
||||||
|
|
||||||
if needs_data_refresh {
|
if is_reconnection {
|
||||||
info!("Reconnection detected: refreshing app data");
|
info!("Reconnection detected: reconstructing user session");
|
||||||
tauri::async_runtime::spawn(async {
|
tauri::async_runtime::spawn(async {
|
||||||
init_app_data_scoped(AppDataRefreshScope::All).await;
|
construct_user_session().await;
|
||||||
});
|
});
|
||||||
|
} else {
|
||||||
|
// First-time initialization, just close health manager if open
|
||||||
|
close_health_manager_window();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark WebSocket as initialized and check if app data needs refreshing
|
/// Mark WebSocket as initialized and check if this is a reconnection.
|
||||||
///
|
///
|
||||||
/// Returns true if user data is missing (indicating a reconnection
|
/// Returns true if user data is missing (indicating a reconnection
|
||||||
/// after session teardown where app data was cleared).
|
/// after session teardown where app data was cleared).
|
||||||
fn check_and_mark_initialized() -> bool {
|
fn mark_ws_initialized() -> bool {
|
||||||
let mut guard = lock_w!(FDOLL);
|
let mut guard = lock_w!(FDOLL);
|
||||||
if let Some(clients) = guard.network.clients.as_mut() {
|
if let Some(clients) = guard.network.clients.as_mut() {
|
||||||
clients.is_ws_initialized = true;
|
clients.is_ws_initialized = true;
|
||||||
|
clients.ws_emit_failures = 0;
|
||||||
}
|
}
|
||||||
// If user data is gone, we need to re-fetch everything
|
// If user data is gone, we need full session reconstruction
|
||||||
guard.user_data.user.is_none()
|
guard.user_data.user.is_none()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Restore UI after successful connection
|
|
||||||
fn restore_connection_ui() {
|
|
||||||
close_health_manager_window();
|
|
||||||
open_scene_window();
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ use serde::Serialize;
|
|||||||
use tauri::{async_runtime, Emitter};
|
use tauri::{async_runtime, Emitter};
|
||||||
use tracing::{error, warn};
|
use tracing::{error, warn};
|
||||||
|
|
||||||
use crate::{get_app_handle, init::lifecycle::handle_disasterous_failure, lock_r, state::FDOLL};
|
use crate::{
|
||||||
|
get_app_handle, init::lifecycle::handle_disasterous_failure, lock_r, lock_w, state::FDOLL,
|
||||||
|
};
|
||||||
|
|
||||||
/// Acquire WebSocket client and initialization state from app state
|
/// Acquire WebSocket client and initialization state from app state
|
||||||
fn get_ws_state() -> (Option<rust_socketio::client::Client>, bool) {
|
fn get_ws_state() -> (Option<rust_socketio::client::Client>, bool) {
|
||||||
@@ -47,6 +49,27 @@ async fn do_emit<T: Serialize + Send + 'static>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_soft_emit_failure(err_msg: &str) {
|
||||||
|
const MAX_FAILURES: u8 = 10;
|
||||||
|
let should_reinit = {
|
||||||
|
let mut guard = lock_w!(FDOLL);
|
||||||
|
if let Some(clients) = guard.network.clients.as_mut() {
|
||||||
|
clients.ws_emit_failures = clients.ws_emit_failures.saturating_add(1);
|
||||||
|
clients.ws_emit_failures >= MAX_FAILURES
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if should_reinit {
|
||||||
|
warn!("WebSocket emit failed {} times, reinitializing connection", MAX_FAILURES);
|
||||||
|
let _ = crate::services::ws::client::clear_ws_client().await;
|
||||||
|
crate::services::ws::client::establish_websocket_connection().await;
|
||||||
|
} else {
|
||||||
|
warn!("[non-critical] {}", err_msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Emit critical data to WebSocket server
|
/// Emit critical data to WebSocket server
|
||||||
///
|
///
|
||||||
/// On failure, triggers disaster recovery (session teardown + health manager).
|
/// On failure, triggers disaster recovery (session teardown + health manager).
|
||||||
@@ -78,7 +101,7 @@ pub async fn ws_emit_soft<T: Serialize + Send + 'static>(
|
|||||||
match do_emit(event, payload).await {
|
match do_emit(event, payload).await {
|
||||||
Ok(_) => Ok(()),
|
Ok(_) => Ok(()),
|
||||||
Err(err_msg) => {
|
Err(err_msg) => {
|
||||||
warn!("[non-critical] {}", err_msg);
|
handle_soft_emit_failure(&err_msg).await;
|
||||||
Err(err_msg)
|
Err(err_msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,19 +1,22 @@
|
|||||||
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Clients {
|
pub struct Clients {
|
||||||
pub http_client: reqwest::Client,
|
pub http_client: reqwest::Client,
|
||||||
pub ws_client: Option<rust_socketio::client::Client>,
|
pub ws_client: Option<rust_socketio::client::Client>,
|
||||||
pub is_ws_initialized: bool,
|
pub is_ws_initialized: bool,
|
||||||
|
pub ws_emit_failures: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct NetworkState {
|
pub struct NetworkState {
|
||||||
pub clients: Option<Clients>,
|
pub clients: Option<Clients>,
|
||||||
|
pub health_monitor_token: Option<tokio_util::sync::CancellationToken>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for NetworkState {
|
impl Default for NetworkState {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self { clients: None }
|
Self {
|
||||||
|
clients: None,
|
||||||
|
health_monitor_token: None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -30,6 +33,8 @@ pub fn init_network_state() -> NetworkState {
|
|||||||
http_client,
|
http_client,
|
||||||
ws_client: None,
|
ws_client: None,
|
||||||
is_ws_initialized: false,
|
is_ws_initialized: false,
|
||||||
|
ws_emit_failures: 0,
|
||||||
}),
|
}),
|
||||||
|
health_monitor_token: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user