Compare commits

...

28 Commits
tower ... main

Author SHA1 Message Date
Zynh0722 c284c446c8 feat: derive(Debug) on ClientBuilder 2024-02-09 00:55:04 -08:00
Ethan Brierley 1c31f4bc7f
chore!(reqwest-tracing): bump patch version 🚀 2024-01-21 20:04:58 +00:00
Julian Tescher 32cc268f07
Add support for `opentelemetry` version `0.21` (#112)
* Add support for `opentelemetry` version `0.21`

* Fix tracer provider import
2024-01-19 11:17:54 +00:00
Ethan Brierley d297b1c5df
chore: `cargo fmt` 2023-11-07 09:59:22 +00:00
Jan Srzednicki b7017dca6f
Document using `tokio::time::sleep` (#111)
* Document using `tokio::time::sleep`

* Update reqwest-retry/src/middleware.rs

---------

Co-authored-by: Ethan Brierley <ethanboxx@gmail.com>
2023-11-07 09:35:16 +00:00
Ethan Brierley 304d6f1ebc
ci: fix `rustdoc::redundant_explicit_links` (#110) 2023-10-24 10:50:53 +01:00
Ari de1b651bcc
add fetch_mode_no_cors_method (#108)
* add fetch_mode_no_cors_method

* Update changelog

* Bump verion
2023-10-24 10:39:06 +01:00
Tom Milligan efdd52f2fa
readme: add link to reqwest-chain crate (#103) 2023-09-12 11:45:07 +01:00
Thom Wright 7eb52bbe65
[EWT-319] Upgrade retry-policies (#104) 2023-09-08 10:22:46 +01:00
tl-rodrigo-gryzinski f1e71bef3c
Disable wasm tests and add warning to README (#105) 2023-09-07 14:46:53 +01:00
tl-rodrigo-gryzinski c2d477edae
[reqwest-retry] Patch release (#102) 2023-08-30 13:15:36 +01:00
Ethan Brierley a54319a9d6
Patch release `v0.4.6` 🚀 (#100)
* Add support for OpenTelemetry v0.20

* Add back dev-deps

* Attempt at tests

* Update reqwest-tracing/src/otel.rs

* Update reqwest-tracing/src/otel.rs

* Update reqwest-tracing/Cargo.toml

* Update reqwest-tracing/CHANGELOG.md
2023-08-23 12:34:20 +01:00
Conrad Ludgate 7b3493d96c
add opentelementry 0.20 support (#101) 2023-08-23 11:49:15 +01:00
Ethan Brierley a530ea7f93
Cut `reqwest-middleware` patch release `0.2.3 (#98) 2023-08-07 10:50:56 +01:00
Oleh Martsokha 31f8aebce4
Impl reqwest::Error methods (#96)
* impl reqwest::Error methods

* update changelog

* use reqwest::StatusCode

* typo

* consistent comments

* fn is_middleware()
2023-08-07 10:34:01 +01:00
Conrad Ludgate b8b9400858
disable context propagation (#94)
* disable context propagation

* bump version

* removes leftover feature

* fix new test

* add back disabled test
2023-06-20 18:07:12 +01:00
Rutger 594075583c
Added a way to specify custom functions which decide whether a request should be retried or not (#33)
* Add a generic function to the middleware struct for the `Retryable` decision.

The generic function can be used to define custom behaviour to decide whether to retry a request or not.

By default, this function is `Retryable::from_reqwest_response` which is the same as it was before.

* Add a way to create custom retry policies.

A RetryStrategy will dictate what decision will be made based on the result of the sent request.

* Add RetryableStrategy in the `RetryTransientMiddleware` struct instead of the seperate functions

* Add constructor to create a `RetryTransientMiddleware` with a custom `RetryableStrategy`

* Run `cargo fmt`

* Add example code to the `RetryableStrategy` struct

* Run `cargo fmt`

* Updated changelog

* use a trait

* docs

* include latest changes

Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
2023-05-22 11:53:31 +01:00
Conrad Ludgate fb7a964ba3
add default_span_name helper function (#93) 2023-05-15 14:55:56 +01:00
Conrad Ludgate 385314a298
fix otel status (#92)
* fix otel status

* bump
2023-05-15 11:45:55 +01:00
Alex Wakefield af1080f21c
Move path matching changes to right log (#91)
* doc: move path matching changes to right log

* update

---------

Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
2023-05-12 11:16:42 +01:00
Conrad Ludgate 80d56d89f3
release middleware 2023-05-11 14:17:05 +01:00
Jeramy Singleton d5e71a0c89
Added version function to RequestBuilder impl (#90)
* Adds version function to RequestBuilder impl

Addresses an issue where the Http version of a request cannot be changed when using reqwest_middleware

* Removes  function from wasm compiles

---------

Co-authored-by: Jeramy Singleton <wisingle@microsoft.com>
2023-05-11 13:30:15 +01:00
Alex Wakefield 3457bf5702
Add `OtelPathNames` for span names (#89)
* feat: add `OtelPathNames` for span names

If this extension is provided span names will be `<method> <path name>`.
These path names will include parameter names rather than IDs or other elements that would increase the cardinality.

* doc: update changelog

* refactor: update `known_paths` to return error

* refactor: return `anyhow::Error` instead

This is as `reqwest_middleware::Error` is more focused on handling a
request.
2023-04-26 15:28:32 +01:00
Matthew Gapp f8ff599f50
fix: remove middleware retry limit (#87)
* refactor: Simplify retry middleware in reqwest.

Removed retry limit:

- Remove MAXIMUM_NUMBER_OF_RETRIES metadata and references in middleware.rs
- Correct mispelling: retry_decision to retry_decision in middleware.rs
- Update Retryable matching to only match Retryable::Transient in middleware.rs
- Simplify branching by removing n_past_retries < MAXIMUM_NUMBER_OF_RETRIES condition in middleware.rs

* test: Remove retry cap assertion from test file

remove retry limit assertion test
2023-04-17 13:02:13 +01:00
Léo Gaspard fef18b3506
feat: wasm32-unknown-unknown support (#79)
* feat: wasm32-unknown-unknown support

This replaces task-local-extensions with http's extensions, as http was
already in the dependency closure anyway and the other features of
task-local-extensions (that required an incompatible-with-wasm part of
tokio) were not used anyway.

* feat: have ci check that wasm32-unknown-unknown keeps compiling

* revert back to task-local-extensions

* fix ci

* fix random on wasm

* fix ci again

* bump

---------

Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
2023-03-09 11:33:13 +00:00
tl-helge-hoff f854725791
feat: classify io error connection reset by peer (#78)
* feat: classify io error connection reset by peer

* doc: explain tests

* chore: add changelog

* fix: use and_then

* chore: docs and clippy

* chore: bump reqwest-retry to 0.2.1

* fix: use get_source_error_type

* fix: remove unused import

* fix: make clippy happy

* doc: describe canceled
2022-12-02 10:57:24 +01:00
John Vandenberg 8763ab1e30
README.md: Add third party middleware list (#75) 2022-11-23 11:20:21 +00:00
John Vandenberg a7f9a112ed
Update example in tracing README (#76) 2022-11-23 07:39:54 +00:00
24 changed files with 1015 additions and 260 deletions

View File

@ -18,6 +18,9 @@ jobs:
- opentelemetry_0_16
- opentelemetry_0_17
- opentelemetry_0_18
- opentelemetry_0_19
- opentelemetry_0_20
- opentelemetry_0_21
steps:
- name: Checkout repository
uses: actions/checkout@v2
@ -63,6 +66,9 @@ jobs:
- opentelemetry_0_16
- opentelemetry_0_17
- opentelemetry_0_18
- opentelemetry_0_19
- opentelemetry_0_20
- opentelemetry_0_21
steps:
- name: Checkout repository
uses: actions/checkout@v2
@ -91,6 +97,9 @@ jobs:
- opentelemetry_0_16
- opentelemetry_0_17
- opentelemetry_0_18
- opentelemetry_0_19
- opentelemetry_0_20
- opentelemetry_0_21
steps:
- name: Checkout repository
uses: actions/checkout@v2
@ -131,27 +140,3 @@ jobs:
with:
command: publish
args: --dry-run --manifest-path reqwest-tracing/Cargo.toml
coverage:
name: Code coverage
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
- name: Run cargo-tarpaulin
uses: actions-rs/tarpaulin@v0.1
with:
args: '--ignore-tests --out Lcov'
- name: Upload to Coveralls
# upload only if push
if: ${{ github.event_name == 'push' }}
uses: coverallsapp/github-action@master
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: './lcov.info'

10
.gitignore vendored
View File

@ -1,2 +1,10 @@
/target
# OS
.DS_Store
# IDE
.idea/
.vscode/
# Rust
Cargo.lock
/target

View File

@ -4,7 +4,25 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### [0.2.4] - 2023-09-21
### Added
- Added `fetch_mode_no_cors` method to `reqwest_middleware::RequestBuilder`
## [0.2.3] - 2023-08-07
### Added
- Added all `reqwest::Error` methods for `reqwest_middleware::Error`
## [0.2.2] - 2023-05-11
### Added
- `RequestBuilder::version` method to configure the HTTP version
## [0.2.1] - 2023-03-09
### Added
- Support for `wasm32-unknown-unknown`
## [0.2.0] - 2022-11-15

View File

@ -15,6 +15,11 @@ implementations. This repository also contains a couple of useful concrete middl
* [`reqwest-tracing`](https://crates.io/crates/reqwest-tracing):
[`tracing`](https://crates.io/crates/tracing) integration, optional opentelemetry support.
Note about browser support: automated tests targetting wasm are disabled. The crate may work with
wasm but wasm support is unmaintained. PRs improving wasm are still welcome but you'd need to
reintroduce the tests and get them passing before we'd merge it (see
https://github.com/TrueLayer/reqwest-middleware/pull/105).
## Overview
The `reqwest-middleware` client exposes the same interface as a plain `reqwest` client, but
@ -73,3 +78,14 @@ Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.
</sub>
## Third-party middleware
The following third-party middleware use `request-middleware`:
- [`reqwest-conditional-middleware`](https://github.com/oxidecomputer/reqwest-conditional-middleware) - Per-request basis middleware
- [`http-cache`](https://github.com/06chaynes/http-cache) - HTTP caching rules
- [`reqwest-cache`](https://gitlab.com/famedly/company/backend/libraries/reqwest-cache) - HTTP caching
- [`aliri_reqwest`](https://github.com/neoeinstein/aliri/tree/main/aliri_reqwest) - Background token management and renewal
- [`http-signature-normalization-reqwest`](https://crates.io/crates/http-signature-normalization-reqwest) (not free software) - HTTP Signatures
- [`reqwest-chain`](https://github.com/tommilligan/reqwest-chain) - Apply custom criteria to any reqwest response, deciding when and how to retry.

View File

@ -1,6 +1,6 @@
[package]
name = "reqwest-middleware"
version = "0.2.0"
version = "0.2.4"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Wrapper around reqwest to allow for client middleware chains."
@ -11,17 +11,17 @@ categories = ["web-programming::http-client"]
readme = "../README.md"
[dependencies]
anyhow = "1"
anyhow = "1.0.0"
async-trait = "0.1.51"
http = "0.2"
reqwest = { version = "0.11", default-features = false, features = ["json", "multipart"] }
serde = "1"
task-local-extensions = "0.1.1"
thiserror = "1"
http = "0.2.0"
reqwest = { version = "0.11.4", default-features = false, features = ["json", "multipart"] }
serde = "1.0.106"
task-local-extensions = "0.1.4"
thiserror = "1.0.21"
[dev-dependencies]
reqwest = "0.11"
reqwest = "0.11.4"
reqwest-retry = { path = "../reqwest-retry" }
reqwest-tracing = { path = "../reqwest-tracing" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
wiremock = "0.5"
tokio = { version = "1.0.0", features = ["macros", "rt-multi-thread"] }
wiremock = "0.5.0"

View File

@ -5,7 +5,6 @@ use serde::Serialize;
use std::convert::TryFrom;
use std::fmt::{self, Display};
use std::sync::Arc;
use std::time::Duration;
use task_local_extensions::Extensions;
use crate::error::Result;
@ -15,6 +14,7 @@ use crate::RequestInitialiser;
/// A `ClientBuilder` is used to build a [`ClientWithMiddleware`].
///
/// [`ClientWithMiddleware`]: crate::ClientWithMiddleware
#[derive(Debug)]
pub struct ClientBuilder {
client: Client,
middleware_stack: Vec<Arc<dyn Middleware>>,
@ -211,6 +211,14 @@ impl RequestBuilder {
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn version(self, version: reqwest::Version) -> Self {
RequestBuilder {
inner: self.inner.version(version),
..self
}
}
pub fn basic_auth<U, P>(self, username: U, password: Option<P>) -> Self
where
U: Display,
@ -239,7 +247,8 @@ impl RequestBuilder {
}
}
pub fn timeout(self, timeout: Duration) -> Self {
#[cfg(not(target_arch = "wasm32"))]
pub fn timeout(self, timeout: std::time::Duration) -> Self {
RequestBuilder {
inner: self.inner.timeout(timeout),
..self
@ -274,6 +283,13 @@ impl RequestBuilder {
}
}
pub fn fetch_mode_no_cors(self) -> Self {
RequestBuilder {
inner: self.inner.fetch_mode_no_cors(),
..self
}
}
pub fn build(self) -> reqwest::Result<Request> {
self.inner.build()
}

View File

@ -1,3 +1,4 @@
use reqwest::{StatusCode, Url};
use thiserror::Error;
pub type Result<T> = std::result::Result<T, Error>;
@ -19,4 +20,122 @@ impl Error {
{
Error::Middleware(err.into())
}
/// Returns a possible URL related to this error.
pub fn url(&self) -> Option<&Url> {
match self {
Error::Middleware(_) => None,
Error::Reqwest(e) => e.url(),
}
}
/// Returns a mutable reference to the URL related to this error.
///
/// This is useful if you need to remove sensitive information from the URL
/// (e.g. an API key in the query), but do not want to remove the URL
/// entirely.
pub fn url_mut(&mut self) -> Option<&mut Url> {
match self {
Error::Middleware(_) => None,
Error::Reqwest(e) => e.url_mut(),
}
}
/// Adds a url related to this error (overwriting any existing).
pub fn with_url(self, url: Url) -> Self {
match self {
Error::Middleware(_) => self,
Error::Reqwest(e) => e.with_url(url).into(),
}
}
/// Strips the related URL from this error (if, for example, it contains
/// sensitive information).
pub fn without_url(self) -> Self {
match self {
Error::Middleware(_) => self,
Error::Reqwest(e) => e.without_url().into(),
}
}
/// Returns true if the error is from any middleware.
pub fn is_middleware(&self) -> bool {
match self {
Error::Middleware(_) => true,
Error::Reqwest(_) => false,
}
}
/// Returns true if the error is from a type `Builder`.
pub fn is_builder(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_builder(),
}
}
/// Returns true if the error is from a `RedirectPolicy`.
pub fn is_redirect(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_redirect(),
}
}
/// Returns true if the error is from `Response::error_for_status`.
pub fn is_status(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_status(),
}
}
/// Returns true if the error is related to a timeout.
pub fn is_timeout(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_timeout(),
}
}
/// Returns true if the error is related to the request.
pub fn is_request(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_request(),
}
}
#[cfg(not(target_arch = "wasm32"))]
/// Returns true if the error is related to connect.
pub fn is_connect(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_connect(),
}
}
/// Returns true if the error is related to the request or response body.
pub fn is_body(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_body(),
}
}
/// Returns true if the error is related to decoding the response's body.
pub fn is_decode(&self) -> bool {
match self {
Error::Middleware(_) => false,
Error::Reqwest(e) => e.is_decode(),
}
}
/// Returns the status code, if the error was generated from a response.
pub fn status(&self) -> Option<StatusCode> {
match self {
Error::Middleware(_) => None,
Error::Reqwest(e) => e.status(),
}
}
}

View File

@ -31,7 +31,8 @@ use crate::error::{Error, Result};
///
/// [`ClientWithMiddleware`]: crate::ClientWithMiddleware
/// [`with`]: crate::ClientBuilder::with
#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait Middleware: 'static + Send + Sync {
/// Invoked with a request before sending it. If you want to continue processing the request,
/// you should explicitly call `next.run(req, extensions)`.
@ -46,7 +47,14 @@ pub trait Middleware: 'static + Send + Sync {
) -> Result<Response>;
}
#[async_trait::async_trait]
impl std::fmt::Debug for dyn Middleware {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Middleware")
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<F> Middleware for F
where
F: Send
@ -75,7 +83,10 @@ pub struct Next<'a> {
middlewares: &'a [Arc<dyn Middleware>],
}
#[cfg(not(target_arch = "wasm32"))]
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
#[cfg(target_arch = "wasm32")]
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
impl<'a> Next<'a> {
pub(crate) fn new(client: &'a Client, middlewares: &'a [Arc<dyn Middleware>]) -> Self {

View File

@ -23,6 +23,12 @@ pub trait RequestInitialiser: 'static + Send + Sync {
fn init(&self, req: RequestBuilder) -> RequestBuilder;
}
impl core::fmt::Debug for dyn RequestInitialiser {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "RequestInitialiser")
}
}
impl<F> RequestInitialiser for F
where
F: Send + Sync + 'static + Fn(RequestBuilder) -> RequestBuilder,

View File

@ -4,10 +4,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.3.0] - 2023-09-07
### Changed
- `retry-policies` upgraded to 0.2.0
## [0.2.3] - 2023-08-30
### Added
- `RetryableStrategy` which allows for custom retry decisions based on the response that a request got
## [0.2.1] - 2022-12-01
### Changed
- Classify `io::Error`s and `hyper::Error(Canceled)` as transient
## [0.2.0] - 2022-11-15
### Changed
- Updated `reqwest-middleware` to `0.2.0`

View File

@ -1,6 +1,6 @@
[package]
name = "reqwest-retry"
version = "0.2.0"
version = "0.3.0"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Retry middleware for reqwest."
@ -12,20 +12,27 @@ categories = ["web-programming::http-client"]
[dependencies]
reqwest-middleware = { version = "0.2.0", path = "../reqwest-middleware" }
anyhow = "1"
anyhow = "1.0.0"
async-trait = "0.1.51"
chrono = { version = "0.4.19", features = ["clock"], default-features = false }
futures = "0.3"
http = "0.2"
hyper = "0.14"
reqwest = { version = "0.11", default-features = false }
retry-policies = "0.1"
task-local-extensions = "0.1.1"
tokio = { version = "1.6", features = ["time"] }
futures = "0.3.0"
http = "0.2.0"
reqwest = { version = "0.11.0", default-features = false }
retry-policies = "0.2.0"
task-local-extensions = "0.1.4"
tracing = "0.1.26"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
hyper = "0.14.0"
tokio = { version = "1.6.0", features = ["time"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
parking_lot = { version = "0.11.2", features = ["wasm-bindgen"] } # work around https://github.com/tomaka/wasm-timer/issues/14
wasm-timer = "0.2.5"
getrandom = { version = "0.2.0", features = ["js"] }
[dev-dependencies]
async-std = { version = "1.10"}
paste = "1"
tokio = { version = "1", features = ["macros"] }
wiremock = "0.5"
paste = "1.0.0"
tokio = { version = "1.0.0", features = ["full"] }
wiremock = "0.5.0"
futures = "0.3.0"

View File

@ -27,8 +27,13 @@
mod middleware;
mod retryable;
mod retryable_strategy;
pub use retry_policies::{policies, RetryPolicy};
pub use middleware::RetryTransientMiddleware;
pub use retryable::Retryable;
pub use retryable_strategy::{
default_on_request_failure, default_on_request_success, DefaultRetryableStrategy,
RetryableStrategy,
};

View File

@ -1,6 +1,6 @@
//! `RetryTransientMiddleware` implements retrying requests on transient errors.
use crate::retryable::Retryable;
use crate::retryable_strategy::RetryableStrategy;
use crate::{retryable::Retryable, retryable_strategy::DefaultRetryableStrategy};
use anyhow::anyhow;
use chrono::Utc;
use reqwest::{Request, Response};
@ -8,14 +8,13 @@ use reqwest_middleware::{Error, Middleware, Next, Result};
use retry_policies::RetryPolicy;
use task_local_extensions::Extensions;
/// We limit the number of retries to a maximum of `10` to avoid stack-overflow issues due to the recursion.
static MAXIMUM_NUMBER_OF_RETRIES: u32 = 10;
/// `RetryTransientMiddleware` offers retry logic for requests that fail in a transient manner
/// and can be safely executed again.
///
/// Currently, it allows setting a [RetryPolicy][retry_policies::RetryPolicy] algorithm for calculating the __wait_time__
/// between each request retry.
/// Currently, it allows setting a [RetryPolicy] algorithm for calculating the __wait_time__
/// between each request retry. Sleeping on non-`wasm32` archs is performed using
/// [`tokio::time::sleep`], therefore it will respect pauses/auto-advance if run under a
/// runtime that supports them.
///
///```rust
/// use reqwest_middleware::ClientBuilder;
@ -47,19 +46,42 @@ static MAXIMUM_NUMBER_OF_RETRIES: u32 = 10;
/// * You can wrap this middleware in a custom one which skips retries for streaming requests.
/// * You can write a custom retry middleware that builds new streaming requests from the data
/// source directly, avoiding the issue of streaming requests not being clonable.
pub struct RetryTransientMiddleware<T: RetryPolicy + Send + Sync + 'static> {
pub struct RetryTransientMiddleware<
T: RetryPolicy + Send + Sync + 'static,
R: RetryableStrategy + Send + Sync + 'static = DefaultRetryableStrategy,
> {
retry_policy: T,
retryable_strategy: R,
}
impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
/// Construct `RetryTransientMiddleware` with a [retry_policy][retry_policies::RetryPolicy].
impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T, DefaultRetryableStrategy> {
/// Construct `RetryTransientMiddleware` with a [retry_policy][RetryPolicy].
pub fn new_with_policy(retry_policy: T) -> Self {
Self { retry_policy }
Self::new_with_policy_and_strategy(retry_policy, DefaultRetryableStrategy)
}
}
#[async_trait::async_trait]
impl<T: RetryPolicy + Send + Sync> Middleware for RetryTransientMiddleware<T> {
impl<T, R> RetryTransientMiddleware<T, R>
where
T: RetryPolicy + Send + Sync,
R: RetryableStrategy + Send + Sync,
{
/// Construct `RetryTransientMiddleware` with a [retry_policy][RetryPolicy] and [retryable_strategy](RetryableStrategy).
pub fn new_with_policy_and_strategy(retry_policy: T, retryable_strategy: R) -> Self {
Self {
retry_policy,
retryable_strategy,
}
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<T, R> Middleware for RetryTransientMiddleware<T, R>
where
T: RetryPolicy + Send + Sync,
R: RetryableStrategy + Send + Sync + 'static,
{
async fn handle(
&self,
req: Request,
@ -74,7 +96,11 @@ impl<T: RetryPolicy + Send + Sync> Middleware for RetryTransientMiddleware<T> {
}
}
impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
impl<T, R> RetryTransientMiddleware<T, R>
where
T: RetryPolicy + Send + Sync,
R: RetryableStrategy + Send + Sync,
{
/// This function will try to execute the request, if it fails
/// with an error classified as transient it will call itself
/// to retry the request.
@ -100,15 +126,12 @@ impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
// We classify the response which will return None if not
// errors were returned.
break match Retryable::from_reqwest_response(&result) {
Some(retryable)
if retryable == Retryable::Transient
&& n_past_retries < MAXIMUM_NUMBER_OF_RETRIES =>
{
break match self.retryable_strategy.handle(&result) {
Some(Retryable::Transient) => {
// If the response failed and the error type was transient
// we can safely try to retry the request.
let retry_decicion = self.retry_policy.should_retry(n_past_retries);
if let retry_policies::RetryDecision::Retry { execute_after } = retry_decicion {
let retry_decision = self.retry_policy.should_retry(n_past_retries);
if let retry_policies::RetryDecision::Retry { execute_after } = retry_decision {
let duration = (execute_after - Utc::now())
.to_std()
.map_err(Error::middleware)?;
@ -118,7 +141,12 @@ impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
n_past_retries,
duration
);
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(duration).await;
#[cfg(target_arch = "wasm32")]
wasm_timer::Delay::new(duration)
.await
.expect("failed sleeping");
n_past_retries += 1;
continue;

View File

@ -1,4 +1,4 @@
use http::StatusCode;
use crate::retryable_strategy::{DefaultRetryableStrategy, RetryableStrategy};
use reqwest_middleware::Error;
/// Classification of an error/status returned by request.
@ -16,62 +16,7 @@ impl Retryable {
/// Returns `None` if the response object does not contain any errors.
///
pub fn from_reqwest_response(res: &Result<reqwest::Response, Error>) -> Option<Self> {
match res {
Ok(success) => {
let status = success.status();
if status.is_server_error() {
Some(Retryable::Transient)
} else if status.is_client_error()
&& status != StatusCode::REQUEST_TIMEOUT
&& status != StatusCode::TOO_MANY_REQUESTS
{
Some(Retryable::Fatal)
} else if status.is_success() {
None
} else if status == StatusCode::REQUEST_TIMEOUT
|| status == StatusCode::TOO_MANY_REQUESTS
{
Some(Retryable::Transient)
} else {
Some(Retryable::Fatal)
}
}
Err(error) => match error {
// If something fails in the middleware we're screwed.
Error::Middleware(_) => Some(Retryable::Fatal),
Error::Reqwest(error) => {
if error.is_timeout() || error.is_connect() {
Some(Retryable::Transient)
} else if error.is_body()
|| error.is_decode()
|| error.is_builder()
|| error.is_redirect()
{
Some(Retryable::Fatal)
} else if error.is_request() {
// It seems that hyper::Error(IncompleteMessage) is not correctly handled by reqwest.
// Here we check if the Reqwest error was originated by hyper and map it consistently.
if let Some(hyper_error) = get_source_error_type::<hyper::Error>(&error) {
// The hyper::Error(IncompleteMessage) is raised if the HTTP response is well formatted but does not contain all the bytes.
// This can happen when the server has started sending back the response but the connection is cut halfway thorugh.
// We can safely retry the call, hence marking this error as [`Retryable::Transient`].
if hyper_error.is_incomplete_message() {
Some(Retryable::Transient)
} else {
Some(Retryable::Fatal)
}
} else {
Some(Retryable::Fatal)
}
} else {
// We omit checking if error.is_status() since we check that already.
// However, if Response::error_for_status is used the status will still
// remain in the response object.
None
}
}
},
}
DefaultRetryableStrategy.handle(res)
}
}
@ -80,19 +25,3 @@ impl From<&reqwest::Error> for Retryable {
Retryable::Transient
}
}
/// Downcasts the given err source into T.
fn get_source_error_type<T: std::error::Error + 'static>(
err: &dyn std::error::Error,
) -> Option<&T> {
let mut source = err.source();
while let Some(err) = source {
if let Some(hyper_err) = err.downcast_ref::<T>() {
return Some(hyper_err);
}
source = err.source();
}
None
}

View File

@ -0,0 +1,213 @@
use crate::retryable::Retryable;
use http::StatusCode;
use reqwest_middleware::Error;
/// A strategy to create a [`Retryable`] from a [`Result<reqwest::Response, reqwest_middleware::Error>`]
///
/// A [`RetryableStrategy`] has a single `handler` functions.
/// The result of calling the request could be:
/// - [`reqwest::Response`] In case the request has been sent and received correctly
/// This could however still mean that the server responded with a erroneous response.
/// For example a HTTP statuscode of 500
/// - [`reqwest_middleware::Error`] In this case the request actually failed.
/// This could, for example, be caused by a timeout on the connection.
///
/// Example:
///
/// ```
/// use reqwest_retry::{default_on_request_failure, policies::ExponentialBackoff, Retryable, RetryableStrategy, RetryTransientMiddleware};
/// use reqwest::{Request, Response};
/// use reqwest_middleware::{ClientBuilder, Middleware, Next, Result};
/// use task_local_extensions::Extensions;
///
/// // Log each request to show that the requests will be retried
/// struct LoggingMiddleware;
///
/// #[async_trait::async_trait]
/// impl Middleware for LoggingMiddleware {
/// async fn handle(
/// &self,
/// req: Request,
/// extensions: &mut Extensions,
/// next: Next<'_>,
/// ) -> Result<Response> {
/// println!("Request started {}", req.url());
/// let res = next.run(req, extensions).await;
/// println!("Request finished");
/// res
/// }
/// }
///
/// // Just a toy example, retry when the successful response code is 201, else do nothing.
/// struct Retry201;
/// impl RetryableStrategy for Retry201 {
/// fn handle(&self, res: &Result<reqwest::Response>) -> Option<Retryable> {
/// match res {
/// // retry if 201
/// Ok(success) if success.status() == 201 => Some(Retryable::Transient),
/// // otherwise do not retry a successful request
/// Ok(success) => None,
/// // but maybe retry a request failure
/// Err(error) => default_on_request_failure(error),
/// }
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// // Exponential backoff with max 2 retries
/// let retry_policy = ExponentialBackoff::builder()
/// .build_with_max_retries(2);
///
/// // Create the actual middleware, with the exponential backoff and custom retry stategy.
/// let ret_s = RetryTransientMiddleware::new_with_policy_and_strategy(
/// retry_policy,
/// Retry201,
/// );
///
/// let client = ClientBuilder::new(reqwest::Client::new())
/// // Retry failed requests.
/// .with(ret_s)
/// // Log the requests
/// .with(LoggingMiddleware)
/// .build();
///
/// // Send request which should get a 201 response. So it will be retried
/// let r = client
/// .get("https://httpbin.org/status/201")
/// .send()
/// .await;
/// println!("{:?}", r);
///
/// // Send request which should get a 200 response. So it will not be retried
/// let r = client
/// .get("https://httpbin.org/status/200")
/// .send()
/// .await;
/// println!("{:?}", r);
/// }
/// ```
pub trait RetryableStrategy {
fn handle(&self, res: &Result<reqwest::Response, Error>) -> Option<Retryable>;
}
/// The default [`RetryableStrategy`] for [`RetryTransientMiddleware`](crate::RetryTransientMiddleware).
pub struct DefaultRetryableStrategy;
impl RetryableStrategy for DefaultRetryableStrategy {
fn handle(&self, res: &Result<reqwest::Response, Error>) -> Option<Retryable> {
match res {
Ok(success) => default_on_request_success(success),
Err(error) => default_on_request_failure(error),
}
}
}
/// Default request success retry strategy.
///
/// Will only retry if:
/// * The status was 5XX (server error)
/// * The status was 408 (request timeout) or 429 (too many requests)
///
/// Note that success here means that the request finished without interruption, not that it was logically OK.
pub fn default_on_request_success(success: &reqwest::Response) -> Option<Retryable> {
let status = success.status();
if status.is_server_error() {
Some(Retryable::Transient)
} else if status.is_client_error()
&& status != StatusCode::REQUEST_TIMEOUT
&& status != StatusCode::TOO_MANY_REQUESTS
{
Some(Retryable::Fatal)
} else if status.is_success() {
None
} else if status == StatusCode::REQUEST_TIMEOUT || status == StatusCode::TOO_MANY_REQUESTS {
Some(Retryable::Transient)
} else {
Some(Retryable::Fatal)
}
}
/// Default request failure retry strategy.
///
/// Will only retry if the request failed due to a network error
pub fn default_on_request_failure(error: &Error) -> Option<Retryable> {
match error {
// If something fails in the middleware we're screwed.
Error::Middleware(_) => Some(Retryable::Fatal),
Error::Reqwest(error) => {
#[cfg(not(target_arch = "wasm32"))]
let is_connect = error.is_connect();
#[cfg(target_arch = "wasm32")]
let is_connect = false;
if error.is_timeout() || is_connect {
Some(Retryable::Transient)
} else if error.is_body()
|| error.is_decode()
|| error.is_builder()
|| error.is_redirect()
{
Some(Retryable::Fatal)
} else if error.is_request() {
// It seems that hyper::Error(IncompleteMessage) is not correctly handled by reqwest.
// Here we check if the Reqwest error was originated by hyper and map it consistently.
#[cfg(not(target_arch = "wasm32"))]
if let Some(hyper_error) = get_source_error_type::<hyper::Error>(&error) {
// The hyper::Error(IncompleteMessage) is raised if the HTTP response is well formatted but does not contain all the bytes.
// This can happen when the server has started sending back the response but the connection is cut halfway thorugh.
// We can safely retry the call, hence marking this error as [`Retryable::Transient`].
// Instead hyper::Error(Canceled) is raised when the connection is
// gracefully closed on the server side.
if hyper_error.is_incomplete_message() || hyper_error.is_canceled() {
Some(Retryable::Transient)
// Try and downcast the hyper error to io::Error if that is the
// underlying error, and try and classify it.
} else if let Some(io_error) =
get_source_error_type::<std::io::Error>(hyper_error)
{
Some(classify_io_error(io_error))
} else {
Some(Retryable::Fatal)
}
} else {
Some(Retryable::Fatal)
}
#[cfg(target_arch = "wasm32")]
Some(Retryable::Fatal)
} else {
// We omit checking if error.is_status() since we check that already.
// However, if Response::error_for_status is used the status will still
// remain in the response object.
None
}
}
}
}
#[cfg(not(target_arch = "wasm32"))]
fn classify_io_error(error: &std::io::Error) -> Retryable {
match error.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::ConnectionAborted => {
Retryable::Transient
}
_ => Retryable::Fatal,
}
}
/// Downcasts the given err source into T.
#[cfg(not(target_arch = "wasm32"))]
fn get_source_error_type<T: std::error::Error + 'static>(
err: &dyn std::error::Error,
) -> Option<&T> {
let mut source = err.source();
while let Some(err) = source {
if let Some(err) = err.downcast_ref::<T>() {
return Some(err);
}
source = err.source();
}
None
}

View File

@ -1,9 +1,12 @@
use async_std::io::ReadExt;
use async_std::io::WriteExt;
use async_std::net::{TcpListener, TcpStream};
use futures::stream::StreamExt;
use futures::future::BoxFuture;
use std::error::Error;
use std::fmt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
type CustomMessageHandler = Box<
dyn Fn(TcpStream) -> BoxFuture<'static, Result<(), Box<dyn std::error::Error>>> + Send + Sync,
>;
/// This is a simple server that returns the responses given at creation time: [`self.raw_http_responses`] following a round-robin mechanism.
pub struct SimpleServer {
@ -12,6 +15,7 @@ pub struct SimpleServer {
host: String,
raw_http_responses: Vec<String>,
calls_counter: usize,
custom_handler: Option<CustomMessageHandler>,
}
/// Request-Line = Method SP Request-URI SP HTTP-Version CRLF
@ -46,9 +50,21 @@ impl SimpleServer {
host: host.to_string(),
raw_http_responses,
calls_counter: 0,
custom_handler: None,
})
}
pub fn set_custom_handler(
&mut self,
custom_handler: impl Fn(TcpStream) -> BoxFuture<'static, Result<(), Box<dyn std::error::Error>>>
+ Send
+ Sync
+ 'static,
) -> &mut Self {
self.custom_handler.replace(Box::new(custom_handler));
self
}
/// Returns the uri in which the server is listening to.
pub fn uri(&self) -> String {
format!("http://{}:{}", self.host, self.port)
@ -56,9 +72,9 @@ impl SimpleServer {
/// Starts the TcpListener and handles the requests.
pub async fn start(mut self) {
while let Some(stream) = self.listener.incoming().next().await {
match stream {
Ok(stream) => {
loop {
match self.listener.accept().await {
Ok((stream, _)) => {
match self.handle_connection(stream).await {
Ok(_) => (),
Err(e) => {
@ -79,11 +95,15 @@ impl SimpleServer {
///
/// Returns a 400 if the request if formatted badly.
async fn handle_connection(&self, mut stream: TcpStream) -> Result<(), Box<dyn Error>> {
if let Some(ref custom_handler) = self.custom_handler {
return custom_handler(stream).await;
}
let mut buffer = vec![0; 1024];
stream.read(&mut buffer).await.unwrap();
let n = stream.read(&mut buffer).await.unwrap();
let request = String::from_utf8_lossy(&buffer[..]);
let request = String::from_utf8_lossy(&buffer[..n]);
let request_line = request.lines().next().unwrap();
let response = match Self::parse_request_line(request_line) {
@ -98,7 +118,7 @@ impl SimpleServer {
};
println!("-- Response --\n{}\n--------------", response.clone());
stream.write(response.as_bytes()).await.unwrap();
stream.write_all(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
Ok(())

View File

@ -1,12 +1,16 @@
use futures::FutureExt;
use paste::paste;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest_middleware::ClientBuilder;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use std::sync::atomic::AtomicI8;
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Respond, ResponseTemplate};
@ -49,12 +53,12 @@ macro_rules! assert_retry_succeeds_inner {
let reqwest_client = Client::builder().build().unwrap();
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff {
max_n_retries: retry_amount,
max_retry_interval: std::time::Duration::from_millis(30),
min_retry_interval: std::time::Duration::from_millis(100),
backoff_exponent: 2,
},
ExponentialBackoff::builder()
.retry_bounds(
std::time::Duration::from_millis(30),
std::time::Duration::from_millis(100),
)
.build_with_max_retries(retry_amount),
))
.build();
@ -147,17 +151,6 @@ assert_retry_succeeds!(429, StatusCode::OK);
assert_no_retry!(431, StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE);
assert_no_retry!(451, StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS);
// We assert that we cap retries at 10, which means that we will
// get 11 calls to the RetryResponder.
assert_retry_succeeds_inner!(
500,
assert_maximum_retries_is_not_exceeded,
StatusCode::INTERNAL_SERVER_ERROR,
100,
11,
RetryResponder::new(100_u32, 500)
);
pub struct RetryTimeoutResponder(Arc<AtomicU32>, u32, std::time::Duration);
impl RetryTimeoutResponder {
@ -196,12 +189,12 @@ async fn assert_retry_on_request_timeout() {
let reqwest_client = Client::builder().build().unwrap();
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff {
max_n_retries: 3,
max_retry_interval: std::time::Duration::from_millis(100),
min_retry_interval: std::time::Duration::from_millis(30),
backoff_exponent: 2,
},
ExponentialBackoff::builder()
.retry_bounds(
std::time::Duration::from_millis(30),
std::time::Duration::from_millis(100),
)
.build_with_max_retries(3),
))
.build();
@ -251,12 +244,111 @@ async fn assert_retry_on_incomplete_message() {
let reqwest_client = Client::builder().build().unwrap();
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff {
max_n_retries: 3,
max_retry_interval: std::time::Duration::from_millis(100),
min_retry_interval: std::time::Duration::from_millis(30),
backoff_exponent: 2,
},
ExponentialBackoff::builder()
.retry_bounds(
std::time::Duration::from_millis(30),
std::time::Duration::from_millis(100),
)
.build_with_max_retries(3),
))
.build();
let resp = client
.get(&format!("{}/foo", uri))
.timeout(std::time::Duration::from_millis(100))
.send()
.await
.expect("call failed");
assert_eq!(resp.status(), 200);
}
#[tokio::test]
async fn assert_retry_on_hyper_canceled() {
let counter = Arc::new(AtomicI8::new(0));
let mut simple_server = SimpleServer::new("127.0.0.1", None, vec![])
.await
.expect("Error when creating a simple server");
simple_server.set_custom_handler(move |mut stream| {
let counter = counter.clone();
async move {
let mut buffer = Vec::new();
stream.read_buf(&mut buffer).await.unwrap();
if counter.fetch_add(1, Ordering::SeqCst) > 1 {
// This triggeres hyper:Error(Canceled).
let _res = stream
.into_std()
.unwrap()
.shutdown(std::net::Shutdown::Both);
} else {
let _res = stream.write("HTTP/1.1 200 OK\r\n\r\n".as_bytes()).await;
}
Ok(())
}
.boxed()
});
let uri = simple_server.uri();
tokio::spawn(simple_server.start());
let reqwest_client = Client::builder().build().unwrap();
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff::builder()
.retry_bounds(
std::time::Duration::from_millis(30),
std::time::Duration::from_millis(100),
)
.build_with_max_retries(3),
))
.build();
let resp = client
.get(&format!("{}/foo", uri))
.timeout(std::time::Duration::from_millis(100))
.send()
.await
.expect("call failed");
assert_eq!(resp.status(), 200);
}
#[tokio::test]
async fn assert_retry_on_connection_reset_by_peer() {
let counter = Arc::new(AtomicI8::new(0));
let mut simple_server = SimpleServer::new("127.0.0.1", None, vec![])
.await
.expect("Error when creating a simple server");
simple_server.set_custom_handler(move |mut stream| {
let counter = counter.clone();
async move {
let mut buffer = Vec::new();
stream.read_buf(&mut buffer).await.unwrap();
if counter.fetch_add(1, Ordering::SeqCst) > 1 {
// This triggeres hyper:Error(Io, io::Error(ConnectionReset)).
drop(stream);
} else {
let _res = stream.write("HTTP/1.1 200 OK\r\n\r\n".as_bytes()).await;
}
Ok(())
}
.boxed()
});
let uri = simple_server.uri();
tokio::spawn(simple_server.start());
let reqwest_client = Client::builder().build().unwrap();
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff::builder()
.retry_bounds(
std::time::Duration::from_millis(30),
std::time::Duration::from_millis(100),
)
.build_with_max_retries(3),
))
.build();

View File

@ -6,6 +6,41 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.4.6] - 2023-08-23
### Added
- Add support for opentelemetry 0.20
## [0.4.5] - 2023-06-20
### Added
- A new extension `DisableOtelPropagation` which stops opentelemetry contexts propagating
- Support for opentelemetry 0.19
## [0.4.4] - 2023-05-15
### Added
- A new `default_span_name` method for use in custom span backends.
## [0.4.3] - 2023-05-15
### Fixed
- Fix span and http status codes
## [0.4.2] - 2023-05-12
### Added
- `OtelPathNames` extension to provide known parameterized paths that will be used in span names
### Changed
- `DefaultSpanBackend` and `SpanBackendWithUrl` default span name to HTTP method name instead of `reqwest-http-client`
## [0.4.1] - 2023-03-09
### Added
- Support for `wasm32-unknown-unknown` target
## [0.4.0] - 2022-11-15
### Changed

View File

@ -1,6 +1,6 @@
[package]
name = "reqwest-tracing"
version = "0.4.0"
version = "0.4.7"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Opentracing middleware for reqwest."
@ -16,32 +16,49 @@ opentelemetry_0_15 = ["opentelemetry_0_15_pkg", "tracing-opentelemetry_0_14_pkg"
opentelemetry_0_16 = ["opentelemetry_0_16_pkg", "tracing-opentelemetry_0_16_pkg"]
opentelemetry_0_17 = ["opentelemetry_0_17_pkg", "tracing-opentelemetry_0_17_pkg"]
opentelemetry_0_18 = ["opentelemetry_0_18_pkg", "tracing-opentelemetry_0_18_pkg"]
opentelemetry_0_19 = ["opentelemetry_0_19_pkg", "tracing-opentelemetry_0_19_pkg"]
opentelemetry_0_20 = ["opentelemetry_0_20_pkg", "tracing-opentelemetry_0_20_pkg"]
opentelemetry_0_21 = ["opentelemetry_0_21_pkg", "tracing-opentelemetry_0_22_pkg"]
[dependencies]
reqwest-middleware = { version = "0.2.0", path = "../reqwest-middleware" }
anyhow = "1.0.70"
async-trait = "0.1.51"
reqwest = { version = "0.11", default-features = false }
task-local-extensions = "0.1.1"
matchit = "0.7.0"
reqwest = { version = "0.11.0", default-features = false }
task-local-extensions = "0.1.4"
tracing = "0.1.26"
opentelemetry_0_13_pkg = { package = "opentelemetry", version = "0.13", optional = true }
opentelemetry_0_14_pkg = { package = "opentelemetry", version = "0.14", optional = true }
opentelemetry_0_15_pkg = { package = "opentelemetry", version = "0.15", optional = true }
opentelemetry_0_16_pkg = { package = "opentelemetry", version = "0.16", optional = true }
opentelemetry_0_17_pkg = { package = "opentelemetry", version = "0.17", optional = true }
opentelemetry_0_18_pkg = { package = "opentelemetry", version = "0.18", optional = true }
tracing-opentelemetry_0_12_pkg = { package = "tracing-opentelemetry",version = "0.12", optional = true }
tracing-opentelemetry_0_13_pkg = { package = "tracing-opentelemetry", version = "0.13", optional = true }
tracing-opentelemetry_0_14_pkg = { package = "tracing-opentelemetry",version = "0.14", optional = true }
tracing-opentelemetry_0_16_pkg = { package = "tracing-opentelemetry",version = "0.16", optional = true }
tracing-opentelemetry_0_17_pkg = { package = "tracing-opentelemetry",version = "0.17", optional = true }
tracing-opentelemetry_0_18_pkg = { package = "tracing-opentelemetry",version = "0.18", optional = true }
opentelemetry_0_13_pkg = { package = "opentelemetry", version = "0.13.0", optional = true }
opentelemetry_0_14_pkg = { package = "opentelemetry", version = "0.14.0", optional = true }
opentelemetry_0_15_pkg = { package = "opentelemetry", version = "0.15.0", optional = true }
opentelemetry_0_16_pkg = { package = "opentelemetry", version = "0.16.0", optional = true }
opentelemetry_0_17_pkg = { package = "opentelemetry", version = "0.17.0", optional = true }
opentelemetry_0_18_pkg = { package = "opentelemetry", version = "0.18.0", optional = true }
opentelemetry_0_19_pkg = { package = "opentelemetry", version = "0.19.0", optional = true }
opentelemetry_0_20_pkg = { package = "opentelemetry", version = "0.20.0", optional = true }
opentelemetry_0_21_pkg = { package = "opentelemetry", version = "0.21.0", optional = true }
tracing-opentelemetry_0_12_pkg = { package = "tracing-opentelemetry", version = "0.12.0", optional = true }
tracing-opentelemetry_0_13_pkg = { package = "tracing-opentelemetry", version = "0.13.0", optional = true }
tracing-opentelemetry_0_14_pkg = { package = "tracing-opentelemetry", version = "0.14.0", optional = true }
tracing-opentelemetry_0_16_pkg = { package = "tracing-opentelemetry", version = "0.16.0", optional = true }
tracing-opentelemetry_0_17_pkg = { package = "tracing-opentelemetry", version = "0.17.0", optional = true }
tracing-opentelemetry_0_18_pkg = { package = "tracing-opentelemetry", version = "0.18.0", optional = true }
tracing-opentelemetry_0_19_pkg = { package = "tracing-opentelemetry", version = "0.19.0", optional = true }
tracing-opentelemetry_0_20_pkg = { package = "tracing-opentelemetry", version = "0.20.0", optional = true }
tracing-opentelemetry_0_22_pkg = { package = "tracing-opentelemetry", version = "0.22.0", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2.0", features = ["js"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
tracing_subscriber_0_2 = { package = "tracing-subscriber", version = "0.2" }
tracing_subscriber_0_3 = { package = "tracing-subscriber", version = "0.3" }
wiremock = "0.5"
tokio = { version = "1.0.0", features = ["macros"] }
tracing_subscriber_0_2 = { package = "tracing-subscriber", version = "0.2.0" }
tracing_subscriber_0_3 = { package = "tracing-subscriber", version = "0.3.0" }
wiremock = "0.5.0"
opentelemetry_sdk_0_21 = { package = "opentelemetry_sdk", version = "0.21.0", features = ["trace"] }
opentelemetry_stdout_0_1 = { package = "opentelemetry-stdout", version = "0.1.0", features = ["trace"] }
opentelemetry_stdout_0_2 = { package = "opentelemetry-stdout", version = "0.2.0", features = ["trace"] }

View File

@ -25,7 +25,7 @@ tokio = { version = "1.12.0", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
tracing-opentelemetry = "0.18"
tracing-subscriber = "0.3"
task-local-extensions = "0.1.0"
task-local-extensions = "0.1.4"
```
```rust,skip
@ -44,7 +44,7 @@ pub struct TimeTrace;
impl ReqwestOtelSpanBackend for TimeTrace {
fn on_request_start(req: &Request, extension: &mut Extensions) -> Span {
extension.insert(Instant::now());
reqwest_otel_span!(req, time_elapsed = tracing::field::Empty)
reqwest_otel_span!(name="example-request", req, time_elapsed = tracing::field::Empty)
}
fn on_request_end(span: &Span, outcome: &Result<Response>, extension: &mut Extensions) {
@ -92,8 +92,9 @@ an opentelemetry version feature:
reqwest-tracing = { version = "0.3.1", features = ["opentelemetry_0_18"] }
```
Available opentelemetry features are `opentelemetry_0_18`, `opentelemetry_0_17`, `opentelemetry_0_16`, `opentelemetry_0_15`, `opentelemetry_0_14` and
`opentelemetry_0_13`.
Available opentelemetry features are `opentelemetry_0_21`, `opentelemetry_0_20`,
`opentelemetry_0_19`, `opentelemetry_0_18`, `opentelemetry_0_17`, `opentelemetry_0_16`,
`opentelemetry_0_15`, `opentelemetry_0_14` and `opentelemetry_0_13`.
#### License

View File

@ -90,15 +90,19 @@ mod middleware;
feature = "opentelemetry_0_16",
feature = "opentelemetry_0_17",
feature = "opentelemetry_0_18",
feature = "opentelemetry_0_19",
feature = "opentelemetry_0_20",
feature = "opentelemetry_0_21",
))]
mod otel;
mod reqwest_otel_span_builder;
pub use middleware::TracingMiddleware;
pub use reqwest_otel_span_builder::{
default_on_request_end, default_on_request_failure, default_on_request_success,
DefaultSpanBackend, OtelName, ReqwestOtelSpanBackend, SpanBackendWithUrl, ERROR_CAUSE_CHAIN,
ERROR_MESSAGE, HTTP_HOST, HTTP_METHOD, HTTP_SCHEME, HTTP_STATUS_CODE, HTTP_URL,
HTTP_USER_AGENT, NET_HOST_PORT, OTEL_KIND, OTEL_NAME, OTEL_STATUS_CODE,
default_span_name, DefaultSpanBackend, DisableOtelPropagation, OtelName, OtelPathNames,
ReqwestOtelSpanBackend, SpanBackendWithUrl, ERROR_CAUSE_CHAIN, ERROR_MESSAGE, HTTP_HOST,
HTTP_METHOD, HTTP_SCHEME, HTTP_STATUS_CODE, HTTP_URL, HTTP_USER_AGENT, NET_HOST_PORT,
OTEL_KIND, OTEL_NAME, OTEL_STATUS_CODE,
};
#[doc(hidden)]

View File

@ -30,7 +30,8 @@ impl Default for TracingMiddleware<DefaultSpanBackend> {
}
}
#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<ReqwestOtelSpan> Middleware for TracingMiddleware<ReqwestOtelSpan>
where
ReqwestOtelSpan: ReqwestOtelSpanBackend + Sync + Send + 'static,
@ -44,8 +45,6 @@ where
let request_span = ReqwestOtelSpan::on_request_start(&req, extensions);
let outcome_future = async {
// Adds tracing headers to the given request to propagate the OpenTelemetry context to downstream revivers of the request.
// Spans added by downstream consumers will be part of the same trace.
#[cfg(any(
feature = "opentelemetry_0_13",
feature = "opentelemetry_0_14",
@ -53,8 +52,17 @@ where
feature = "opentelemetry_0_16",
feature = "opentelemetry_0_17",
feature = "opentelemetry_0_18",
feature = "opentelemetry_0_19",
feature = "opentelemetry_0_20",
feature = "opentelemetry_0_21",
))]
let req = crate::otel::inject_opentelemetry_context_into_request(req);
let req = if !extensions.contains::<crate::DisableOtelPropagation>() {
// Adds tracing headers to the given request to propagate the OpenTelemetry context to downstream revivers of the request.
// Spans added by downstream consumers will be part of the same trace.
crate::otel::inject_opentelemetry_context_into_request(req)
} else {
req
};
// Run the request
let outcome = next.run(req, extensions).await;

View File

@ -21,6 +21,15 @@ use opentelemetry_0_17_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_18")]
use opentelemetry_0_18_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_19")]
use opentelemetry_0_19_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_20")]
use opentelemetry_0_20_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_21")]
use opentelemetry_0_21_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_13")]
pub use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry;
@ -39,6 +48,15 @@ pub use tracing_opentelemetry_0_17_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_18")]
pub use tracing_opentelemetry_0_18_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_19")]
pub use tracing_opentelemetry_0_19_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_20")]
pub use tracing_opentelemetry_0_20_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_21")]
pub use tracing_opentelemetry_0_22_pkg as tracing_opentelemetry;
use opentelemetry::global;
use opentelemetry::propagation::Injector;
use tracing_opentelemetry::OpenTelemetrySpanExt;
@ -80,10 +98,16 @@ impl<'a> Injector for RequestCarrier<'a> {
#[cfg(test)]
mod test {
use std::sync::OnceLock;
use super::*;
use crate::TracingMiddleware;
use crate::{DisableOtelPropagation, TracingMiddleware};
#[cfg(not(feature = "opentelemetry_0_21"))]
use opentelemetry::sdk::propagation::TraceContextPropagator;
use reqwest_middleware::ClientBuilder;
#[cfg(feature = "opentelemetry_0_21")]
use opentelemetry_sdk_0_21::propagation::TraceContextPropagator;
use reqwest::Response;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, Extension};
use tracing::{info_span, Instrument, Level};
#[cfg(any(
feature = "opentelemetry_0_13",
@ -99,17 +123,50 @@ mod test {
use tracing_subscriber_0_3::{filter, layer::SubscriberExt, Registry};
use wiremock::{matchers::any, Mock, MockServer, ResponseTemplate};
#[tokio::test]
async fn tracing_middleware_propagates_otel_data_even_when_the_span_is_disabled() {
let tracer = opentelemetry::sdk::export::trace::stdout::new_pipeline()
.with_writer(std::io::sink())
.install_simple();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default()
.with(filter::Targets::new().with_target("reqwest_tracing::otel::test", Level::DEBUG))
.with(telemetry);
tracing::subscriber::set_global_default(subscriber).unwrap();
global::set_text_map_propagator(TraceContextPropagator::new());
async fn make_echo_request_in_otel_context(client: ClientWithMiddleware) -> Response {
static TELEMETRY: OnceLock<()> = OnceLock::new();
TELEMETRY.get_or_init(|| {
#[cfg(all(
not(feature = "opentelemetry_0_20"),
not(feature = "opentelemetry_0_21")
))]
let tracer = opentelemetry::sdk::export::trace::stdout::new_pipeline()
.with_writer(std::io::sink())
.install_simple();
#[cfg(any(feature = "opentelemetry_0_20", feature = "opentelemetry_0_21"))]
let tracer = {
use opentelemetry::trace::TracerProvider;
#[cfg(feature = "opentelemetry_0_20")]
use opentelemetry_stdout_0_1::SpanExporterBuilder;
#[cfg(feature = "opentelemetry_0_21")]
use opentelemetry_stdout_0_2::SpanExporterBuilder;
let exporter = SpanExporterBuilder::default()
.with_writer(std::io::sink())
.build();
#[cfg(feature = "opentelemetry_0_20")]
let provider = opentelemetry::sdk::trace::TracerProvider::builder()
.with_simple_exporter(exporter)
.build();
#[cfg(feature = "opentelemetry_0_21")]
let provider = opentelemetry_sdk_0_21::trace::TracerProvider::builder()
.with_simple_exporter(exporter)
.build();
let tracer = provider.versioned_tracer("reqwest", None::<&str>, None::<&str>, None);
let _ = global::set_tracer_provider(provider);
tracer
};
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default()
.with(
filter::Targets::new().with_target("reqwest_tracing::otel::test", Level::DEBUG),
)
.with(telemetry);
tracing::subscriber::set_global_default(subscriber).unwrap();
global::set_text_map_propagator(TraceContextPropagator::new());
});
// Mock server - sends all request headers back in the response
let server = MockServer::start().await;
@ -124,17 +181,40 @@ mod test {
.mount(&server)
.await;
let client = ClientBuilder::new(reqwest::Client::new())
.with(TracingMiddleware::default())
.build();
let resp = client
client
.get(server.uri())
.send()
.instrument(info_span!("some_span"))
.await
.unwrap();
.unwrap()
}
assert!(resp.headers().contains_key("traceparent"));
#[tokio::test]
async fn tracing_middleware_propagates_otel_data_even_when_the_span_is_disabled() {
let client = ClientBuilder::new(reqwest::Client::new())
.with(TracingMiddleware::default())
.build();
let resp = make_echo_request_in_otel_context(client).await;
assert!(
resp.headers().contains_key("traceparent"),
"by default, the tracing middleware will propagate otel contexts"
);
}
#[tokio::test]
async fn context_no_propagated() {
let client = ClientBuilder::new(reqwest::Client::new())
.with_init(Extension(DisableOtelPropagation))
.with(TracingMiddleware::default())
.build();
let resp = make_echo_request_in_otel_context(client).await;
assert!(
!resp.headers().contains_key("traceparent"),
"request should not contain traceparent if context propagation is disabled"
);
}
}

View File

@ -1,10 +1,11 @@
use std::borrow::Cow;
use matchit::Router;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Request, Response, StatusCode as RequestStatusCode, Url};
use reqwest_middleware::{Error, Result};
use task_local_extensions::Extensions;
use tracing::Span;
use tracing::{warn, Span};
use crate::reqwest_otel_span;
@ -22,8 +23,8 @@ pub const NET_HOST_PORT: &str = "net.host.port";
pub const OTEL_KIND: &str = "otel.kind";
/// The `otel.name` field added to the span by [`reqwest_otel_span`]
pub const OTEL_NAME: &str = "otel.name";
/// The `otel.status_code.code` field added to the span by [`reqwest_otel_span`]
pub const OTEL_STATUS_CODE: &str = "http.status_code";
/// The `otel.status_code` field added to the span by [`reqwest_otel_span`]
pub const OTEL_STATUS_CODE: &str = "otel.status_code";
/// The `error.message` field added to the span by [`reqwest_otel_span`]
pub const ERROR_MESSAGE: &str = "error.message";
/// The `error.cause_chain` field added to the span by [`reqwest_otel_span`]
@ -60,12 +61,11 @@ pub fn default_on_request_end(span: &Span, outcome: &Result<Response>) {
#[inline]
pub fn default_on_request_success(span: &Span, response: &Response) {
let span_status = get_span_status(response.status());
let status_code = response.status().as_u16() as i64;
let user_agent = get_header_value("user_agent", response.headers());
if let Some(span_status) = span_status {
span.record(OTEL_STATUS_CODE, span_status);
}
span.record(HTTP_STATUS_CODE, status_code);
span.record(HTTP_STATUS_CODE, response.status().as_u16());
span.record(HTTP_USER_AGENT, user_agent.as_str());
}
@ -78,13 +78,30 @@ pub fn default_on_request_failure(span: &Span, e: &Error) {
span.record(ERROR_MESSAGE, error_message.as_str());
span.record(ERROR_CAUSE_CHAIN, error_cause_chain.as_str());
if let Error::Reqwest(e) = e {
span.record(
HTTP_STATUS_CODE,
e.status()
.map(|s| s.to_string())
.unwrap_or_else(|| "".to_string())
.as_str(),
);
if let Some(status) = e.status() {
span.record(HTTP_STATUS_CODE, status.as_u16());
}
}
}
/// Determine the name of the span that should be associated with this request.
///
/// This tries to be PII safe by default, not including any path information unless
/// specifically opted in using either [`OtelName`] or [`OtelPathNames`]
#[inline]
pub fn default_span_name<'a>(req: &'a Request, ext: &'a Extensions) -> Cow<'a, str> {
if let Some(name) = ext.get::<OtelName>() {
Cow::Borrowed(name.0.as_ref())
} else if let Some(path_names) = ext.get::<OtelPathNames>() {
path_names
.find(req.url().path())
.map(|path| Cow::Owned(format!("{} {}", req.method(), path)))
.unwrap_or_else(|| {
warn!("no OTEL path name found");
Cow::Owned(format!("{} UNKNOWN", req.method().as_str()))
})
} else {
Cow::Borrowed(req.method().as_str())
}
}
@ -96,10 +113,7 @@ pub struct DefaultSpanBackend;
impl ReqwestOtelSpanBackend for DefaultSpanBackend {
fn on_request_start(req: &Request, ext: &mut Extensions) -> Span {
let name = ext
.get::<OtelName>()
.map(|on| on.0.as_ref())
.unwrap_or("reqwest-http-client");
let name = default_span_name(req, ext);
reqwest_otel_span!(name = name, req)
}
@ -120,11 +134,7 @@ pub struct SpanBackendWithUrl;
impl ReqwestOtelSpanBackend for SpanBackendWithUrl {
fn on_request_start(req: &Request, ext: &mut Extensions) -> Span {
let name = ext
.get::<OtelName>()
.map(|on| on.0.as_ref())
.unwrap_or("reqwest-http-client");
let name = default_span_name(req, ext);
reqwest_otel_span!(name = name, req, http.url = %remove_credentials(req.url()))
}
@ -152,7 +162,7 @@ fn get_span_status(request_status: RequestStatusCode) -> Option<&'static str> {
}
/// [`OtelName`] allows customisation of the name of the spans created by
/// DefaultSpanBackend.
/// [`DefaultSpanBackend`] and [`SpanBackendWithUrl`].
///
/// Usage:
/// ```no_run
@ -184,6 +194,123 @@ fn get_span_status(request_status: RequestStatusCode) -> Option<&'static str> {
#[derive(Clone)]
pub struct OtelName(pub Cow<'static, str>);
/// [`OtelPathNames`] allows including templated paths in the spans created by
/// [`DefaultSpanBackend`] and [`SpanBackendWithUrl`].
///
/// When creating spans this can be used to try to match the path against some
/// known paths. If the path matches value returned is the templated path. This
/// can be used in span names as it will not contain values that would
/// increase the cardinality.
///
/// ```
/// /// # use reqwest_middleware::Result;
/// use reqwest_middleware::{ClientBuilder, Extension};
/// use reqwest_tracing::{
/// TracingMiddleware, OtelPathNames
/// };
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let reqwest_client = reqwest::Client::builder().build()?;
/// let client = ClientBuilder::new(reqwest_client)
/// // Inserts the extension before the request is started
/// .with_init(Extension(OtelPathNames::known_paths(["/payment/:paymentId"])?))
/// // Makes use of that extension to specify the otel name
/// .with(TracingMiddleware::default())
/// .build();
///
/// let resp = client.get("https://truelayer.com/payment/id-123").send().await?;
///
/// // Or specify it on the individual request (will take priority)
/// let resp = client.post("https://api.truelayer.com/payment/id-123/authorization-flow")
/// .with_extension(OtelPathNames::known_paths(["/payment/:paymentId/authorization-flow"])?)
/// .send()
/// .await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct OtelPathNames(matchit::Router<String>);
impl OtelPathNames {
/// Create a new [`OtelPathNames`] from a set of known paths.
///
/// Paths in this set will be found with `find`.
///
/// Paths can have different parameters:
/// - Named parameters like `:paymentId` match anything until the next `/` or the end of the path.
/// - Catch-all parameters start with `*` and match everything after the `/`. They must be at the end of the route.
/// ```
/// # use reqwest_tracing::OtelPathNames;
/// OtelPathNames::known_paths([
/// "/",
/// "/payment",
/// "/payment/:paymentId",
/// "/payment/:paymentId/*action",
/// ]).unwrap();
/// ```
pub fn known_paths<Paths, Path>(paths: Paths) -> anyhow::Result<Self>
where
Paths: IntoIterator<Item = Path>,
Path: Into<String>,
{
let mut router = Router::new();
for path in paths {
let path = path.into();
router.insert(path.clone(), path)?;
}
Ok(Self(router))
}
/// Find the templated path from the actual path.
///
/// Returns the templated path if a match is found.
///
/// ```
/// # use reqwest_tracing::OtelPathNames;
/// let path_names = OtelPathNames::known_paths(["/payment/:paymentId"]).unwrap();
/// let path = path_names.find("/payment/payment-id-123");
/// assert_eq!(path, Some("/payment/:paymentId"));
/// ```
pub fn find(&self, path: &str) -> Option<&str> {
self.0.at(path).map(|mtch| mtch.value.as_str()).ok()
}
}
/// `DisableOtelPropagation` disables opentelemetry header propagation, while still tracing the HTTP request.
///
/// By default, the [`TracingMiddleware`](super::TracingMiddleware) middleware will also propagate any opentelemtry
/// contexts to the server. For any external facing requests, this can be problematic and it should be disabled.
///
/// Usage:
/// ```no_run
/// # use reqwest_middleware::Result;
/// use reqwest_middleware::{ClientBuilder, Extension};
/// use reqwest_tracing::{
/// TracingMiddleware, DisableOtelPropagation
/// };
/// # async fn example() -> Result<()> {
/// let reqwest_client = reqwest::Client::builder().build().unwrap();
/// let client = ClientBuilder::new(reqwest_client)
/// // Inserts the extension before the request is started
/// .with_init(Extension(DisableOtelPropagation))
/// // Makes use of that extension to specify the otel name
/// .with(TracingMiddleware::default())
/// .build();
///
/// let resp = client.get("https://truelayer.com").send().await.unwrap();
///
/// // Or specify it on the individual request (will take priority)
/// let resp = client.post("https://api.truelayer.com/payment")
/// .with_extension(DisableOtelPropagation)
/// .send()
/// .await
/// .unwrap();
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct DisableOtelPropagation;
/// Removes the username and/or password parts of the url, if present.
fn remove_credentials(url: &Url) -> Cow<'_, str> {
if !url.username().is_empty() || url.password().is_some() {