BROKEN: Switching to dedicated handler task

This commit is contained in:
Zynh0722 2023-11-03 07:55:24 -07:00
parent 1494c4f048
commit 24aaac1f74
5 changed files with 43 additions and 7 deletions

1
Cargo.lock generated
View file

@ -2346,6 +2346,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]

View file

@ -18,7 +18,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower-http = { version = "0.4.4", features = ["full"] }
ructe.workspace = true
tokio-stream = "0.1.14"
tokio-stream = { version = "0.1.14", features = ["sync"] }
futures-util = "0.3.28"
diesel = { version = "2.1.0", features = ["chrono"] }
diesel-async = { version = "0.4.1", features = ["mysql", "deadpool"] }

View file

@ -1,11 +1,21 @@
use axum::extract::Path;
use axum::Form;
use axum::{extract::State, response::IntoResponse, routing::post};
use cm_lib::models::{NewDrink, Shift};
use cm_lib::schema::shifts;
use axum::{
extract::{Path, State},
response::{
sse::{Event, KeepAlive},
IntoResponse, Sse,
},
routing::{get, post},
Form,
};
use cm_lib::{
models::{NewDrink, Shift},
schema::shifts,
};
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl};
use futures_util::Stream;
use serde::Deserialize;
use tokio_stream::{wrappers::errors::BroadcastStreamRecvError, StreamExt as _};
use crate::axum_ructe::render;
use crate::AppState;
@ -15,6 +25,7 @@ pub(crate) fn router() -> axum::Router<AppState> {
.route("/drinks", post(add_drink))
.route("/shifts/open", post(open_shift))
.route("/shifts/:id/close", post(close_shift))
.route("/ada/updates", get(ada_subscribe))
}
async fn open_shift(State(state): State<AppState>) -> impl IntoResponse {
@ -110,3 +121,20 @@ async fn add_drink(
render!(crate::templates::home_html, Some(open_shift)),
)
}
async fn ada_subscribe(
State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
let stream = tokio_stream::wrappers::BroadcastStream::new(state.ada_sender.subscribe())
.map(|r| r.map(|s| Event::default().event("ada").data(s)));
Sse::new(stream).keep_alive(KeepAlive::default())
}
async fn get_ada_list() -> String {
let mut buf = Vec::new();
crate::templates::components::ada_list_html(&mut buf).unwrap();
String::from_utf8(buf).unwrap()
}

View file

@ -45,15 +45,22 @@ fn establish_connection() -> Pool<AsyncMysqlConnection> {
.expect("Error making connection pool")
}
#[derive(Clone)]
enum AdaUpdate {
RefreshDancers,
}
#[derive(Clone)]
pub(crate) struct AppState {
connection: Pool<AsyncMysqlConnection>,
ada_sender: tokio::sync::broadcast::Sender<AdaUpdate>,
}
impl AppState {
fn init() -> Self {
Self {
connection: establish_connection(),
ada_sender: tokio::sync::broadcast::channel(10).0,
}
}
}

View file

@ -5,7 +5,7 @@
@:base_html({
<div style="display: flex; flex-direction: column;">
<div style="display: flex; flex-direction: column;" hx-ext="sse" sse-connect="/api/ada/updates" sse-swap="ada">
@:ada_list_html()
</div>