mirror of https://github.com/MingweiSamuel/Riven.git synced 2025-02-21 15:37:27 -08:00

First draft of try_X(min_capacity, ...) APIs

This commit is contained in:
Brian Morin 2025-01-19 13:11:30 -08:00
parent 343d26bdb7
commit 4067363545
7 changed files with 2001 additions and 13 deletions
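
For orientation, the new `try_*` wrappers are load-shedding variants of the existing endpoint methods: instead of sleeping until rate-limit tokens free up, they return `None` immediately when the remaining capacity is below `min_capacity`. A minimal usage sketch follows (illustrative only, not part of this commit; it assumes the generated SummonerV4 wrapper and the `log` crate):

use riven::{consts::PlatformRoute, RiotApi};

// Hypothetical low-priority background refresh: only runs when at least 25% of
// every applicable rate-limit bucket is still free (burst headroom excluded).
async fn refresh_in_background(api: &RiotApi, puuid: &str) {
    match api
        .summoner_v4()
        // Generated by the template in this commit; `min_capacity` is the first argument.
        .try_get_by_puuid(0.25, PlatformRoute::NA1, puuid)
        .await
    {
        None => log::debug!("Skipped: below 25% rate-limit headroom."),
        Some(Ok(_summoner)) => log::info!("Summoner data refreshed."),
        Some(Err(e)) => log::warn!("Request failed: {}", e),
    }
}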

File diff suppressed because it is too large


@@ -49,6 +49,41 @@ impl RateLimit {
}
}
/// Attempt to acquire tokens from both rate limits, but only if every bucket is above `min_capacity` before factoring in the burst factor.
pub fn acquire_both_if_above_capacity(
app_rate_limit: &Self,
method_rate_limit: &Self,
min_capacity: f32,
) -> bool {
// Check retry after.
if app_rate_limit.get_retry_after_delay().is_some()
|| method_rate_limit.get_retry_after_delay().is_some()
{
return false;
}
// Check buckets.
let app_buckets = app_rate_limit.buckets.read();
let method_buckets = method_rate_limit.buckets.read();
for bucket in app_buckets.iter().chain(method_buckets.iter()) {
if bucket.get_capacity() < min_capacity {
return false;
}
}
// Success.
for bucket in app_buckets.iter().chain(method_buckets.iter()) {
bucket.get_tokens(1);
}
log::trace!(
"Tokens obtained, buckets: APP {:?} METHOD {:?}",
app_buckets,
method_buckets
);
true
}
pub async fn acquire_both(app_rate_limit: &Self, method_rate_limit: &Self) {
while let Some(delay) = Self::acquire_both_or_duration(app_rate_limit, method_rate_limit) {
futures::select_biased! {


@@ -33,18 +33,30 @@ impl RegionalRequester {
config: &'a RiotApiConfig,
method_id: &'static str,
request: RequestBuilder,
) -> Result<ResponseInfo> {
min_capacity: Option<f32>,
) -> Option<Result<ResponseInfo>> {
let mut retries: u8 = 0;
loop {
let method_rate_limit = self
.method_rate_limits
.get_or_insert(&method_id, || RateLimit::new(RateLimitType::Method));
// Rate limit.
let rate_limit = RateLimit::acquire_both(&self.app_rate_limit, method_rate_limit);
#[cfg(feature = "tracing")]
let rate_limit = rate_limit.instrument(tracing::info_span!("rate_limit"));
rate_limit.await;
if let Some(min_capacity) = min_capacity {
// Never sleep, return None if we don't have enough capacity.
if !RateLimit::acquire_both_if_above_capacity(
&self.app_rate_limit,
method_rate_limit,
min_capacity,
) {
return None;
}
} else {
// Sleep until we have capacity.
let rate_limit = RateLimit::acquire_both(&self.app_rate_limit, method_rate_limit);
#[cfg(feature = "tracing")]
let rate_limit = rate_limit.instrument(tracing::info_span!("rate_limit"));
rate_limit.await;
}
// Send request.
let request_clone = request
@@ -63,7 +75,7 @@ impl RegionalRequester {
"Request failed (retried {} times), failure, returning error.",
retries
);
break Err(RiotApiError::new(e, retries, None, None));
break Some(Err(RiotApiError::new(e, retries, None, None)));
}
let delay = Duration::from_secs(2_u64.pow(retries as u32));
log::debug!("Request failed with cause \"{}\", (retried {} times), using exponential backoff, retrying after {:?}.", e.to_string(), retries, delay);
@@ -91,11 +103,11 @@ impl RegionalRequester {
status,
retries
);
break Ok(ResponseInfo {
break Some(Ok(ResponseInfo {
response,
retries,
status_none,
});
}));
}
let err = response.error_for_status_ref().err().unwrap_or_else(|| {
panic!(
@@ -114,12 +126,12 @@ impl RegionalRequester {
status,
retries
);
break Err(RiotApiError::new(
break Some(Err(RiotApiError::new(
err,
retries,
Some(response),
Some(status),
));
)));
}
// Is retryable, do exponential backoff if retry-after wasn't specified.


@@ -13,6 +13,12 @@ use crate::time::Duration;
/// `"X-App-Rate-Limit": "20:1,100:120"`. Each `TokenBucket` corresponds to a
/// single `"100:120"` (100 requests per 120 seconds).
pub trait TokenBucket {
/// Get the amount of capacity available in the bucket.
/// # Returns
/// A float representing the fraction of capacity still available in the bucket, from 1.0 (all available) to 0.0 (none available).
/// Returns -1.0 if only burst capacity is available.
fn get_capacity(&self) -> f32;
/// Get the duration til the next available token, or None if a token
/// is available.
/// # Returns
@@ -129,6 +135,21 @@ impl VectorTokenBucket {
}
impl TokenBucket for VectorTokenBucket {
fn get_capacity(&self) -> f32 {
if self.total_limit <= 0 {
// Handle edge cases by telling the caller we have no capacity.
return -1.0;
}
let timestamps = self.update_get_timestamps();
if timestamps.len() > self.total_limit {
// Only burst capacity available.
return -1.0;
}
return 1.0 - (timestamps.len() as f32 / self.total_limit as f32);
}
fn get_delay(&self) -> Option<Duration> {
let timestamps = self.update_get_timestamps();


@@ -20,6 +20,27 @@ mod token_bucket {
assert!(!bucket.get_tokens(51), "Should have violated limit.");
}
#[test]
fn test_basic_capacity() {
Instant::set_time(50_000);
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, ZERO, 0.95, 1.0);
assert!(
(bucket.get_capacity() - 1.0).abs() < f32::EPSILON,
"Should be at total capacity"
);
assert!(bucket.get_tokens(50), "Should have not violated limit.");
assert!(
(bucket.get_capacity() - 0.5).abs() < f32::EPSILON,
"Should be at half capacity."
);
assert_eq!(None, bucket.get_delay(), "Can get stuff.");
assert!(!bucket.get_tokens(51), "Should have violated limit.");
assert!(
bucket.get_capacity() < f32::EPSILON,
"Should be at zero or less capacity."
);
}
#[test]
fn test_internal_constructor() {
let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, ZERO, 1.0, 1.0);

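Extending the tests above, here is a hedged test-style sketch (not part of the commit; setup and constructor arguments copied from the existing test module) of how `get_capacity()` values line up against a `min_capacity` threshold of 0.5:

#[test]
fn capacity_vs_min_capacity_sketch() {
    Instant::set_time(50_000);
    let bucket = VectorTokenBucket::new(Duration::from_millis(1000), 100, ZERO, 0.95, 1.0);
    assert!(bucket.get_tokens(25)); // 25 of 100 tokens used
    // capacity = 1.0 - 25/100 = 0.75, so a min_capacity of 0.5 would let a request through.
    assert!((bucket.get_capacity() - 0.75).abs() < f32::EPSILON);
    assert!(bucket.get_tokens(50)); // 75 of 100 used, capacity drops to 0.25
    // Below the 0.5 threshold now: a try_* call would return None instead of sleeping.
    assert!(bucket.get_capacity() < 0.5);
}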

@@ -163,6 +163,126 @@ impl RiotApi {
.map_err(|e| RiotApiError::new(e, retries, None, Some(status)))
}
/// This method should generally not be used directly. Consider using endpoint wrappers instead.
///
/// This sends a request based on the given parameters and returns a parsed result.
///
/// # Parameters
/// * `method_id` - A unique string id representing the endpoint method for per-method rate limiting.
/// * `region_platform` - The stringified platform, used in rate limiting.
/// * `request` - The request information. Use `request()` to obtain a `RequestBuilder` instance.
/// * `min_capacity` - Minimum capacity required, as a float from 1.0 (all capacity available) to 0.0 (no capacity), excluding burst capacity.
///
/// # Returns
/// `None` if `min_capacity` is not met, otherwise a `Result` containing either a `T` (success) or a `RiotApiError` (failure).
pub async fn try_execute_val<'a, T: serde::de::DeserializeOwned + 'a>(
&'a self,
method_id: &'static str,
region_platform: &'static str,
request: RequestBuilder,
min_capacity: f32,
) -> Option<Result<T>> {
let rinfo = self
.try_execute_raw(method_id, region_platform, request, min_capacity)
.await;
if let Some(rinfo) = rinfo {
match rinfo {
Ok(rinfo) => {
let retries = rinfo.retries;
let status = rinfo.response.status();
let value = rinfo.response.json::<T>().await;
let value =
value.map_err(|e| RiotApiError::new(e, retries, None, Some(status)));
Some(value)
}
Err(e) => Some(Err(e)),
}
} else {
None
}
}
/// This method should generally not be used directly. Consider using endpoint wrappers instead.
///
/// This sends a request based on the given parameters and returns an optional parsed result.
///
/// # Parameters
/// * `method_id` - A unique string id representing the endpoint method for per-method rate limiting.
/// * `region_platform` - The stringified platform, used in rate limiting.
/// * `request` - The request information. Use `request()` to obtain a `RequestBuilder` instance.
/// * `min_capacity` - Minimum capacity required, as a float from 1.0 (all capacity available) to 0.0 (no capacity), excluding burst capacity.
///
/// # Returns
/// `None` if `min_capacity` is not met, otherwise a `Result` containing either an `Option<T>` (success) or a `RiotApiError` (failure).
pub async fn try_execute_opt<'a, T: serde::de::DeserializeOwned + 'a>(
&'a self,
method_id: &'static str,
region_platform: &'static str,
request: RequestBuilder,
min_capacity: f32,
) -> Option<Result<Option<T>>> {
let rinfo = self
.try_execute_raw(method_id, region_platform, request, min_capacity)
.await;
if let Some(rinfo) = rinfo {
match rinfo {
Ok(rinfo) => {
let retries = rinfo.retries;
let status = rinfo.response.status();
let value = rinfo.response.json::<Option<T>>().await;
let value =
value.map_err(|e| RiotApiError::new(e, retries, None, Some(status)));
Some(value)
}
Err(e) => Some(Err(e)),
}
} else {
None
}
}
/// This method should generally not be used directly. Consider using endpoint wrappers instead.
///
/// This sends a request based on the given parameters but does not deserialize any response body.
///
/// # Parameters
/// * `method_id` - A unique string id representing the endpoint method for per-method rate limiting.
/// * `region_platform` - The stringified platform, used in rate limiting.
/// * `request` - The request information. Use `request()` to obtain a `RequestBuilder` instance.
/// * `min_capacity` - Minimum capacity required, as a float from 1.0 (all capacity available) to 0.0 (no capacity), excluding burst capacity.
///
/// # Returns
/// `None` if `min_capacity` is not met, otherwise a `Result` containing either `()` (success) or a `RiotApiError` (failure).
pub async fn try_execute(
&self,
method_id: &'static str,
region_platform: &'static str,
request: RequestBuilder,
min_capacity: f32,
) -> Option<Result<()>> {
let rinfo = self
.try_execute_raw(method_id, region_platform, request, min_capacity)
.await;
if let Some(rinfo) = rinfo {
match rinfo {
Ok(rinfo) => {
let retries = rinfo.retries;
let status = rinfo.response.status();
Some(
rinfo
.response
.error_for_status()
.map(|_| ())
.map_err(|e| RiotApiError::new(e, retries, None, Some(status))),
)
}
Err(e) => Some(Err(e)),
}
} else {
None
}
}
/// This method should generally not be used directly. Consider using endpoint wrappers instead.
///
/// This sends a request based on the given parameters and returns a raw `ResponseInfo`.
@@ -182,8 +302,40 @@ impl RiotApi {
region_platform: &'static str,
request: RequestBuilder,
) -> impl Future<Output = Result<ResponseInfo>> + '_ {
self.regional_requester(region_platform)
.execute(&self.config, method_id, request)
async move {
self.regional_requester(region_platform)
.execute(&self.config, method_id, request, None)
.await
.unwrap() // `execute` only returns `None` when `min_capacity` is `Some`, which is not the case here
}
}
/// This method should generally not be used directly. Consider using endpoint wrappers instead.
///
/// A variation of `execute_raw` that allows a minimum capacity to be specified for load shedding.
///
/// # Parameters
/// * `method_id` - A unique string id representing the endpoint method for per-method rate limiting.
/// * `region_platform` - The stringified platform, used in rate limiting.
/// * `request` - The request information. Use `request()` to obtain a `RequestBuilder` instance.
/// * `min_capacity` - Minimum capacity required, as a float from 1.0 (all capacity available) to 0.0 (no capacity), excluding burst capacity.
///
/// # Returns
/// A future resolving to `None` if there is not enough capacity to make the request, otherwise to a `Result`
/// containing either a `ResponseInfo` (success) or a `RiotApiError` (failure).
pub fn try_execute_raw(
&self,
method_id: &'static str,
region_platform: &'static str,
request: RequestBuilder,
min_capacity: f32,
) -> impl Future<Output = Option<Result<ResponseInfo>>> + '_ {
self.regional_requester(region_platform).execute(
&self.config,
method_id,
request,
Some(min_capacity),
)
}
/// Gets the [`RiotApiConfig::rso_clear_header`] for use in RSO endpoints.

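For completeness, a sketch of driving the low-level path directly, as the doc comments above describe. The exact `RiotApi::request()` parameters and the `method_id` string are assumptions here; only `try_execute_val` itself comes from this commit:

use reqwest::Method;
use riven::{RiotApi, RiotApiError};

// Hypothetical status poll that is shed unless every bucket has at least 10%
// non-burst capacity remaining.
async fn poll_platform_data(api: &RiotApi) -> Option<Result<serde_json::Value, RiotApiError>> {
    let request = api.request(Method::GET, "na1", "/lol/status/v4/platform-data");
    api.try_execute_val::<serde_json::Value>("lol-status-v4.getPlatformData", "na1", request, 0.10)
        .await
}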

@@ -209,6 +209,38 @@ impl<'a> {{= endpoint }}<'a> {
future
}
/// Variation that only sends the request if at least `min_capacity` rate-limit capacity is available.
/// See `{{= method }}` for detailed documentation.
/// # Parameters
/// * `min_capacity` - Minimum capacity required, as a float from 1.0 (all capacity available) to 0.0 (no capacity), excluding burst capacity.
///
/// Note: this method is automatically generated.
pub fn try_{{= method }}(&self, min_capacity: f32, {{= argBuilder.join('') }})
-> impl Future<Output = Option<Result<{{= returnType }}>>> + 'a
{
let route_str = route.into();
let request = self.base.request(Method::{{= verb.toUpperCase() }}, route_str, {{= routeArgument }});
{{? isRso }}
let mut request = request.bearer_auth(access_token);
if let Some(clear) = self.base.get_rso_clear_header() { request = request.header(clear, "") }
{{?}}
{{~ queryParams :queryParam }}
{{= dotUtils.formatAddQueryParam(queryParam) }}
{{~}}
{{~ headerParams :headerParam }}
{{= dotUtils.formatAddHeaderParam(headerParam) }}
{{~}}
{{? bodyType }}
let request = request.body(serde_json::ser::to_vec(body).unwrap());
{{?}}
let future = self.base.try_execute{{= hasReturn ? (returnOptional ? '_opt' : '_val') : '' }}{{= returnTypeTurbofish }}("{{= operationId }}", route_str, request, min_capacity);
#[cfg(feature = "tracing")]
let future = future.instrument(tracing::info_span!("{{= operationId }}", route = route_str));
#[cfg(feature = "metrics")]
let future = metrics::timed(future, "{{= operationId }}", route_str);
future
}
{{
}
}