Compare commits

...

4 Commits
main ... tower

Author SHA1 Message Date
Conrad Ludgate 8f3623eae0
remove but inspired by tower 2022-11-15 21:51:54 +00:00
Conrad Ludgate df4990d62e
fix last tracing test 2022-11-15 17:07:36 +00:00
Conrad Ludgate 571b9abc49
fix docs 2022-11-15 16:34:33 +00:00
Conrad Ludgate 6eaa2365ed
expierments with tower 2022-11-15 14:53:23 +00:00
15 changed files with 628 additions and 424 deletions

View File

@ -11,8 +11,8 @@ to allow for client middleware chains.
This crate provides functionality for building and running middleware but no middleware
implementations. This repository also contains a couple of useful concrete middleware crates:
* [`reqwest-retry`](https://crates.io/crates/reqwest-retry): retry failed requests.
* [`reqwest-tracing`](https://crates.io/crates/reqwest-tracing):
- [`reqwest-retry`](https://crates.io/crates/reqwest-retry): retry failed requests.
- [`reqwest-tracing`](https://crates.io/crates/reqwest-tracing):
[`tracing`](https://crates.io/crates/tracing) integration, optional opentelemetry support.
## Overview
@ -29,10 +29,12 @@ reqwest-middleware = "0.1.6"
reqwest-retry = "0.1.5"
reqwest-tracing = "0.2.3"
tokio = { version = "1.12.0", features = ["macros", "rt-multi-thread"] }
tower = "0.4"
```
```rust
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest::Response;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, Error, Layer, RequestInitialiser, ReqwestService, Service};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use reqwest_tracing::TracingMiddleware;
@ -49,7 +51,12 @@ async fn main() {
run(client).await;
}
async fn run(client: ClientWithMiddleware) {
async fn run<M, I>(client: ClientWithMiddleware<M, I>)
where
M: Layer<ReqwestService>,
M::Service: Service,
I: RequestInitialiser,
{
client
.get("https://truelayer.com")
.header("foo", "bar")

View File

@ -1,6 +1,6 @@
[package]
name = "reqwest-middleware"
version = "0.2.0"
version = "0.3.0"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Wrapper around reqwest to allow for client middleware chains."
@ -18,6 +18,7 @@ reqwest = { version = "0.11", default-features = false, features = ["json", "mul
serde = "1"
task-local-extensions = "0.1.1"
thiserror = "1"
futures = "0.3"
[dev-dependencies]
reqwest = "0.11"

View File

@ -1,81 +1,67 @@
use futures::future::BoxFuture;
use futures::FutureExt;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::multipart::Form;
use reqwest::{Body, Client, IntoUrl, Method, Request, Response};
use serde::Serialize;
use std::convert::TryFrom;
use std::fmt::{self, Display};
use std::sync::Arc;
use std::time::Duration;
use task_local_extensions::Extensions;
// use tower::{Layer, Service, ServiceBuilder, ServiceExt};
use crate::error::Result;
use crate::middleware::{Middleware, Next};
use crate::RequestInitialiser;
use crate::{Error, Identity, Layer, RequestInitialiser, RequestStack, Service, Stack};
/// A `ClientBuilder` is used to build a [`ClientWithMiddleware`].
///
/// [`ClientWithMiddleware`]: crate::ClientWithMiddleware
pub struct ClientBuilder {
pub struct ClientBuilder<M, I> {
client: Client,
middleware_stack: Vec<Arc<dyn Middleware>>,
initialiser_stack: Vec<Arc<dyn RequestInitialiser>>,
middleware_stack: M,
initialiser_stack: I,
}
impl ClientBuilder {
impl ClientBuilder<Identity, Identity> {
pub fn new(client: Client) -> Self {
ClientBuilder {
client,
middleware_stack: Vec::new(),
initialiser_stack: Vec::new(),
middleware_stack: Identity,
initialiser_stack: Identity,
}
}
}
impl<M, I> ClientBuilder<M, I> {
/// Convenience method to attach middleware.
pub fn with<T>(self, layer: T) -> ClientBuilder<Stack<T, M>, I> {
ClientBuilder {
client: self.client,
middleware_stack: Stack {
inner: layer,
outer: self.middleware_stack,
},
initialiser_stack: self.initialiser_stack,
}
}
/// Convenience method to attach middleware.
///
/// If you need to keep a reference to the middleware after attaching, use [`with_arc`].
///
/// [`with_arc`]: Self::with_arc
pub fn with<M>(self, middleware: M) -> Self
where
M: Middleware,
{
self.with_arc(Arc::new(middleware))
}
/// Add middleware to the chain. [`with`] is more ergonomic if you don't need the `Arc`.
///
/// [`with`]: Self::with
pub fn with_arc(mut self, middleware: Arc<dyn Middleware>) -> Self {
self.middleware_stack.push(middleware);
self
}
/// Convenience method to attach a request initialiser.
///
/// If you need to keep a reference to the initialiser after attaching, use [`with_arc_init`].
///
/// [`with_arc_init`]: Self::with_arc_init
pub fn with_init<I>(self, initialiser: I) -> Self
where
I: RequestInitialiser,
{
self.with_arc_init(Arc::new(initialiser))
}
/// Add a request initialiser to the chain. [`with_init`] is more ergonomic if you don't need the `Arc`.
///
/// [`with_init`]: Self::with_init
pub fn with_arc_init(mut self, initialiser: Arc<dyn RequestInitialiser>) -> Self {
self.initialiser_stack.push(initialiser);
self
pub fn with_init<T>(self, initialiser: T) -> ClientBuilder<M, RequestStack<T, I>> {
ClientBuilder {
client: self.client,
middleware_stack: self.middleware_stack,
initialiser_stack: RequestStack {
inner: initialiser,
outer: self.initialiser_stack,
},
}
}
/// Returns a `ClientWithMiddleware` using this builder configuration.
pub fn build(self) -> ClientWithMiddleware {
pub fn build(self) -> ClientWithMiddleware<M, I> {
ClientWithMiddleware {
inner: self.client,
middleware_stack: self.middleware_stack.into_boxed_slice(),
initialiser_stack: self.initialiser_stack.into_boxed_slice(),
middleware_stack: self.middleware_stack,
initialiser_stack: self.initialiser_stack,
}
}
}
@ -83,97 +69,68 @@ impl ClientBuilder {
/// `ClientWithMiddleware` is a wrapper around [`reqwest::Client`] which runs middleware on every
/// request.
#[derive(Clone)]
pub struct ClientWithMiddleware {
pub struct ClientWithMiddleware<M, I> {
inner: reqwest::Client,
middleware_stack: Box<[Arc<dyn Middleware>]>,
initialiser_stack: Box<[Arc<dyn RequestInitialiser>]>,
middleware_stack: M,
initialiser_stack: I,
}
impl ClientWithMiddleware {
/// See [`ClientBuilder`] for a more ergonomic way to build `ClientWithMiddleware` instances.
pub fn new<T>(client: Client, middleware_stack: T) -> Self
where
T: Into<Box<[Arc<dyn Middleware>]>>,
{
ClientWithMiddleware {
inner: client,
middleware_stack: middleware_stack.into(),
// TODO(conradludgate) - allow downstream code to control this manually if desired
initialiser_stack: Box::new([]),
}
}
impl<M: Layer<ReqwestService>, I: RequestInitialiser> ClientWithMiddleware<M, I> {
/// See [`Client::get`]
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder<M, I> {
self.request(Method::GET, url)
}
/// See [`Client::post`]
pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder<M, I> {
self.request(Method::POST, url)
}
/// See [`Client::put`]
pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder {
pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder<M, I> {
self.request(Method::PUT, url)
}
/// See [`Client::patch`]
pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder {
pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder<M, I> {
self.request(Method::PATCH, url)
}
/// See [`Client::delete`]
pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder {
pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder<M, I> {
self.request(Method::DELETE, url)
}
/// See [`Client::head`]
pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder<M, I> {
self.request(Method::HEAD, url)
}
/// See [`Client::request`]
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
let req = RequestBuilder {
inner: self.inner.request(method, url),
client: self.clone(),
extensions: Extensions::new(),
};
self.initialiser_stack
.iter()
.fold(req, |req, i| i.init(req))
}
/// See [`Client::execute`]
pub async fn execute(&self, req: Request) -> Result<Response> {
let mut ext = Extensions::new();
self.execute_with_extensions(req, &mut ext).await
}
/// Executes a request with initial [`Extensions`].
pub async fn execute_with_extensions(
&self,
req: Request,
ext: &mut Extensions,
) -> Result<Response> {
let next = Next::new(&self.inner, &self.middleware_stack);
next.run(req, ext).await
}
}
/// Create a `ClientWithMiddleware` without any middleware.
impl From<Client> for ClientWithMiddleware {
fn from(client: Client) -> Self {
ClientWithMiddleware {
inner: client,
middleware_stack: Box::new([]),
initialiser_stack: Box::new([]),
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder<'_, M, I> {
let mut extensions = Extensions::new();
let request = self.inner.request(method, url);
let request = self.initialiser_stack.init(request, &mut extensions);
RequestBuilder {
inner: request,
client: self,
extensions,
}
}
}
impl fmt::Debug for ClientWithMiddleware {
/// Create a `ClientWithMiddleware` without any middleware.
impl From<Client> for ClientWithMiddleware<Identity, Identity> {
fn from(client: Client) -> Self {
ClientWithMiddleware {
inner: client,
middleware_stack: Identity,
initialiser_stack: Identity,
}
}
}
impl<M, I> fmt::Debug for ClientWithMiddleware<M, I> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// skipping middleware_stack field for now
f.debug_struct("ClientWithMiddleware")
@ -184,13 +141,28 @@ impl fmt::Debug for ClientWithMiddleware {
/// This is a wrapper around [`reqwest::RequestBuilder`] exposing the same API.
#[must_use = "RequestBuilder does nothing until you 'send' it"]
pub struct RequestBuilder {
pub struct RequestBuilder<'client, M, I> {
inner: reqwest::RequestBuilder,
client: ClientWithMiddleware,
client: &'client ClientWithMiddleware<M, I>,
extensions: Extensions,
}
impl RequestBuilder {
#[derive(Clone)]
pub struct ReqwestService(Client);
impl Service for ReqwestService {
type Future = BoxFuture<'static, Result<Response, Error>>;
fn call(&mut self, req: Request, _: &mut Extensions) -> Self::Future {
let fut = self.0.execute(req);
async { fut.await.map_err(Error::from) }.boxed()
}
}
impl<M: Layer<ReqwestService>, I: RequestInitialiser> RequestBuilder<'_, M, I>
where
M::Service: Service,
{
pub fn header<K, V>(self, key: K, value: V) -> Self
where
HeaderName: TryFrom<K>,
@ -289,14 +261,19 @@ impl RequestBuilder {
&mut self.extensions
}
pub async fn send(self) -> Result<Response> {
pub async fn send(self) -> Result<Response, Error> {
let Self {
inner,
client,
mut extensions,
} = self;
let req = inner.build()?;
client.execute_with_extensions(req, &mut extensions).await
let mut svc = client
.middleware_stack
.layer(ReqwestService(client.inner.clone()));
svc.call(req, &mut extensions).await
// client.execute_with_extensions(req, &mut extensions).await
}
/// Attempt to clone the RequestBuilder.
@ -309,13 +286,13 @@ impl RequestBuilder {
pub fn try_clone(&self) -> Option<Self> {
self.inner.try_clone().map(|inner| RequestBuilder {
inner,
client: self.client.clone(),
client: self.client,
extensions: Extensions::new(),
})
}
}
impl fmt::Debug for RequestBuilder {
impl<M, I> fmt::Debug for RequestBuilder<'_, M, I> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// skipping middleware_stack field for now
f.debug_struct("RequestBuilder")

View File

@ -1,7 +1,5 @@
use thiserror::Error;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Error, Debug)]
pub enum Error {
/// There was an error running some middleware

View File

@ -7,30 +7,44 @@
//!
//! ```
//! use reqwest::{Client, Request, Response};
//! use reqwest_middleware::{ClientBuilder, Middleware, Next, Result};
//! use reqwest_middleware::{ClientBuilder, Error, Extension, Layer, Service};
//! use task_local_extensions::Extensions;
//! use futures::future::{BoxFuture, FutureExt};
//! use std::task::{Context, Poll};
//!
//! struct LoggingMiddleware;
//! struct LoggingLayer;
//! struct LoggingService<S>(S);
//!
//! #[async_trait::async_trait]
//! impl Middleware for LoggingMiddleware {
//! async fn handle(
//! &self,
//! req: Request,
//! extensions: &mut Extensions,
//! next: Next<'_>,
//! ) -> Result<Response> {
//! println!("Request started {:?}", req);
//! let res = next.run(req, extensions).await;
//! println!("Result: {:?}", res);
//! res
//! impl<S> Layer<S> for LoggingLayer {
//! type Service = LoggingService<S>;
//!
//! fn layer(&self, inner: S) -> Self::Service {
//! LoggingService(inner)
//! }
//! }
//!
//! impl<S> Service for LoggingService<S>
//! where
//! S: Service,
//! S::Future: Send + 'static,
//! {
//! type Future = BoxFuture<'static, Result<Response, Error>>;
//!
//! fn call(&mut self, req: Request, ext: &mut Extensions) -> Self::Future {
//! println!("Request started {req:?}");
//! let fut = self.0.call(req, ext);
//! async {
//! let res = fut.await;
//! println!("Result: {res:?}");
//! res
//! }.boxed()
//! }
//! }
//!
//! async fn run() {
//! let reqwest_client = Client::builder().build().unwrap();
//! let client = ClientBuilder::new(reqwest_client)
//! .with(LoggingMiddleware)
//! .with(LoggingLayer)
//! .build();
//! let resp = client.get("https://truelayer.com").send().await.unwrap();
//! println!("TrueLayer page HTML: {}", resp.text().await.unwrap());
@ -51,10 +65,54 @@ pub struct ReadmeDoctests;
mod client;
mod error;
mod middleware;
mod req_init;
pub use client::{ClientBuilder, ClientWithMiddleware, RequestBuilder};
pub use error::{Error, Result};
pub use middleware::{Middleware, Next};
pub use req_init::{Extension, RequestInitialiser};
pub use client::{ClientBuilder, ClientWithMiddleware, RequestBuilder, ReqwestService};
pub use error::Error;
pub use req_init::{Extension, RequestInitialiser, RequestStack};
use reqwest::{Request, Response};
use task_local_extensions::Extensions;
/// Two [`RequestInitialiser`]s or [`Service`]s chained together.
#[derive(Clone)]
pub struct Stack<Inner, Outer> {
pub(crate) inner: Inner,
pub(crate) outer: Outer,
}
pub trait Service {
type Future: std::future::Future<Output = Result<Response, Error>>;
fn call(&mut self, req: Request, ext: &mut Extensions) -> Self::Future;
}
pub struct Identity;
impl<S: Service> Layer<S> for Identity {
type Service = S;
fn layer(&self, inner: S) -> Self::Service {
inner
}
}
pub trait Layer<S> {
/// The wrapped service
type Service;
/// Wrap the given service with the middleware, returning a new service
/// that has been decorated with the middleware.
fn layer(&self, inner: S) -> Self::Service;
}
impl<S, Inner, Outer> Layer<S> for Stack<Inner, Outer>
where
Inner: Layer<S>,
Outer: Layer<Inner::Service>,
{
type Service = Outer::Service;
fn layer(&self, service: S) -> Self::Service {
let inner = self.inner.layer(service);
self.outer.layer(inner)
}
}

View File

@ -1,100 +0,0 @@
use reqwest::{Client, Request, Response};
use std::sync::Arc;
use task_local_extensions::Extensions;
use crate::error::{Error, Result};
/// When attached to a [`ClientWithMiddleware`] (generally using [`with`]), middleware is run
/// whenever the client issues a request, in the order it was attached.
///
/// # Example
///
/// ```
/// use reqwest::{Client, Request, Response};
/// use reqwest_middleware::{ClientBuilder, Middleware, Next, Result};
/// use task_local_extensions::Extensions;
///
/// struct TransparentMiddleware;
///
/// #[async_trait::async_trait]
/// impl Middleware for TransparentMiddleware {
/// async fn handle(
/// &self,
/// req: Request,
/// extensions: &mut Extensions,
/// next: Next<'_>,
/// ) -> Result<Response> {
/// next.run(req, extensions).await
/// }
/// }
/// ```
///
/// [`ClientWithMiddleware`]: crate::ClientWithMiddleware
/// [`with`]: crate::ClientBuilder::with
#[async_trait::async_trait]
pub trait Middleware: 'static + Send + Sync {
/// Invoked with a request before sending it. If you want to continue processing the request,
/// you should explicitly call `next.run(req, extensions)`.
///
/// If you need to forward data down the middleware stack, you can use the `extensions`
/// argument.
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response>;
}
#[async_trait::async_trait]
impl<F> Middleware for F
where
F: Send
+ Sync
+ 'static
+ for<'a> Fn(Request, &'a mut Extensions, Next<'a>) -> BoxFuture<'a, Result<Response>>,
{
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response> {
(self)(req, extensions, next).await
}
}
/// Next encapsulates the remaining middleware chain to run in [`Middleware::handle`]. You can
/// forward the request down the chain with [`run`].
///
/// [`Middleware::handle`]: Middleware::handle
/// [`run`]: Self::run
#[derive(Clone)]
pub struct Next<'a> {
client: &'a Client,
middlewares: &'a [Arc<dyn Middleware>],
}
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
impl<'a> Next<'a> {
pub(crate) fn new(client: &'a Client, middlewares: &'a [Arc<dyn Middleware>]) -> Self {
Next {
client,
middlewares,
}
}
pub fn run(
mut self,
req: Request,
extensions: &'a mut Extensions,
) -> BoxFuture<'a, Result<Response>> {
if let Some((current, rest)) = self.middlewares.split_first() {
self.middlewares = rest;
Box::pin(current.handle(req, extensions, self))
} else {
Box::pin(async move { self.client.execute(req).await.map_err(Error::from) })
}
}
}

View File

@ -1,4 +1,7 @@
use crate::RequestBuilder;
use reqwest::RequestBuilder;
use task_local_extensions::Extensions;
use crate::Identity;
/// When attached to a [`ClientWithMiddleware`] (generally using [`with_init`]), it is run
/// whenever the client starts building a request, in the order it was attached.
@ -6,12 +9,14 @@ use crate::RequestBuilder;
/// # Example
///
/// ```
/// use reqwest_middleware::{RequestInitialiser, RequestBuilder};
/// use reqwest::RequestBuilder;
/// use reqwest_middleware::RequestInitialiser;
/// use task_local_extensions::Extensions;
///
/// struct AuthInit;
///
/// impl RequestInitialiser for AuthInit {
/// fn init(&self, req: RequestBuilder) -> RequestBuilder {
/// fn init(&self, req: RequestBuilder, ext: &mut Extensions) -> RequestBuilder {
/// req.bearer_auth("my_auth_token")
/// }
/// }
@ -20,15 +25,30 @@ use crate::RequestBuilder;
/// [`ClientWithMiddleware`]: crate::ClientWithMiddleware
/// [`with_init`]: crate::ClientBuilder::with_init
pub trait RequestInitialiser: 'static + Send + Sync {
fn init(&self, req: RequestBuilder) -> RequestBuilder;
fn init(&self, req: RequestBuilder, ext: &mut Extensions) -> RequestBuilder;
}
impl<F> RequestInitialiser for F
impl RequestInitialiser for Identity {
fn init(&self, req: RequestBuilder, _: &mut Extensions) -> RequestBuilder {
req
}
}
/// Two [`RequestInitialiser`]s chained together.
#[derive(Clone)]
pub struct RequestStack<Inner, Outer> {
pub(crate) inner: Inner,
pub(crate) outer: Outer,
}
impl<I, O> RequestInitialiser for RequestStack<I, O>
where
F: Send + Sync + 'static + Fn(RequestBuilder) -> RequestBuilder,
I: RequestInitialiser,
O: RequestInitialiser,
{
fn init(&self, req: RequestBuilder) -> RequestBuilder {
(self)(req)
fn init(&self, req: RequestBuilder, ext: &mut Extensions) -> RequestBuilder {
let req = self.inner.init(req, ext);
self.outer.init(req, ext)
}
}
@ -38,30 +58,45 @@ where
///
/// ```
/// use reqwest::{Client, Request, Response};
/// use reqwest_middleware::{ClientBuilder, Middleware, Next, Result, Extension};
/// use reqwest_middleware::{ClientBuilder, Error, Extension, Layer, Service};
/// use task_local_extensions::Extensions;
/// use futures::future::{BoxFuture, FutureExt};
/// use std::task::{Context, Poll};
///
/// #[derive(Clone)]
/// struct LogName(&'static str);
/// struct LoggingMiddleware;
///
/// #[async_trait::async_trait]
/// impl Middleware for LoggingMiddleware {
/// async fn handle(
/// &self,
/// req: Request,
/// extensions: &mut Extensions,
/// next: Next<'_>,
/// ) -> Result<Response> {
/// struct LoggingLayer;
/// struct LoggingService<S>(S);
///
/// impl<S> Layer<S> for LoggingLayer {
/// type Service = LoggingService<S>;
///
/// fn layer(&self, inner: S) -> Self::Service {
/// LoggingService(inner)
/// }
/// }
///
/// impl<S> Service for LoggingService<S>
/// where
/// S: Service,
/// S::Future: Send + 'static,
/// {
/// type Future = BoxFuture<'static, Result<Response, Error>>;
///
/// fn call(&mut self, req: Request, ext: &mut Extensions) -> Self::Future {
/// // get the log name or default to "unknown"
/// let name = extensions
/// let name = ext
/// .get()
/// .map(|&LogName(name)| name)
/// .unwrap_or("unknown");
/// println!("[{name}] Request started {req:?}");
/// let res = next.run(req, extensions).await;
/// println!("[{name}] Result: {res:?}");
/// res
/// let fut = self.0.call(req, ext);
/// async move {
/// let res = fut.await;
/// println!("[{name}] Result: {res:?}");
/// res
/// }.boxed()
/// }
/// }
///
@ -69,7 +104,7 @@ where
/// let reqwest_client = Client::builder().build().unwrap();
/// let client = ClientBuilder::new(reqwest_client)
/// .with_init(Extension(LogName("my-client")))
/// .with(LoggingMiddleware)
/// .with(LoggingLayer)
/// .build();
/// let resp = client.get("https://truelayer.com").send().await.unwrap();
/// println!("TrueLayer page HTML: {}", resp.text().await.unwrap());
@ -78,7 +113,8 @@ where
pub struct Extension<T>(pub T);
impl<T: Send + Sync + Clone + 'static> RequestInitialiser for Extension<T> {
fn init(&self, req: RequestBuilder) -> RequestBuilder {
req.with_extension(self.0.clone())
fn init(&self, req: RequestBuilder, ext: &mut Extensions) -> RequestBuilder {
ext.insert(self.0.clone());
req
}
}

View File

@ -1,6 +1,6 @@
[package]
name = "reqwest-retry"
version = "0.2.0"
version = "0.3.0"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Retry middleware for reqwest."
@ -10,7 +10,7 @@ keywords = ["reqwest", "http", "middleware", "retry"]
categories = ["web-programming::http-client"]
[dependencies]
reqwest-middleware = { version = "0.2.0", path = "../reqwest-middleware" }
reqwest-middleware = { version = "0.3.0", path = "../reqwest-middleware" }
anyhow = "1"
async-trait = "0.1.51"
@ -23,6 +23,7 @@ retry-policies = "0.1"
task-local-extensions = "0.1.1"
tokio = { version = "1.6", features = ["time"] }
tracing = "0.1.26"
pin-project-lite = "0.2"
[dev-dependencies]
async-std = { version = "1.10"}

View File

@ -1,15 +1,17 @@
//! `RetryTransientMiddleware` implements retrying requests on transient errors.
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use crate::retryable::Retryable;
use anyhow::anyhow;
use chrono::Utc;
use futures::Future;
use pin_project_lite::pin_project;
use reqwest::{Request, Response};
use reqwest_middleware::{Error, Middleware, Next, Result};
use reqwest_middleware::{Error, Layer, Service};
use retry_policies::RetryPolicy;
use task_local_extensions::Extensions;
/// We limit the number of retries to a maximum of `10` to avoid stack-overflow issues due to the recursion.
static MAXIMUM_NUMBER_OF_RETRIES: u32 = 10;
use tokio::time::Sleep;
/// `RetryTransientMiddleware` offers retry logic for requests that fail in a transient manner
/// and can be safely executed again.
@ -58,76 +60,253 @@ impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
}
}
#[async_trait::async_trait]
impl<T: RetryPolicy + Send + Sync> Middleware for RetryTransientMiddleware<T> {
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response> {
// TODO: Ideally we should create a new instance of the `Extensions` map to pass
// downstream. This will guard against previous retries poluting `Extensions`.
// That is, we only return what's populated in the typemap for the last retry attempt
// and copy those into the the `global` Extensions map.
self.execute_with_retry(req, next, extensions).await
}
}
impl<T, Svc> Layer<Svc> for RetryTransientMiddleware<T>
where
T: RetryPolicy + Clone + Send + Sync + 'static,
{
type Service = Retry<TowerRetryPolicy<T>, Svc>;
impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
/// This function will try to execute the request, if it fails
/// with an error classified as transient it will call itself
/// to retry the request.
async fn execute_with_retry<'a>(
&'a self,
req: Request,
next: Next<'a>,
ext: &'a mut Extensions,
) -> Result<Response> {
let mut n_past_retries = 0;
loop {
// Cloning the request object before-the-fact is not ideal..
// However, if the body of the request is not static, e.g of type `Bytes`,
// the Clone operation should be of constant complexity and not O(N)
// since the byte abstraction is a shared pointer over a buffer.
let duplicate_request = req.try_clone().ok_or_else(|| {
Error::Middleware(anyhow!(
"Request object is not clonable. Are you passing a streaming body?".to_string()
))
})?;
let result = next.clone().run(duplicate_request, ext).await;
// We classify the response which will return None if not
// errors were returned.
break match Retryable::from_reqwest_response(&result) {
Some(retryable)
if retryable == Retryable::Transient
&& n_past_retries < MAXIMUM_NUMBER_OF_RETRIES =>
{
// If the response failed and the error type was transient
// we can safely try to retry the request.
let retry_decicion = self.retry_policy.should_retry(n_past_retries);
if let retry_policies::RetryDecision::Retry { execute_after } = retry_decicion {
let duration = (execute_after - Utc::now())
.to_std()
.map_err(Error::middleware)?;
// Sleep the requested amount before we try again.
tracing::warn!(
"Retry attempt #{}. Sleeping {:?} before the next attempt",
n_past_retries,
duration
);
tokio::time::sleep(duration).await;
n_past_retries += 1;
continue;
} else {
result
}
}
Some(_) | None => result,
};
fn layer(&self, inner: Svc) -> Self::Service {
Retry {
policy: TowerRetryPolicy {
n_past_retries: 0,
retry_policy: self.retry_policy.clone(),
},
service: inner,
}
}
}
#[derive(Clone)]
pub struct TowerRetryPolicy<T> {
n_past_retries: u32,
retry_policy: T,
}
pin_project! {
pub struct RetryFuture<T>
{
retry: Option<TowerRetryPolicy<T>>,
#[pin]
sleep: Sleep,
}
}
impl<T> Future for RetryFuture<T> {
type Output = TowerRetryPolicy<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
ready!(this.sleep.poll(cx));
Poll::Ready(
this.retry
.take()
.expect("poll should not be called more than once"),
)
}
}
impl<T: RetryPolicy + Clone> Policy for TowerRetryPolicy<T> {
type Future = RetryFuture<T>;
fn retry(&self, _req: &Request, result: &Result<Response, Error>) -> Option<Self::Future> {
// We classify the response which will return None if not
// errors were returned.
match Retryable::from_reqwest_response(result) {
Some(Retryable::Transient) => {
// If the response failed and the error type was transient
// we can safely try to retry the request.
let retry_decicion = self.retry_policy.should_retry(self.n_past_retries);
if let retry_policies::RetryDecision::Retry { execute_after } = retry_decicion {
let duration = (execute_after - Utc::now()).to_std().ok()?;
// Sleep the requested amount before we try again.
tracing::warn!(
"Retry attempt #{}. Sleeping {:?} before the next attempt",
self.n_past_retries,
duration
);
let sleep = tokio::time::sleep(duration);
Some(RetryFuture {
retry: Some(TowerRetryPolicy {
n_past_retries: self.n_past_retries + 1,
retry_policy: self.retry_policy.clone(),
}),
sleep,
})
} else {
None
}
}
Some(_) | None => None,
}
}
fn clone_request(&self, req: &Request) -> Option<Request> {
req.try_clone()
}
}
pub trait Policy: Sized {
/// The [`Future`] type returned by [`Policy::retry`].
type Future: Future<Output = Self>;
/// Check the policy if a certain request should be retried.
///
/// This method is passed a reference to the original request, and either
/// the [`Service::Response`] or [`Service::Error`] from the inner service.
///
/// If the request should **not** be retried, return `None`.
///
/// If the request *should* be retried, return `Some` future of a new
/// policy that would apply for the next request attempt.
///
/// [`Service::Response`]: crate::Service::Response
/// [`Service::Error`]: crate::Service::Error
fn retry(&self, req: &Request, result: &Result<Response, Error>) -> Option<Self::Future>;
/// Tries to clone a request before being passed to the inner service.
///
/// If the request cannot be cloned, return [`None`].
fn clone_request(&self, req: &Request) -> Option<Request>;
}
pin_project! {
/// Configure retrying requests of "failed" responses.
///
/// A [`Policy`] classifies what is a "failed" response.
#[derive(Clone, Debug)]
pub struct Retry<P, S> {
#[pin]
policy: P,
service: S,
}
}
impl<P, S> Service for Retry<P, S>
where
P: 'static + Policy + Clone,
S: 'static + Service + Clone,
{
type Future = ResponseFuture<P, S>;
fn call(&mut self, request: Request, ext: &mut Extensions) -> Self::Future {
let cloned = self.policy.clone_request(&request);
let future = self.service.call(request, ext);
ResponseFuture::new(cloned, self.clone(), future)
}
// fn call(&mut self, request: Request) -> Self::Future {
// let cloned = self.policy.clone_request(&request);
// let future = self.service.call(request);
// ResponseFuture::new(cloned, self.clone(), future)
// }
}
pin_project! {
/// The [`Future`] returned by a [`Retry`] service.
#[derive(Debug)]
pub struct ResponseFuture<P, S>
where
P: Policy,
S: Service,
{
request: Option<Request>,
#[pin]
retry: Retry<P, S>,
#[pin]
state: State<S::Future, P::Future>,
}
}
pin_project! {
#[project = StateProj]
#[derive(Debug)]
enum State<F, P> {
// Polling the future from [`Service::call`]
Called {
#[pin]
future: F
},
// Polling the future from [`Policy::retry`]
Checking {
#[pin]
checking: P
},
// Polling [`Service::poll_ready`] after [`Checking`] was OK.
Retrying,
}
}
impl<P, S> ResponseFuture<P, S>
where
P: Policy,
S: Service,
{
pub(crate) fn new(
request: Option<Request>,
retry: Retry<P, S>,
future: S::Future,
) -> ResponseFuture<P, S> {
ResponseFuture {
request,
retry,
state: State::Called { future },
}
}
}
impl<P, S> Future for ResponseFuture<P, S>
where
P: Policy + Clone,
S: Service + Clone,
{
type Output = Result<Response, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match this.state.as_mut().project() {
StateProj::Called { future } => {
let result = ready!(future.poll(cx));
if let Some(ref req) = this.request {
match this.retry.policy.retry(req, &result) {
Some(checking) => {
this.state.set(State::Checking { checking });
}
None => return Poll::Ready(result),
}
} else {
// request wasn't cloned, so no way to retry it
return Poll::Ready(result);
}
}
StateProj::Checking { checking } => {
this.retry
.as_mut()
.project()
.policy
.set(ready!(checking.poll(cx)));
this.state.set(State::Retrying);
}
StateProj::Retrying => {
let req = this
.request
.take()
.expect("retrying requires cloned request");
*this.request = this.retry.policy.clone_request(&req);
this.state.set(State::Called {
future: this
.retry
.as_mut()
.project()
.service
.call(req, &mut Extensions::new()),
});
}
}
}
}
}

View File

@ -147,17 +147,6 @@ assert_retry_succeeds!(429, StatusCode::OK);
assert_no_retry!(431, StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE);
assert_no_retry!(451, StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS);
// We assert that we cap retries at 10, which means that we will
// get 11 calls to the RetryResponder.
assert_retry_succeeds_inner!(
500,
assert_maximum_retries_is_not_exceeded,
StatusCode::INTERNAL_SERVER_ERROR,
100,
11,
RetryResponder::new(100_u32, 500)
);
pub struct RetryTimeoutResponder(Arc<AtomicU32>, u32, std::time::Duration);
impl RetryTimeoutResponder {

View File

@ -1,6 +1,6 @@
[package]
name = "reqwest-tracing"
version = "0.4.0"
version = "0.5.0"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Opentracing middleware for reqwest."
@ -19,12 +19,13 @@ opentelemetry_0_18 = ["opentelemetry_0_18_pkg", "tracing-opentelemetry_0_18_pkg"
[dependencies]
reqwest-middleware = { version = "0.2.0", path = "../reqwest-middleware" }
reqwest-middleware = { version = "0.3.0", path = "../reqwest-middleware" }
async-trait = "0.1.51"
reqwest = { version = "0.11", default-features = false }
task-local-extensions = "0.1.1"
tracing = "0.1.26"
pin-project-lite = "0.2"
opentelemetry_0_13_pkg = { package = "opentelemetry", version = "0.13", optional = true }
opentelemetry_0_14_pkg = { package = "opentelemetry", version = "0.14", optional = true }

View File

@ -4,11 +4,11 @@
//!
//! The simplest possible usage:
//! ```no_run
//! # use reqwest_middleware::Result;
//! # use reqwest_middleware::Error;
//! use reqwest_middleware::{ClientBuilder};
//! use reqwest_tracing::TracingMiddleware;
//!
//! # async fn example() -> Result<()> {
//! # async fn example() -> Result<(), Error> {
//! let reqwest_client = reqwest::Client::builder().build().unwrap();
//! let client = ClientBuilder::new(reqwest_client)
//! // Insert the tracing middleware
@ -22,12 +22,12 @@
//!
//! To customise the span names use [`OtelName`].
//! ```no_run
//! # use reqwest_middleware::Result;
//! # use reqwest_middleware::Error;
//! use reqwest_middleware::{ClientBuilder, Extension};
//! use reqwest_tracing::{
//! TracingMiddleware, OtelName
//! };
//! # async fn example() -> Result<()> {
//! # async fn example() -> Result<(), Error> {
//! let reqwest_client = reqwest::Client::builder().build().unwrap();
//! let client = ClientBuilder::new(reqwest_client)
//! // Inserts the extension before the request is started
@ -52,7 +52,7 @@
//!
//! Note that Opentelemetry tracks start and stop already, there is no need to have a custom builder like this.
//! ```rust
//! use reqwest_middleware::Result;
//! use reqwest_middleware::Error;
//! use task_local_extensions::Extensions;
//! use reqwest::{Request, Response};
//! use reqwest_middleware::ClientBuilder;
@ -62,16 +62,17 @@
//! use tracing::Span;
//! use std::time::{Duration, Instant};
//!
//! pub struct TimeTrace;
//! pub struct TimeTrace(Instant);
//!
//! impl ReqwestOtelSpanBackend for TimeTrace {
//! fn on_request_start(req: &Request, extension: &mut Extensions) -> Span {
//! extension.insert(Instant::now());
//! reqwest_otel_span!(name="example-request", req, time_elapsed = tracing::field::Empty)
//! fn on_request_start(req: &Request, _extension: &mut Extensions) -> (Self, Span) {
//! let now = Self(Instant::now());
//! let span = reqwest_otel_span!(name="example-request", req, time_elapsed = tracing::field::Empty);
//! (now, span)
//! }
//!
//! fn on_request_end(span: &Span, outcome: &Result<Response>, extension: &mut Extensions) {
//! let time_elapsed = extension.get::<Instant>().unwrap().elapsed().as_millis() as i64;
//! fn on_request_end(self, span: &Span, outcome: &Result<Response, Error>) {
//! let time_elapsed = self.0.elapsed().as_millis() as i64;
//! default_on_request_end(span, outcome);
//! span.record("time_elapsed", &time_elapsed);
//! }

View File

@ -1,7 +1,14 @@
use std::{
future::Future,
task::{ready, Context, Poll},
};
use pin_project_lite::pin_project;
use reqwest::{Request, Response};
use reqwest_middleware::{Middleware, Next, Result};
use reqwest_middleware::{Error, Layer, Service};
use task_local_extensions::Extensions;
use tracing::Instrument;
// use tower::{Layer, Service};
use tracing::Span;
use crate::{DefaultSpanBackend, ReqwestOtelSpanBackend};
@ -10,6 +17,8 @@ pub struct TracingMiddleware<S: ReqwestOtelSpanBackend> {
span_backend: std::marker::PhantomData<S>,
}
impl<S: ReqwestOtelSpanBackend> Copy for TracingMiddleware<S> {}
impl<S: ReqwestOtelSpanBackend> TracingMiddleware<S> {
pub fn new() -> TracingMiddleware<S> {
TracingMiddleware {
@ -30,38 +39,82 @@ impl Default for TracingMiddleware<DefaultSpanBackend> {
}
}
#[async_trait::async_trait]
impl<ReqwestOtelSpan> Middleware for TracingMiddleware<ReqwestOtelSpan>
impl<ReqwestOtelSpan, Svc> Layer<Svc> for TracingMiddleware<ReqwestOtelSpan>
where
ReqwestOtelSpan: ReqwestOtelSpanBackend + Sync + Send + 'static,
Svc: Service,
{
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response> {
let request_span = ReqwestOtelSpan::on_request_start(&req, extensions);
type Service = TracingMiddlewareService<ReqwestOtelSpan, Svc>;
let outcome_future = async {
// Adds tracing headers to the given request to propagate the OpenTelemetry context to downstream revivers of the request.
// Spans added by downstream consumers will be part of the same trace.
#[cfg(any(
feature = "opentelemetry_0_13",
feature = "opentelemetry_0_14",
feature = "opentelemetry_0_15",
feature = "opentelemetry_0_16",
feature = "opentelemetry_0_17",
feature = "opentelemetry_0_18",
))]
let req = crate::otel::inject_opentelemetry_context_into_request(req);
// Run the request
let outcome = next.run(req, extensions).await;
ReqwestOtelSpan::on_request_end(&request_span, &outcome, extensions);
outcome
};
outcome_future.instrument(request_span.clone()).await
fn layer(&self, inner: Svc) -> Self::Service {
TracingMiddlewareService {
service: inner,
_layer: *self,
}
}
}
/// Middleware Service for tracing requests using the current Opentelemetry Context.
pub struct TracingMiddlewareService<S: ReqwestOtelSpanBackend, Svc> {
_layer: TracingMiddleware<S>,
service: Svc,
}
impl<ReqwestOtelSpan, Svc> Service for TracingMiddlewareService<ReqwestOtelSpan, Svc>
where
ReqwestOtelSpan: ReqwestOtelSpanBackend + Sync + Send + 'static,
Svc: Service,
{
type Future = TracingMiddlewareFuture<ReqwestOtelSpan, Svc::Future>;
fn call(&mut self, req: Request, ext: &mut Extensions) -> Self::Future {
let (backend, span) = ReqwestOtelSpan::on_request_start(&req, ext);
// Adds tracing headers to the given request to propagate the OpenTelemetry context to downstream revivers of the request.
// Spans added by downstream consumers will be part of the same trace.
#[cfg(any(
feature = "opentelemetry_0_13",
feature = "opentelemetry_0_14",
feature = "opentelemetry_0_15",
feature = "opentelemetry_0_16",
feature = "opentelemetry_0_17",
feature = "opentelemetry_0_18",
))]
let request = crate::otel::inject_opentelemetry_context_into_request(request);
let future = self.service.call(req, ext);
TracingMiddlewareFuture {
span,
backend: Some(backend),
future,
}
}
}
pin_project!(
pub struct TracingMiddlewareFuture<S: ReqwestOtelSpanBackend, F> {
span: Span,
backend: Option<S>,
#[pin]
future: F,
}
);
impl<S: ReqwestOtelSpanBackend, F: Future<Output = Result<Response, Error>>> Future
for TracingMiddlewareFuture<S, F>
{
type Output = F::Output;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let outcome = {
let _guard = this.span.enter();
ready!(this.future.poll(cx))
};
this.backend
.take()
.expect("poll should not be called after completion")
.on_request_end(this.span, &outcome);
Poll::Ready(outcome)
}
}

View File

@ -2,7 +2,7 @@ use std::borrow::Cow;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Request, Response, StatusCode as RequestStatusCode, Url};
use reqwest_middleware::{Error, Result};
use reqwest_middleware::Error;
use task_local_extensions::Extensions;
use tracing::Span;
@ -39,17 +39,17 @@ pub const HTTP_USER_AGENT: &str = "http.user_agent";
/// Check out [`reqwest_otel_span`] documentation for examples.
///
/// [`TracingMiddleware`]: crate::middleware::TracingMiddleware.
pub trait ReqwestOtelSpanBackend {
pub trait ReqwestOtelSpanBackend: Sized {
/// Initalized a new span before the request is executed.
fn on_request_start(req: &Request, extension: &mut Extensions) -> Span;
fn on_request_start(req: &Request, extension: &mut Extensions) -> (Self, Span);
/// Runs after the request call has executed.
fn on_request_end(span: &Span, outcome: &Result<Response>, extension: &mut Extensions);
fn on_request_end(self, span: &Span, outcome: &Result<Response, Error>);
}
/// Populates default success/failure fields for a given [`reqwest_otel_span!`] span.
#[inline]
pub fn default_on_request_end(span: &Span, outcome: &Result<Response>) {
pub fn default_on_request_end(span: &Span, outcome: &Result<Response, Error>) {
match outcome {
Ok(res) => default_on_request_success(span, res),
Err(err) => default_on_request_failure(span, err),
@ -95,15 +95,15 @@ pub fn default_on_request_failure(span: &Span, e: &Error) {
pub struct DefaultSpanBackend;
impl ReqwestOtelSpanBackend for DefaultSpanBackend {
fn on_request_start(req: &Request, ext: &mut Extensions) -> Span {
fn on_request_start(req: &Request, ext: &mut Extensions) -> (DefaultSpanBackend, Span) {
let name = ext
.get::<OtelName>()
.map(|on| on.0.as_ref())
.unwrap_or("reqwest-http-client");
reqwest_otel_span!(name = name, req)
(Self, reqwest_otel_span!(name = name, req))
}
fn on_request_end(span: &Span, outcome: &Result<Response>, _: &mut Extensions) {
fn on_request_end(self, span: &Span, outcome: &Result<Response, Error>) {
default_on_request_end(span, outcome)
}
}
@ -119,16 +119,19 @@ fn get_header_value(key: &str, headers: &HeaderMap) -> String {
pub struct SpanBackendWithUrl;
impl ReqwestOtelSpanBackend for SpanBackendWithUrl {
fn on_request_start(req: &Request, ext: &mut Extensions) -> Span {
fn on_request_start(req: &Request, ext: &mut Extensions) -> (Self, Span) {
let name = ext
.get::<OtelName>()
.map(|on| on.0.as_ref())
.unwrap_or("reqwest-http-client");
reqwest_otel_span!(name = name, req, http.url = %remove_credentials(req.url()))
(
Self,
reqwest_otel_span!(name = name, req, http.url = %remove_credentials(req.url())),
)
}
fn on_request_end(span: &Span, outcome: &Result<Response>, _: &mut Extensions) {
fn on_request_end(self, span: &Span, outcome: &Result<Response, Error>) {
default_on_request_end(span, outcome)
}
}
@ -156,28 +159,28 @@ fn get_span_status(request_status: RequestStatusCode) -> Option<&'static str> {
///
/// Usage:
/// ```no_run
/// # use reqwest_middleware::Result;
/// # use reqwest_middleware::Error;
/// use reqwest_middleware::{ClientBuilder, Extension};
/// use reqwest_tracing::{
/// TracingMiddleware, OtelName
/// };
/// # async fn example() -> Result<()> {
/// # async fn example() -> Result<(), Error> {
/// let reqwest_client = reqwest::Client::builder().build().unwrap();
/// let client = ClientBuilder::new(reqwest_client)
/// // Inserts the extension before the request is started
/// .with_init(Extension(OtelName("my-client".into())))
/// // Makes use of that extension to specify the otel name
/// .with(TracingMiddleware::default())
/// .build();
/// // Inserts the extension before the request is started
/// .with_init(Extension(OtelName("my-client".into())))
/// // Makes use of that extension to specify the otel name
/// .with(TracingMiddleware::default())
/// .build();
///
/// let resp = client.get("https://truelayer.com").send().await.unwrap();
///
/// // Or specify it on the individual request (will take priority)
/// let resp = client.post("https://api.truelayer.com/payment")
/// .with_extension(OtelName("POST /payment".into()))
/// .send()
/// .await
/// .unwrap();
/// .send()
/// .await
/// .unwrap();
/// # Ok(())
/// # }
/// ```

View File

@ -30,7 +30,7 @@
/// The second argument passed to [`reqwest_otel_span!`](crate::reqwest_otel_span) is a reference to an [`reqwest::Request`].
///
/// ```rust
/// use reqwest_middleware::Result;
/// use reqwest_middleware::Error;
/// use task_local_extensions::Extensions;
/// use reqwest::{Request, Response};
/// use reqwest_tracing::{
@ -41,11 +41,11 @@
/// pub struct CustomReqwestOtelSpanBackend;
///
/// impl ReqwestOtelSpanBackend for CustomReqwestOtelSpanBackend {
/// fn on_request_start(req: &Request, _extension: &mut Extensions) -> Span {
/// reqwest_otel_span!(name = "reqwest-http-request", req)
/// fn on_request_start(req: &Request, _extension: &mut Extensions) -> (Self, Span) {
/// (Self, reqwest_otel_span!(name = "reqwest-http-request", req))
/// }
///
/// fn on_request_end(span: &Span, outcome: &Result<Response>, _extension: &mut Extensions) {
/// fn on_request_end(self, span: &Span, outcome: &Result<Response, Error>) {
/// default_on_request_end(span, outcome)
/// }
/// }