From d26a9a290d4f2a1a5b747153eab03df01f8efc33 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Tue, 26 Mar 2024 14:59:40 -0700 Subject: [PATCH] fix: memory leak in `Notify`, fix #67 `notified()` future is now fused --- riven/Cargo.toml | 1 + riven/src/req/rate_limit.rs | 4 +-- riven/src/util/notify.rs | 72 +++++++++++++++++++++++++++++++------ 3 files changed, 65 insertions(+), 12 deletions(-) diff --git a/riven/Cargo.toml b/riven/Cargo.toml index cf583fa..fe4da07 100644 --- a/riven/Cargo.toml +++ b/riven/Cargo.toml @@ -52,6 +52,7 @@ scan_fmt = { version = "0.2", default-features = false } serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" serde_repr = "0.1" +slab = "0.4" strum = "0.20" strum_macros = "0.20" tracing = { version = "0.1", optional = true } diff --git a/riven/src/req/rate_limit.rs b/riven/src/req/rate_limit.rs index a118e75..949d356 100644 --- a/riven/src/req/rate_limit.rs +++ b/riven/src/req/rate_limit.rs @@ -54,8 +54,8 @@ impl RateLimit { while let Some(delay) = Self::acquire_both_or_duration(app_rate_limit, method_rate_limit) { futures::select_biased! { _ = sleep(delay).fuse() => continue, - _ = method_rate_limit.update_notify.notified().fuse() => {} - _ = app_rate_limit.update_notify.notified().fuse() => {} + _ = method_rate_limit.update_notify.notified() => {} + _ = app_rate_limit.update_notify.notified() => {} }; log::trace!("Task awoken due to rate limit update."); } diff --git a/riven/src/util/notify.rs b/riven/src/util/notify.rs index f75d065..1a34005 100644 --- a/riven/src/util/notify.rs +++ b/riven/src/util/notify.rs @@ -2,43 +2,93 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll, Waker}; +use futures::future::FusedFuture; +use futures::FutureExt; use parking_lot::Mutex; +use slab::Slab; #[derive(Default)] pub struct Notify { - waiters: Mutex>, + internal: Mutex, } + +#[derive(Default)] +struct NotifyInternal { + /// Incremented each time [`Self::notify_waiters()`] is called. + pub generation: usize, + /// List of waiters. + pub waiters: Slab, +} + impl Notify { + /// Creates a new `Notify` instance. pub fn new() -> Self { Self::default() } - pub fn notified(&self) -> impl '_ + Future { + /// Returns a future which waits for a notification via [`Self::notify_wakers`]. + /// + /// Dropping the returned future will de-register it from this `Notify` instance, which + /// [prevents memory leaks](https://github.com/MingweiSamuel/Riven/pull/67). + pub fn notified(&self) -> impl '_ + FusedFuture { struct Notified<'a> { + /// Parent notify reference. notify: &'a Notify, - registered: bool, + /// Generation of this notify. To prevent the ABA problem with `slab` keys. + /// Starts out `None`, set to the generation once the `Waker` is registered into [`NotifyInternal::waiters`]. + generation_and_key: Option<(usize, usize)>, } impl Future for Notified<'_> { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if std::mem::replace(&mut self.as_mut().registered, true) { + if let Some(_generation_and_key) = self.generation_and_key.take() { + // Already registered, this is waking via `notify_wakers` (probably). + // `generation_and_key.take()` to set back to `None`, avoid extra `Drop` work. + // Ok since we call `fuse()` to prevent re-polling after this return. Poll::Ready(()) } else { - self.notify.waiters.lock().push(cx.waker().clone()); + // Register and set the generation (to preven ABA problem). + let mut internal = self.notify.internal.lock(); + let key = internal.waiters.insert(cx.waker().clone()); + self.generation_and_key = Some((internal.generation, key)); Poll::Pending } } } + impl Drop for Notified<'_> { + fn drop(&mut self) { + // Only bother deallocating if registered (i.e. `generation_and_key` is set). + if let Some((generation, key)) = self.generation_and_key { + let mut internal = self.notify.internal.lock(); + // Ensure generation matches before removing, to prevent ABA problem. + // If no match, means `Notify::notify_waiters` has already been called and deallocated us. + if internal.generation == generation { + internal + .waiters + .try_remove(key) + .expect("Expected to drop registered `Notified` but waker not found."); + internal.waiters.shrink_to_fit(); + } + } + } + } Notified { notify: self, - registered: false, + generation_and_key: None, } + .fuse() } + /// Notifies all waiting tasks. pub fn notify_waiters(&self) { - self.waiters.lock().drain(..).for_each(Waker::wake); + let mut internal = self.internal.lock(); + // Increment generation when we clear the slab. + // Wrap, although not likely to matter. ABBB...BA problem with `usize::MAX` 'B's. + internal.generation = internal.generation.wrapping_add(1); + internal.waiters.drain().for_each(Waker::wake); + internal.waiters.shrink_to_fit(); } } @@ -54,11 +104,13 @@ mod test { for _ in 0..100 { futures::select_biased! { - _ = notify.notified().fuse() => {} - _ = tokio::task::yield_now().fuse() => {} + _ = notify.notified() => {} + _ = std::future::ready(()).fuse() => {} } } - assert_eq!(0, notify.waiters.lock().len()); + let internal = notify.internal.lock(); + assert_eq!(0, internal.waiters.len()); + assert_eq!(0, internal.waiters.capacity()); } }