Compare commits

...

10 Commits

Author SHA1 Message Date
Zynh0722 be47a0afec type 2023-11-07 01:37:22 -08:00
Zynh0722 3102e77c82 comments! 2023-11-06 20:42:50 -08:00
Zynh0722 cfb9a38c78 remove unneeded select
thanks static typing
2023-11-06 20:42:32 -08:00
Zynh0722 40a5b99f02 use prelude instead of specific imports for diesel traits
sooooo many traits
2023-11-06 20:42:20 -08:00
Zynh0722 da718cf238 switch to using schema dsl for shift_reports
why? when I know damned well that this function is getting dumpstered when I start working on this portion of the code? /shrug
2023-11-06 20:41:53 -08:00
Zynh0722 a9eef2b68f more accurate expect comment 2023-11-06 20:40:50 -08:00
Zynh0722 a22cb3b306 handle adding drinks when no open shift exists 2023-11-06 20:16:27 -08:00
Zynh0722 58fa1864d1 remove unneeded async 2023-11-06 18:36:49 -08:00
Zynh0722 c31c8e91ac throttle updates 2023-11-06 17:54:10 -08:00
Zynh0722 239b332b48 replace qualified path with use 2023-11-06 12:14:50 -08:00
4 changed files with 52 additions and 22 deletions

View File

@ -18,7 +18,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower-http = { version = "0.4.4", features = ["full"] } tower-http = { version = "0.4.4", features = ["full"] }
ructe.workspace = true ructe.workspace = true
tokio-stream = { version = "0.1.14", features = ["sync"] } tokio-stream = { version = "0.1.14", features = ["sync", "time"] }
futures-util = "0.3.28" futures-util = "0.3.28"
diesel = { version = "2.1.0", features = ["chrono"] } diesel = { version = "2.1.0", features = ["chrono"] }
diesel-async = { version = "0.4.1", features = ["mysql", "deadpool"] } diesel-async = { version = "0.4.1", features = ["mysql", "deadpool"] }

View File

@ -1,8 +1,11 @@
use std::time::Duration;
use axum::{ use axum::{
extract::{Path, State}, extract::{Path, State},
http::StatusCode,
response::{ response::{
sse::{Event, KeepAlive}, sse::{Event, KeepAlive},
IntoResponse, Sse, IntoResponse, Response, Sse,
}, },
routing::{get, post}, routing::{get, post},
Form, Form,
@ -17,7 +20,10 @@ use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl}; use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl};
use futures_util::Stream; use futures_util::Stream;
use serde::Deserialize; use serde::Deserialize;
use tokio_stream::{wrappers::errors::BroadcastStreamRecvError, StreamExt as _}; use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamExt as _,
};
use crate::AppState; use crate::AppState;
@ -94,7 +100,7 @@ async fn add_drink(
State(state): State<AppState>, State(state): State<AppState>,
HxRequest(hx): HxRequest, HxRequest(hx): HxRequest,
Form(form): Form<DrinkForm>, Form(form): Form<DrinkForm>,
) -> impl IntoResponse { ) -> Response {
let mut conn = state.connection.get().await.unwrap(); let mut conn = state.connection.get().await.unwrap();
let open_shift: Option<Shift> = { let open_shift: Option<Shift> = {
@ -108,9 +114,18 @@ async fn add_drink(
.optional() .optional()
.expect("Query failed: No open shifts found") .expect("Query failed: No open shifts found")
}; };
let open_shift = open_shift.unwrap();
async { if open_shift.is_none() {
return (
StatusCode::CONFLICT,
"Unable to add a drink when no shift is open",
)
.into_response();
}
let open_shift = unsafe { open_shift.unwrap_unchecked() };
{
use cm_lib::schema::drinks::dsl::*; use cm_lib::schema::drinks::dsl::*;
diesel::insert_into(drinks) diesel::insert_into(drinks)
@ -118,8 +133,7 @@ async fn add_drink(
.execute(&mut conn) .execute(&mut conn)
.await .await
.unwrap() .unwrap()
} };
.await;
let mut headers = axum::http::HeaderMap::new(); let mut headers = axum::http::HeaderMap::new();
headers.insert("HX-Push-Url", "/".parse().unwrap()); headers.insert("HX-Push-Url", "/".parse().unwrap());
@ -127,6 +141,7 @@ async fn add_drink(
headers, headers,
render!(crate::templates::home_html, Some(open_shift), !hx), render!(crate::templates::home_html, Some(open_shift), !hx),
) )
.into_response()
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -178,9 +193,9 @@ async fn add_dancer(
async fn ada_subscribe( async fn ada_subscribe(
State(state): State<AppState>, State(state): State<AppState>,
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> { ) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
let stream = let stream = BroadcastStream::new(state.sse_handler.ada_sender.subscribe())
tokio_stream::wrappers::BroadcastStream::new(state.sse_handler.ada_sender.subscribe()) .map(|r| r.map(|s| Event::default().event("ada").data(s)))
.map(|r| r.map(|s| Event::default().event("ada").data(s))); .throttle(Duration::from_secs(1));
Sse::new(stream).keep_alive(KeepAlive::default()) Sse::new(stream).keep_alive(KeepAlive::default())
} }

View File

@ -20,9 +20,7 @@ use axum::{
Router, Router,
}; };
use diesel::{ use diesel::prelude::*;
result::OptionalExtension, BelongingToDsl, ExpressionMethods, QueryDsl, SelectableHelper,
};
use diesel_async::{ use diesel_async::{
pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager}, pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager},
@ -115,11 +113,10 @@ async fn root(State(state): State<AppState>, HxRequest(hx): HxRequest) -> impl I
shifts shifts
.filter(end.is_null()) .filter(end.is_null())
.select(Shift::as_select())
.first(&mut conn) .first(&mut conn)
.await .await
.optional() .optional()
.expect("Query failed: No open shifts found") .expect("Query failed")
}; };
tracing::debug!("{open_shift:?}"); tracing::debug!("{open_shift:?}");
@ -132,6 +129,10 @@ async fn drinks(Path(id): Path<u32>) -> impl IntoResponse {
} }
async fn shift_report(State(state): State<AppState>, Path(id): Path<u32>) -> impl IntoResponse { async fn shift_report(State(state): State<AppState>, Path(id): Path<u32>) -> impl IntoResponse {
// TODO: Massive rework of how this works
// was experimenting with traits, but most of that
// didn't lead to what I was aiming for
let mut conn = state.connection.get().await.unwrap(); let mut conn = state.connection.get().await.unwrap();
let shift: Shift = shifts::table.find(id).first(&mut conn).await.unwrap(); let shift: Shift = shifts::table.find(id).first(&mut conn).await.unwrap();
let drinks: Vec<Drink> = Drink::belonging_to(&shift).load(&mut conn).await.unwrap(); let drinks: Vec<Drink> = Drink::belonging_to(&shift).load(&mut conn).await.unwrap();
@ -143,13 +144,13 @@ async fn shift_report(State(state): State<AppState>, Path(id): Path<u32>) -> imp
} }
async fn shift_reports(State(state): State<AppState>) -> impl IntoResponse { async fn shift_reports(State(state): State<AppState>) -> impl IntoResponse {
let mut conn = state.connection.get().await.unwrap(); let mut shifts: Vec<Shift> = {
use cm_lib::schema::shifts::dsl::*;
let mut shifts: Vec<Shift> = shifts::table let mut conn = state.connection.get().await.unwrap();
.select(Shift::as_select())
.load(&mut conn) shifts.load(&mut conn).await.unwrap()
.await };
.unwrap();
shifts.sort_by(|a, b| b.start.cmp(&a.start)); shifts.sort_by(|a, b| b.start.cmp(&a.start));

View File

@ -3,6 +3,17 @@ pub enum SseMessage {
RefreshAda, RefreshAda,
} }
// I think for the ada_sender, a watch would be ideal if I want to always use full renders of the
// watchlist, however if I want more atomic updates, it may be useful to keep the broadcast's
// buffer and return an html component that requests a complete rerender if the handler lagged
/// A container struct for keeping access to the channel senders for both channels used in the sse
/// pipeline
///
/// For something wishing to listen in on the broadcast html stream, you should call `.subscribe()`
/// on ada_sender
///
/// Anything wishing to send to sse should pass an [SseMessage] to the [tokio::sync::mpsc::Sender]
/// in sse_sender
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SseHandler { pub struct SseHandler {
pub ada_sender: tokio::sync::broadcast::Sender<String>, pub ada_sender: tokio::sync::broadcast::Sender<String>,
@ -10,6 +21,8 @@ pub struct SseHandler {
} }
impl SseHandler { impl SseHandler {
/// This function initializes the channels used for sse, as well as spawning a manager task for
/// handling sse messages
pub fn init() -> Self { pub fn init() -> Self {
let (ada_sender, _) = tokio::sync::broadcast::channel(10); let (ada_sender, _) = tokio::sync::broadcast::channel(10);
let (sse_sender, mut sse_receiver) = tokio::sync::mpsc::channel(10); let (sse_sender, mut sse_receiver) = tokio::sync::mpsc::channel(10);
@ -37,6 +50,7 @@ impl SseHandler {
} }
async fn get_ada_list() -> String { async fn get_ada_list() -> String {
// TODO: need to scope out if these errors need to be handled
let mut buf = Vec::new(); let mut buf = Vec::new();
crate::templates::components::ada_list_html(&mut buf).unwrap(); crate::templates::components::ada_list_html(&mut buf).unwrap();