Compare commits
10 commits
dfce42fd03
...
be47a0afec
Author | SHA1 | Date | |
---|---|---|---|
be47a0afec | |||
3102e77c82 | |||
cfb9a38c78 | |||
40a5b99f02 | |||
da718cf238 | |||
a9eef2b68f | |||
a22cb3b306 | |||
58fa1864d1 | |||
c31c8e91ac | |||
239b332b48 |
4 changed files with 52 additions and 22 deletions
|
@ -18,7 +18,7 @@ tracing = "0.1"
|
||||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
tower-http = { version = "0.4.4", features = ["full"] }
|
tower-http = { version = "0.4.4", features = ["full"] }
|
||||||
ructe.workspace = true
|
ructe.workspace = true
|
||||||
tokio-stream = { version = "0.1.14", features = ["sync"] }
|
tokio-stream = { version = "0.1.14", features = ["sync", "time"] }
|
||||||
futures-util = "0.3.28"
|
futures-util = "0.3.28"
|
||||||
diesel = { version = "2.1.0", features = ["chrono"] }
|
diesel = { version = "2.1.0", features = ["chrono"] }
|
||||||
diesel-async = { version = "0.4.1", features = ["mysql", "deadpool"] }
|
diesel-async = { version = "0.4.1", features = ["mysql", "deadpool"] }
|
||||||
|
|
35
src/api.rs
35
src/api.rs
|
@ -1,8 +1,11 @@
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{Path, State},
|
extract::{Path, State},
|
||||||
|
http::StatusCode,
|
||||||
response::{
|
response::{
|
||||||
sse::{Event, KeepAlive},
|
sse::{Event, KeepAlive},
|
||||||
IntoResponse, Sse,
|
IntoResponse, Response, Sse,
|
||||||
},
|
},
|
||||||
routing::{get, post},
|
routing::{get, post},
|
||||||
Form,
|
Form,
|
||||||
|
@ -17,7 +20,10 @@ use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
|
||||||
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl};
|
use diesel_async::{scoped_futures::ScopedFutureExt, AsyncConnection, RunQueryDsl};
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use tokio_stream::{wrappers::errors::BroadcastStreamRecvError, StreamExt as _};
|
use tokio_stream::{
|
||||||
|
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
|
||||||
|
StreamExt as _,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::AppState;
|
use crate::AppState;
|
||||||
|
|
||||||
|
@ -94,7 +100,7 @@ async fn add_drink(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
HxRequest(hx): HxRequest,
|
HxRequest(hx): HxRequest,
|
||||||
Form(form): Form<DrinkForm>,
|
Form(form): Form<DrinkForm>,
|
||||||
) -> impl IntoResponse {
|
) -> Response {
|
||||||
let mut conn = state.connection.get().await.unwrap();
|
let mut conn = state.connection.get().await.unwrap();
|
||||||
|
|
||||||
let open_shift: Option<Shift> = {
|
let open_shift: Option<Shift> = {
|
||||||
|
@ -108,9 +114,18 @@ async fn add_drink(
|
||||||
.optional()
|
.optional()
|
||||||
.expect("Query failed: No open shifts found")
|
.expect("Query failed: No open shifts found")
|
||||||
};
|
};
|
||||||
let open_shift = open_shift.unwrap();
|
|
||||||
|
|
||||||
async {
|
if open_shift.is_none() {
|
||||||
|
return (
|
||||||
|
StatusCode::CONFLICT,
|
||||||
|
"Unable to add a drink when no shift is open",
|
||||||
|
)
|
||||||
|
.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
let open_shift = unsafe { open_shift.unwrap_unchecked() };
|
||||||
|
|
||||||
|
{
|
||||||
use cm_lib::schema::drinks::dsl::*;
|
use cm_lib::schema::drinks::dsl::*;
|
||||||
|
|
||||||
diesel::insert_into(drinks)
|
diesel::insert_into(drinks)
|
||||||
|
@ -118,8 +133,7 @@ async fn add_drink(
|
||||||
.execute(&mut conn)
|
.execute(&mut conn)
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
};
|
||||||
.await;
|
|
||||||
|
|
||||||
let mut headers = axum::http::HeaderMap::new();
|
let mut headers = axum::http::HeaderMap::new();
|
||||||
headers.insert("HX-Push-Url", "/".parse().unwrap());
|
headers.insert("HX-Push-Url", "/".parse().unwrap());
|
||||||
|
@ -127,6 +141,7 @@ async fn add_drink(
|
||||||
headers,
|
headers,
|
||||||
render!(crate::templates::home_html, Some(open_shift), !hx),
|
render!(crate::templates::home_html, Some(open_shift), !hx),
|
||||||
)
|
)
|
||||||
|
.into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
#[derive(Deserialize, Debug)]
|
||||||
|
@ -178,9 +193,9 @@ async fn add_dancer(
|
||||||
async fn ada_subscribe(
|
async fn ada_subscribe(
|
||||||
State(state): State<AppState>,
|
State(state): State<AppState>,
|
||||||
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
|
) -> Sse<impl Stream<Item = Result<Event, BroadcastStreamRecvError>>> {
|
||||||
let stream =
|
let stream = BroadcastStream::new(state.sse_handler.ada_sender.subscribe())
|
||||||
tokio_stream::wrappers::BroadcastStream::new(state.sse_handler.ada_sender.subscribe())
|
.map(|r| r.map(|s| Event::default().event("ada").data(s)))
|
||||||
.map(|r| r.map(|s| Event::default().event("ada").data(s)));
|
.throttle(Duration::from_secs(1));
|
||||||
|
|
||||||
Sse::new(stream).keep_alive(KeepAlive::default())
|
Sse::new(stream).keep_alive(KeepAlive::default())
|
||||||
}
|
}
|
||||||
|
|
21
src/main.rs
21
src/main.rs
|
@ -20,9 +20,7 @@ use axum::{
|
||||||
Router,
|
Router,
|
||||||
};
|
};
|
||||||
|
|
||||||
use diesel::{
|
use diesel::prelude::*;
|
||||||
result::OptionalExtension, BelongingToDsl, ExpressionMethods, QueryDsl, SelectableHelper,
|
|
||||||
};
|
|
||||||
|
|
||||||
use diesel_async::{
|
use diesel_async::{
|
||||||
pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager},
|
pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager},
|
||||||
|
@ -115,11 +113,10 @@ async fn root(State(state): State<AppState>, HxRequest(hx): HxRequest) -> impl I
|
||||||
|
|
||||||
shifts
|
shifts
|
||||||
.filter(end.is_null())
|
.filter(end.is_null())
|
||||||
.select(Shift::as_select())
|
|
||||||
.first(&mut conn)
|
.first(&mut conn)
|
||||||
.await
|
.await
|
||||||
.optional()
|
.optional()
|
||||||
.expect("Query failed: No open shifts found")
|
.expect("Query failed")
|
||||||
};
|
};
|
||||||
|
|
||||||
tracing::debug!("{open_shift:?}");
|
tracing::debug!("{open_shift:?}");
|
||||||
|
@ -132,6 +129,10 @@ async fn drinks(Path(id): Path<u32>) -> impl IntoResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shift_report(State(state): State<AppState>, Path(id): Path<u32>) -> impl IntoResponse {
|
async fn shift_report(State(state): State<AppState>, Path(id): Path<u32>) -> impl IntoResponse {
|
||||||
|
// TODO: Massive rework of how this works
|
||||||
|
// was experimenting with traits, but most of that
|
||||||
|
// didn't lead to what I was aiming for
|
||||||
|
|
||||||
let mut conn = state.connection.get().await.unwrap();
|
let mut conn = state.connection.get().await.unwrap();
|
||||||
let shift: Shift = shifts::table.find(id).first(&mut conn).await.unwrap();
|
let shift: Shift = shifts::table.find(id).first(&mut conn).await.unwrap();
|
||||||
let drinks: Vec<Drink> = Drink::belonging_to(&shift).load(&mut conn).await.unwrap();
|
let drinks: Vec<Drink> = Drink::belonging_to(&shift).load(&mut conn).await.unwrap();
|
||||||
|
@ -143,13 +144,13 @@ async fn shift_report(State(state): State<AppState>, Path(id): Path<u32>) -> imp
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shift_reports(State(state): State<AppState>) -> impl IntoResponse {
|
async fn shift_reports(State(state): State<AppState>) -> impl IntoResponse {
|
||||||
|
let mut shifts: Vec<Shift> = {
|
||||||
|
use cm_lib::schema::shifts::dsl::*;
|
||||||
|
|
||||||
let mut conn = state.connection.get().await.unwrap();
|
let mut conn = state.connection.get().await.unwrap();
|
||||||
|
|
||||||
let mut shifts: Vec<Shift> = shifts::table
|
shifts.load(&mut conn).await.unwrap()
|
||||||
.select(Shift::as_select())
|
};
|
||||||
.load(&mut conn)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
shifts.sort_by(|a, b| b.start.cmp(&a.start));
|
shifts.sort_by(|a, b| b.start.cmp(&a.start));
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,17 @@ pub enum SseMessage {
|
||||||
RefreshAda,
|
RefreshAda,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// I think for the ada_sender, a watch would be ideal if I want to always use full renders of the
|
||||||
|
// watchlist, however if I want more atomic updates, it may be useful to keep the broadcast's
|
||||||
|
// buffer and return an html component that requests a complete rerender if the handler lagged
|
||||||
|
/// A container struct for keeping access to the channel senders for both channels used in the sse
|
||||||
|
/// pipeline
|
||||||
|
///
|
||||||
|
/// For something wishing to listen in on the broadcast html stream, you should call `.subscribe()`
|
||||||
|
/// on ada_sender
|
||||||
|
///
|
||||||
|
/// Anything wishing to send to sse should pass an [SseMessage] to the [tokio::sync::mpsc::Sender]
|
||||||
|
/// in sse_sender
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct SseHandler {
|
pub struct SseHandler {
|
||||||
pub ada_sender: tokio::sync::broadcast::Sender<String>,
|
pub ada_sender: tokio::sync::broadcast::Sender<String>,
|
||||||
|
@ -10,6 +21,8 @@ pub struct SseHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SseHandler {
|
impl SseHandler {
|
||||||
|
/// This function initializes the channels used for sse, as well as spawning a manager task for
|
||||||
|
/// handling sse messages
|
||||||
pub fn init() -> Self {
|
pub fn init() -> Self {
|
||||||
let (ada_sender, _) = tokio::sync::broadcast::channel(10);
|
let (ada_sender, _) = tokio::sync::broadcast::channel(10);
|
||||||
let (sse_sender, mut sse_receiver) = tokio::sync::mpsc::channel(10);
|
let (sse_sender, mut sse_receiver) = tokio::sync::mpsc::channel(10);
|
||||||
|
@ -37,6 +50,7 @@ impl SseHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_ada_list() -> String {
|
async fn get_ada_list() -> String {
|
||||||
|
// TODO: need to scope out if these errors need to be handled
|
||||||
let mut buf = Vec::new();
|
let mut buf = Vec::new();
|
||||||
|
|
||||||
crate::templates::components::ada_list_html(&mut buf).unwrap();
|
crate::templates::components::ada_list_html(&mut buf).unwrap();
|
||||||
|
|
Loading…
Reference in a new issue