Set up a websocket connect function that uses a ping/pong protocol to
Some checks failed
Monorepo build / build-flake (push) Has been cancelled

detect actual connection to server.
This commit is contained in:
2026-03-02 22:59:25 -05:00
parent e76d6ec4cd
commit 46901597d8
10 changed files with 188 additions and 30 deletions

1
Cargo.lock generated
View File

@@ -5103,6 +5103,7 @@ dependencies = [
"serde_json",
"stylist",
"thiserror 2.0.18",
"utils",
"visions-types",
"wasm-bindgen",
"wasm-bindgen-futures",

View File

@@ -3,5 +3,5 @@ name = "utils"
version = "0.1.0"
edition = "2024"
[dependencies]
[target.'cfg(not(target_family = "wasm"))'.dependencies]
tokio.workspace = true

View File

@@ -25,6 +25,10 @@ impl<V: Clone + Copy + PartialOrd> Bounded<V> {
max,
}
}
pub fn set(&mut self, value: V) {
self.current = clamp(value, self.min, self.max);
}
}
impl<V: PartialEq> PartialEq for Bounded<V> {

View File

@@ -1,7 +1,9 @@
mod bounded;
pub use bounded::*;
#[cfg(not(target_family = "wasm"))]
mod retry;
#[cfg(not(target_family = "wasm"))]
pub use retry::*;
#[macro_export]

View File

@@ -1572,26 +1572,31 @@ impl App {
Ok(Ok(user.id.clone()))
}
pub async fn handle_message(&self, user_id: UserId, msg: GameRequest) -> Result<(), Fatal> {
pub async fn handle_message(
&self,
user_id: UserId,
msg: GameRequest,
) -> Result<Option<GameMessage>, Fatal> {
match msg {
GameRequest::JoinGame(_, _) => Ok(()),
GameRequest::Ping => Ok(Some(GameMessage::Pong)),
GameRequest::JoinGame(_, _) => Ok(None),
GameRequest::SetScene(game_id, scene_id) => {
let scene = match self.set_scene(&user_id, &game_id, &scene_id).await? {
Ok(scene) => scene,
Err(err) => {
eprintln!("Unable to set scene: {:?}", err);
return Ok(());
return Ok(None);
}
};
let user = match self.user(&user_id).await? {
Ok(Some(user)) => user,
Ok(None) => {
return Ok(());
return Ok(None);
}
Err(err) => {
eprintln!("Could not retrieve user: {:?}", err);
return Ok(());
return Ok(None);
}
};
@@ -1608,7 +1613,7 @@ impl App {
Ok(images) => images,
Err(err) => {
eprintln!("Could not retrieve images: {:?}", err);
return Ok(());
return Ok(None);
}
};
@@ -1628,7 +1633,7 @@ impl App {
.broadcast_to_game(&game_id, GameMessage::ImagesInScene(images))
.await?;
Ok(())
Ok(None)
}
GameRequest::SetBackground(_) => unimplemented!(),
GameRequest::CardNew(_card) => {
@@ -1640,22 +1645,22 @@ impl App {
Ok(_) => GameMessage::Card(card),
Err(err) => {
eprintln!("Unable to update card: {:?}", err);
return Ok(());
return Ok(None);
}
};
self.inner.read().await.send_to_user(&user_id, msg).await;
Ok(())
Ok(None)
}
GameRequest::Character(id) => {
let msg = match self.charsheet(&user_id, &id).await? {
Ok(sheet) => GameMessage::Charsheet(sheet),
Err(err) => {
eprintln!("Unable to set scene: {:?}", err);
return Ok(());
return Ok(None);
}
};
self.inner.read().await.send_to_user(&user_id, msg).await;
Ok(())
Ok(None)
}
GameRequest::CharacterUpdate(sheet) => {
match self.update_character(&user_id, sheet.clone()).await? {
@@ -1683,7 +1688,7 @@ impl App {
.await
}
}
Ok(())
Ok(None)
}
}
}

View File

@@ -71,10 +71,43 @@ async fn send_message(socket: &mut WebSocket, message: GameMessage) -> Result<()
}
async fn handle_game_socket(mut socket: WebSocket, app: App) -> Result<(), Fatal> {
let request = receive_game_request(&mut socket).await;
eprintln!("request: {:?}", request);
let websocket_listener = tokio::spawn(async move {
loop {
match socket.recv().await {
Some(Ok(Message::Text(text))) => {
let request = serde_json::from_str::<GameRequest>(&text).unwrap();
match request {
GameRequest::Ping => {
let text = serde_json::to_string(&GameMessage::Pong)
.map(Utf8Bytes::from)
.unwrap();
socket.send(Message::Text(text)).await;
}
_ => {
eprintln!("Need to service {:?}", request);
}
}
}
_ => {
eprintln!("failed to receive message");
break;
}
}
}
});
websocket_listener.await;
/*
let (user_id, mut receive_from_dispatch) = match request {
Ok(Some(GameRequest::Ping)) => {
let pong = serde_json::to_string(&GameMessage::Pong)
.map(|js| Utf8Bytes::from(&js))
.map_err(|_| GameSendError::EncodingError)
.unwrap();
socket.send(Message::Text(pong));
return Ok(());
}
Ok(Some(GameRequest::JoinGame(session_id, game_id))) => {
let (sender, receive_from_dispatch) = unbounded_channel::<GameMessage>();
@@ -107,7 +140,16 @@ async fn handle_game_socket(mut socket: WebSocket, app: App) -> Result<(), Fatal
match msg {
Message::Text(text) => {
if let Ok(request) = serde_json::from_str::<GameRequest>(&text) {
app.handle_message(user_id.clone(), request).await?;
let response = app.handle_message(user_id.clone(), request).await?;
match response {
Some(message) => {
let text = serde_json::to_string(&message)
.map(|js| Utf8Bytes::from(&js))
.map_err(|_| GameSendError::EncodingError).unwrap();
socket.send(Message::Text(text)).await.unwrap();
}
None => {},
}
} else {
eprintln!("Invalid request format");
}
@@ -129,4 +171,6 @@ async fn handle_game_socket(mut socket: WebSocket, app: App) -> Result<(), Fatal
}
}
Ok(())
*/
Ok(())
}

View File

@@ -15,6 +15,7 @@ pub struct PwaConfig {
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", content = "content", rename_all = "kebab-case")]
pub enum GameRequest {
Ping,
JoinGame(SessionId, GameId),
SetScene(GameId, SceneId),
SetBackground(String),
@@ -27,6 +28,7 @@ pub enum GameRequest {
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(tag = "type", content = "content", rename_all = "kebab-case")]
pub enum GameMessage {
Pong,
ImagesInScene(Vec<Image>),
AvailableScenes(Vec<(SceneId, String)>),
Background(String),

View File

@@ -39,6 +39,7 @@ web-sys = { workspace = true, features = ["Clipboard",
webauthn-rs-proto = { workspace = true, features = ["wasm"] }
yew = { workspace = true }
yew-router = { workspace = true }
utils = { path = "../../utils"}
[dev-dependencies]
cool_asserts = "2.0.3"

View File

@@ -1,10 +1,15 @@
use std::future::Future;
use std::{
future::Future,
sync::{Arc, RwLock},
time::{Duration, SystemTime},
};
use gloo_console::{error, log};
use gloo_net::http::{QueryParams, Request, Response};
use http::StatusCode;
use serde::de::DeserializeOwned;
use thiserror::Error;
use utils::Bounded;
use visions_types::*;
use wasm_sockets::{EventClient, Message};
use web_sys::CloseEvent;
@@ -150,6 +155,8 @@ pub trait Client: Clone + PartialEq {
on_message: Callback<GameMessage>,
on_socket_closed: Box<dyn Fn(CloseEvent)>,
) -> Result<EventClient, ClientError>;
fn connect_websocket(&self) -> impl Future<Output = Result<EventClient, ClientError>>;
}
#[derive(Clone, PartialEq)]
@@ -865,12 +872,6 @@ impl Client for Connection {
) -> Result<EventClient, ClientError> {
let mut ws_client = EventClient::new("/api/ws").unwrap();
ws_client.set_on_error(Some(Box::new(|error| {
error!("{:#?}", error);
})));
ws_client.set_on_close(Some(on_socket_closed));
let session_id = self.session_id.clone();
clone!(
(session_id, game_id),
@@ -901,7 +902,100 @@ impl Client for Connection {
Message::Binary(_) => unimplemented!(),
}
})));
return Ok(ws_client);
}
Ok(ws_client)
async fn connect_websocket(&self) -> Result<EventClient, ClientError> {
let nonce = UserId::default().to_string();
let tries = Arc::new(RwLock::new(Bounded::new(0, 0, 5)));
let state = Arc::new(RwLock::new(WebsocketState::Idle));
Ok(connect(nonce, tries, state))
}
}
const BACKOFF: LazyCell<Vec<Duration>> = LazyCell::new(|| {
vec![
Duration::from_millis(250),
Duration::from_millis(500),
Duration::from_millis(1000),
Duration::from_millis(2000),
Duration::from_millis(5000),
]
});
#[derive(Debug)]
enum WebsocketState {
Idle,
Connecting,
Connected,
}
fn connect(
nonce: String,
tries: Arc<RwLock<Bounded<usize>>>,
state: Arc<RwLock<WebsocketState>>,
) -> EventClient {
let mut client = EventClient::new("/api/ws").unwrap();
client.set_on_connection(Some(Box::new(clone!(
(nonce, tries, state),
move |client| {
log!(format!(
"on_connection: [{}] {:?} {:?}",
nonce,
tries.read(),
state.read(),
));
client
.send_string(&serde_json::to_string(&GameRequest::Ping).unwrap())
.unwrap();
}
))));
client.set_on_message(Some(Box::new(clone!(
(nonce, tries, state),
move |client, message| {
log!(format!("message: {:?}", message));
let message: GameMessage = if let Message::Text(text) = message {
serde_json::from_str(&text).unwrap()
} else {
return;
};
match message {
GameMessage::Pong => {
log!("connection successful");
tries.write().unwrap().set(0);
*state.write().unwrap() = WebsocketState::Connected;
}
_ => {
log!("not handling all messages");
}
}
}
))));
client.set_on_close(Some(Box::new(clone!((nonce, tries, state), move |_evt| {
log!(format!(
"on_close: [{}] {:?} {:?}",
nonce,
tries.read(),
state.read()
));
// TODO: implement backoff
let mut t = tries.write().unwrap();
*t = *t + 1;
connect(nonce.clone(), tries.clone(), state.clone());
}))));
client.set_on_error(Some(Box::new(clone!((nonce, tries, state), move |_evt| {
log!(format!(
"on_error: [{}] {:?} {:?}",
nonce,
tries.read(),
state.read()
));
}))));
client
}

View File

@@ -55,6 +55,7 @@ pub fn WebsocketProvider<C: Client + Clone + 'static>(
use_effect_with(
(client.clone(), game_id.clone(), on_message.clone()),
clone!(socket_state, move |(client, game_id, on_message)| {
/*
let ws_client = client.connect_to_game(
game_id,
on_message.clone(),
@@ -63,12 +64,16 @@ pub fn WebsocketProvider<C: Client + Clone + 'static>(
socket_state.set(SocketState { socket: None })
})),
);
match ws_client {
Ok(ws_client) => socket_state.set(SocketState {
socket: Some(Rc::new(ws_client)),
}),
Err(err) => error!("failed to connect to websocket: ", err.to_string()),
}
*/
wasm_bindgen_futures::spawn_local(clone!((client, socket_state), async move {
let ws_client = client.connect_websocket().await;
match ws_client {
Ok(ws_client) => socket_state.set(SocketState {
socket: Some(Rc::new(ws_client)),
}),
Err(err) => error!("failed to connect to websocket: ", err.to_string()),
}
}));
{
let socket_state = socket_state.clone();