feat: reimplement sweeper

This commit is contained in:
Zynh Ludwig 2025-01-12 20:34:09 -08:00
parent 17d194e75b
commit 396f9afcc9
6 changed files with 79 additions and 19 deletions

2
Cargo.lock generated
View file

@ -1335,7 +1335,7 @@ dependencies = [
[[package]]
name = "nyazoom"
version = "0.2.2"
version = "0.2.4"
dependencies = [
"async-bincode",
"async-trait",

View file

@ -1,6 +1,6 @@
[package]
name = "nyazoom"
version = "0.2.2"
version = "0.2.4"
edition = "2021"
authors = ["Zynh Ludwig <Zynh0722@gmail.com>"]
readme = "README.md"

View file

@ -1,7 +1,7 @@
SELECT
id,
cache_name,
downloads,
max_downloads,
julianday('now') - julianday(uploaded) AS age
downloads >= max_downloads AS "out_of_downloads: bool",
julianday('now') - julianday(uploaded) > 5 AS "too_old: bool"
FROM records
WHERE downloads >= max_downloads OR age > 5;
WHERE downloads >= max_downloads OR julianday('now') - julianday(uploaded) > 5;

View file

@ -0,0 +1,2 @@
DELETE FROM records
WHERE id = ?;

View file

@ -10,6 +10,14 @@ pub struct CacheRecord {
pub max_downloads: i32,
}
#[derive(Debug)]
pub struct ExpiredRecord {
pub id: i64,
pub cache_name: String,
pub out_of_downloads: bool,
pub too_old: bool,
}
impl CacheRecord {
pub fn can_be_downloaded(&self) -> bool {
let dur_since_upload = Utc::now().signed_duration_since(self.uploaded);

View file

@ -1,28 +1,78 @@
use std::time::Duration;
use sqlx::{sqlite::SqliteQueryResult, SqliteConnection};
use tracing::{info_span, Instrument};
use crate::state::AppState;
use crate::{db::ExpiredRecord, state::AppState, CACHE_DIR};
/// Spawn a repeating task that will clean files periodically
pub fn spawn(_state: AppState) {
pub fn spawn(state: AppState) {
tokio::spawn(
async move {
tokio::time::sleep(Duration::from_secs(5)).await;
loop {
tracing::debug!("Cleaning Sweep!");
let mut conn = state.pool.acquire().await.unwrap();
let expired_records_result =
sqlx::query_file_as!(ExpiredRecord, "queries/records/get_expired_records.sql")
.fetch_all(&mut *conn)
.await;
let expired_records = match expired_records_result {
Ok(r) => r,
Err(e) => {
tracing::error!(error = ?e);
continue;
}
};
let record_ids: Vec<i64> = expired_records.iter().map(|record| record.id).collect();
tracing::debug!(?record_ids, "Expired Records");
let mut too_old = 0;
let mut out_of_downloads = 0;
for record in expired_records.iter() {
let cache_path = CACHE_DIR.join(&record.cache_name);
if let Err(error) = std::fs::remove_file(&cache_path) {
if error.kind() == std::io::ErrorKind::NotFound {
tracing::warn!(path = ?cache_path, "Cache file not found")
} else {
tracing::error!(?error);
}
}
tracing::debug!(id = record.id, "Removing record");
let remove_result = remove_record_by_id(&mut conn, record.id).await;
match remove_result {
Err(e) => {
tracing::error!(error = ?e, "Unable to remove expired record")
}
Ok(_) => {
too_old += record.too_old as u32;
out_of_downloads += record.out_of_downloads as u32;
}
}
}
tracing::info!(too_old, out_of_downloads, "Cache items cleared");
tokio::time::sleep(Duration::from_secs(15 * 60)).await;
tracing::info!("Cleaning Sweep!");
// let mut records = state.records.lock().await;
//
// for (key, record) in records.clone().into_iter() {
// if !record.can_be_downloaded() {
// tracing::info!("culling: {:?}", record);
// records.remove_record(&key).await.unwrap();
// }
// }
}
}
.instrument(info_span!("sweeper")),
);
}
async fn remove_record_by_id(
conn: &mut SqliteConnection,
id: i64,
) -> sqlx::Result<SqliteQueryResult> {
sqlx::query_file!("queries/records/remove_record_by_id.sql", id)
.execute(conn)
.await
}