mirror of
https://github.com/MingweiSamuel/Riven.git
synced 2024-12-26 02:46:31 +00:00
parent
f4c75a421a
commit
d26a9a290d
3 changed files with 65 additions and 12 deletions
|
@ -52,6 +52,7 @@ scan_fmt = { version = "0.2", default-features = false }
|
||||||
serde = { version = "1.0", features = [ "derive" ] }
|
serde = { version = "1.0", features = [ "derive" ] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
serde_repr = "0.1"
|
serde_repr = "0.1"
|
||||||
|
slab = "0.4"
|
||||||
strum = "0.20"
|
strum = "0.20"
|
||||||
strum_macros = "0.20"
|
strum_macros = "0.20"
|
||||||
tracing = { version = "0.1", optional = true }
|
tracing = { version = "0.1", optional = true }
|
||||||
|
|
|
@ -54,8 +54,8 @@ impl RateLimit {
|
||||||
while let Some(delay) = Self::acquire_both_or_duration(app_rate_limit, method_rate_limit) {
|
while let Some(delay) = Self::acquire_both_or_duration(app_rate_limit, method_rate_limit) {
|
||||||
futures::select_biased! {
|
futures::select_biased! {
|
||||||
_ = sleep(delay).fuse() => continue,
|
_ = sleep(delay).fuse() => continue,
|
||||||
_ = method_rate_limit.update_notify.notified().fuse() => {}
|
_ = method_rate_limit.update_notify.notified() => {}
|
||||||
_ = app_rate_limit.update_notify.notified().fuse() => {}
|
_ = app_rate_limit.update_notify.notified() => {}
|
||||||
};
|
};
|
||||||
log::trace!("Task awoken due to rate limit update.");
|
log::trace!("Task awoken due to rate limit update.");
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,43 +2,93 @@ use std::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll, Waker};
|
use std::task::{Context, Poll, Waker};
|
||||||
|
|
||||||
|
use futures::future::FusedFuture;
|
||||||
|
use futures::FutureExt;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
use slab::Slab;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct Notify {
|
pub struct Notify {
|
||||||
waiters: Mutex<Vec<Waker>>,
|
internal: Mutex<NotifyInternal>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct NotifyInternal {
|
||||||
|
/// Incremented each time [`Self::notify_waiters()`] is called.
|
||||||
|
pub generation: usize,
|
||||||
|
/// List of waiters.
|
||||||
|
pub waiters: Slab<Waker>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Notify {
|
impl Notify {
|
||||||
|
/// Creates a new `Notify` instance.
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self::default()
|
Self::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn notified(&self) -> impl '_ + Future<Output = ()> {
|
/// Returns a future which waits for a notification via [`Self::notify_wakers`].
|
||||||
|
///
|
||||||
|
/// Dropping the returned future will de-register it from this `Notify` instance, which
|
||||||
|
/// [prevents memory leaks](https://github.com/MingweiSamuel/Riven/pull/67).
|
||||||
|
pub fn notified(&self) -> impl '_ + FusedFuture<Output = ()> {
|
||||||
struct Notified<'a> {
|
struct Notified<'a> {
|
||||||
|
/// Parent notify reference.
|
||||||
notify: &'a Notify,
|
notify: &'a Notify,
|
||||||
registered: bool,
|
/// Generation of this notify. To prevent the ABA problem with `slab` keys.
|
||||||
|
/// Starts out `None`, set to the generation once the `Waker` is registered into [`NotifyInternal::waiters`].
|
||||||
|
generation_and_key: Option<(usize, usize)>,
|
||||||
}
|
}
|
||||||
impl Future for Notified<'_> {
|
impl Future for Notified<'_> {
|
||||||
type Output = ();
|
type Output = ();
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
if std::mem::replace(&mut self.as_mut().registered, true) {
|
if let Some(_generation_and_key) = self.generation_and_key.take() {
|
||||||
|
// Already registered, this is waking via `notify_wakers` (probably).
|
||||||
|
// `generation_and_key.take()` to set back to `None`, avoid extra `Drop` work.
|
||||||
|
// Ok since we call `fuse()` to prevent re-polling after this return.
|
||||||
Poll::Ready(())
|
Poll::Ready(())
|
||||||
} else {
|
} else {
|
||||||
self.notify.waiters.lock().push(cx.waker().clone());
|
// Register and set the generation (to preven ABA problem).
|
||||||
|
let mut internal = self.notify.internal.lock();
|
||||||
|
let key = internal.waiters.insert(cx.waker().clone());
|
||||||
|
self.generation_and_key = Some((internal.generation, key));
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl Drop for Notified<'_> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// Only bother deallocating if registered (i.e. `generation_and_key` is set).
|
||||||
|
if let Some((generation, key)) = self.generation_and_key {
|
||||||
|
let mut internal = self.notify.internal.lock();
|
||||||
|
// Ensure generation matches before removing, to prevent ABA problem.
|
||||||
|
// If no match, means `Notify::notify_waiters` has already been called and deallocated us.
|
||||||
|
if internal.generation == generation {
|
||||||
|
internal
|
||||||
|
.waiters
|
||||||
|
.try_remove(key)
|
||||||
|
.expect("Expected to drop registered `Notified` but waker not found.");
|
||||||
|
internal.waiters.shrink_to_fit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Notified {
|
Notified {
|
||||||
notify: self,
|
notify: self,
|
||||||
registered: false,
|
generation_and_key: None,
|
||||||
}
|
}
|
||||||
|
.fuse()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Notifies all waiting tasks.
|
||||||
pub fn notify_waiters(&self) {
|
pub fn notify_waiters(&self) {
|
||||||
self.waiters.lock().drain(..).for_each(Waker::wake);
|
let mut internal = self.internal.lock();
|
||||||
|
// Increment generation when we clear the slab.
|
||||||
|
// Wrap, although not likely to matter. ABBB...BA problem with `usize::MAX` 'B's.
|
||||||
|
internal.generation = internal.generation.wrapping_add(1);
|
||||||
|
internal.waiters.drain().for_each(Waker::wake);
|
||||||
|
internal.waiters.shrink_to_fit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,11 +104,13 @@ mod test {
|
||||||
|
|
||||||
for _ in 0..100 {
|
for _ in 0..100 {
|
||||||
futures::select_biased! {
|
futures::select_biased! {
|
||||||
_ = notify.notified().fuse() => {}
|
_ = notify.notified() => {}
|
||||||
_ = tokio::task::yield_now().fuse() => {}
|
_ = std::future::ready(()).fuse() => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(0, notify.waiters.lock().len());
|
let internal = notify.internal.lock();
|
||||||
|
assert_eq!(0, internal.waiters.len());
|
||||||
|
assert_eq!(0, internal.waiters.capacity());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue