From 24aaac1f74eff52273f6095e9c2a46a4628a74d6 Mon Sep 17 00:00:00 2001 From: Zynh0722 Date: Fri, 3 Nov 2023 07:55:24 -0700 Subject: [PATCH] BROKEN: Switching to dedicated handler task --- Cargo.lock | 1 + Cargo.toml | 2 +- src/api.rs | 38 +++++++++++++++++++++++++++++++++----- src/main.rs | 7 +++++++ templates/ada.rs.html | 2 +- 5 files changed, 43 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index deef447..d82d29b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2346,6 +2346,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 24e0482..649e120 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tower-http = { version = "0.4.4", features = ["full"] } ructe.workspace = true -tokio-stream = "0.1.14" +tokio-stream = { version = "0.1.14", features = ["sync"] } futures-util = "0.3.28" diesel = { version = "2.1.0", features = ["chrono"] } diesel-async = { version = "0.4.1", features = ["mysql", "deadpool"] } diff --git a/src/api.rs b/src/api.rs index 5a2b2b2..a4e89ed 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,11 +1,21 @@ -use axum::extract::Path; -use axum::Form; -use axum::{extract::State, response::IntoResponse, routing::post}; -use cm_lib::models::{NewDrink, Shift}; -use cm_lib::schema::shifts; +use axum::{ + extract::{Path, State}, + response::{ + sse::{Event, KeepAlive}, + IntoResponse, Sse, + }, + routing::{get, post}, + Form, +}; +use cm_lib::{ + models::{NewDrink, Shift}, + schema::shifts, +}; use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper}; use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl}; +use futures_util::Stream; use serde::Deserialize; +use tokio_stream::{wrappers::errors::BroadcastStreamRecvError, StreamExt as _}; use crate::axum_ructe::render; use crate::AppState; @@ -15,6 +25,7 @@ pub(crate) fn router() -> axum::Router { .route("/drinks", post(add_drink)) .route("/shifts/open", post(open_shift)) .route("/shifts/:id/close", post(close_shift)) + .route("/ada/updates", get(ada_subscribe)) } async fn open_shift(State(state): State) -> impl IntoResponse { @@ -110,3 +121,20 @@ async fn add_drink( render!(crate::templates::home_html, Some(open_shift)), ) } + +async fn ada_subscribe( + State(state): State, +) -> Sse>> { + let stream = tokio_stream::wrappers::BroadcastStream::new(state.ada_sender.subscribe()) + .map(|r| r.map(|s| Event::default().event("ada").data(s))); + + Sse::new(stream).keep_alive(KeepAlive::default()) +} + +async fn get_ada_list() -> String { + let mut buf = Vec::new(); + + crate::templates::components::ada_list_html(&mut buf).unwrap(); + + String::from_utf8(buf).unwrap() +} diff --git a/src/main.rs b/src/main.rs index f172479..095dcd2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,15 +45,22 @@ fn establish_connection() -> Pool { .expect("Error making connection pool") } +#[derive(Clone)] +enum AdaUpdate { + RefreshDancers, +} + #[derive(Clone)] pub(crate) struct AppState { connection: Pool, + ada_sender: tokio::sync::broadcast::Sender, } impl AppState { fn init() -> Self { Self { connection: establish_connection(), + ada_sender: tokio::sync::broadcast::channel(10).0, } } } diff --git a/templates/ada.rs.html b/templates/ada.rs.html index e485ba6..c921bbe 100644 --- a/templates/ada.rs.html +++ b/templates/ada.rs.html @@ -5,7 +5,7 @@ @:base_html({ -
+
@:ada_list_html()