sse setup

This commit is contained in:
Zynh0722 2023-11-04 01:59:58 -07:00
parent 24aaac1f74
commit 3de7389500
3 changed files with 66 additions and 18 deletions

View file

@ -125,16 +125,9 @@ async fn add_drink(
async fn ada_subscribe( async fn ada_subscribe(
State(state): State<AppState>, State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> { ) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
let stream = tokio_stream::wrappers::BroadcastStream::new(state.ada_sender.subscribe()) let stream =
.map(|r| r.map(|s| Event::default().event("ada").data(s))); tokio_stream::wrappers::BroadcastStream::new(state.sse_handler.ada_sender.subscribe())
.map(|r| r.map(|s| Event::default().event("ada").data(s)));
Sse::new(stream).keep_alive(KeepAlive::default()) Sse::new(stream).keep_alive(KeepAlive::default())
} }
async fn get_ada_list() -> String {
let mut buf = Vec::new();
crate::templates::components::ada_list_html(&mut buf).unwrap();
String::from_utf8(buf).unwrap()
}

View file

@ -1,5 +1,6 @@
mod api; mod api;
mod axum_ructe; mod axum_ructe;
mod sse_handler;
use axum_ructe::render; use axum_ructe::render;
@ -26,7 +27,8 @@ use diesel_async::{
}; };
use dotenvy::dotenv; use dotenvy::dotenv;
use std::net::SocketAddr; use sse_handler::SseHandler;
use std::{net::SocketAddr, time::Duration};
use tower_http::services::{ServeDir, ServeFile}; use tower_http::services::{ServeDir, ServeFile};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -45,22 +47,17 @@ fn establish_connection() -> Pool<AsyncMysqlConnection> {
.expect("Error making connection pool") .expect("Error making connection pool")
} }
#[derive(Clone)]
enum AdaUpdate {
RefreshDancers,
}
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct AppState { pub(crate) struct AppState {
connection: Pool<AsyncMysqlConnection>, connection: Pool<AsyncMysqlConnection>,
ada_sender: tokio::sync::broadcast::Sender<AdaUpdate>, sse_handler: SseHandler,
} }
impl AppState { impl AppState {
fn init() -> Self { fn init() -> Self {
Self { Self {
connection: establish_connection(), connection: establish_connection(),
ada_sender: tokio::sync::broadcast::channel(10).0, sse_handler: SseHandler::init(),
} }
} }
} }
@ -78,6 +75,19 @@ async fn main() {
let state = AppState::init(); let state = AppState::init();
tokio::spawn({
let sender = state.clone().sse_handler.sse_sender;
async move {
loop {
sender
.send(sse_handler::SseMessage::RefreshAda)
.await
.expect("Failed to send sse message");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
});
let fallback_handler = ServeDir::new("dist").not_found_service(ServeFile::new("dist/404.html")); let fallback_handler = ServeDir::new("dist").not_found_service(ServeFile::new("dist/404.html"));
// build our application with a route // build our application with a route

45
src/sse_handler.rs Normal file
View file

@ -0,0 +1,45 @@
#[derive(Debug, Clone)]
pub enum SseMessage {
RefreshAda,
}
#[derive(Debug, Clone)]
pub struct SseHandler {
pub ada_sender: tokio::sync::broadcast::Sender<String>,
pub sse_sender: tokio::sync::mpsc::Sender<SseMessage>,
}
impl SseHandler {
pub fn init() -> Self {
let (ada_sender, _) = tokio::sync::broadcast::channel(10);
let (sse_sender, mut sse_receiver) = tokio::sync::mpsc::channel(10);
tokio::spawn({
let sender = ada_sender.clone();
async move {
while let Some(message) = sse_receiver.recv().await {
match message {
SseMessage::RefreshAda => {
if sender.receiver_count() > 0 {
sender.send(get_ada_list().await).unwrap();
}
}
};
}
}
});
Self {
ada_sender,
sse_sender,
}
}
}
async fn get_ada_list() -> String {
let mut buf = Vec::new();
crate::templates::components::ada_list_html(&mut buf).unwrap();
String::from_utf8(buf).unwrap()
}