use crate::server::{Connect, Disconnect, Server, ServerMessage}; use actix::{ dev::ContextFutureSpawner, fut, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Handler, Running, StreamHandler, WrapFuture, }; use actix_web::web::Data; use actix_web_actors::ws; use std::{ sync::Arc, time::{Duration, Instant}, }; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); const CLIENT_TIMEOUT: Duration = Duration::from_secs(30); pub struct Session { id: usize, server: Arc>, heartbeat: Instant, } impl Session { pub fn new(server: Data>>) -> Session { Session { id: 0, server: server.get_ref().clone(), heartbeat: Instant::now(), } } fn heartbeat(&self, ctx: &mut ws::WebsocketContext) { ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { if Instant::now().duration_since(act.heartbeat) > CLIENT_TIMEOUT { act.server.do_send(Disconnect { id: act.id }); ctx.stop(); return; } ctx.ping(b""); }); } } impl Actor for Session { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { self.heartbeat(ctx); self.server .send(Connect { addr: ctx.address().recipient(), }) .into_actor(self) .then(|res, act, ctx| { match res { Ok(res) => act.id = res, _ => ctx.stop(), } fut::ready(()) }) .wait(ctx); } fn stopping(&mut self, _: &mut Self::Context) -> Running { self.server.do_send(Disconnect { id: self.id }); Running::Stop } } impl Handler for Session { type Result = (); fn handle(&mut self, msg: ServerMessage, ctx: &mut Self::Context) { ctx.text(msg.0); } } impl StreamHandler> for Session { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => { self.heartbeat = Instant::now(); ctx.pong(&msg); } Ok(ws::Message::Pong(_)) => { self.heartbeat = Instant::now(); } Ok(ws::Message::Text(text)) => ctx.text(text), Ok(ws::Message::Binary(bin)) => ctx.binary(bin), _ => (), } } }