Initial commit

This commit is contained in:
Rodrigo Gryzinski 2021-08-10 17:07:08 +01:00
commit 7fe55152ec
34 changed files with 1839 additions and 0 deletions

1
.github/CODEOWNERS vendored Normal file
View file

@ -0,0 +1 @@
* @TrueLayer/rust-oss

31
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View file

@ -0,0 +1,31 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: bug
assignees: ''
---
## Bug description
<!-- A clear and concise description of what the bug is. -->
## To Reproduce
<!-- Steps to reproduce the behavior. -->
## Expected behavior
<!-- A clear and concise description of what you expected to happen. -->
## Environment
<!-- Please fill the following information. -->
- OS: [e.g. Windows]
- Rust version [e.g. 1.51.0]
## Additional context
<!-- Add any other context about the problem here. -->

1
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View file

@ -0,0 +1 @@
blank_issues_enabled: true

View file

@ -0,0 +1,27 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: enhancement
assignees: ''
---
## Motivations
<!--
If your feature request is related to a problem, please describe it.
Ex. I hate when [...]
-->
## Solution
<!-- Describe the solution you'd like. -->
## Alternatives
<!-- Describe any alternative solutions or features you've considered. -->
## Additional context
<!-- Add any other context or screenshots about the feature request here. -->

12
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View file

@ -0,0 +1,12 @@
<!-- Please explain the changes you made -->
<!--
Please, make sure:
- you have read the contributing guidelines:
https://github.com/TrueLayer/reqwest-middleware/blob/main/docs/CONTRIBUTING.md
- you have formatted the code using rustfmt:
https://github.com/rust-lang/rustfmt
- you have checked that all tests pass, by running `cargo test --all`
- you have updated the changelog (if needed):
https://github.com/TrueLayer/reqwest-middleware/blob/main/CHANGELOG.md
-->

22
.github/workflows/audit.yml vendored Normal file
View file

@ -0,0 +1,22 @@
name: Security audit
on:
schedule:
# Runs at 00:00 UTC everyday
- cron: '0 0 * * *'
push:
paths:
- '**/Cargo.toml'
- '**/Cargo.lock'
pull_request:
jobs:
audit:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- uses: actions-rs/audit-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}

130
.github/workflows/ci.yml vendored Normal file
View file

@ -0,0 +1,130 @@
on:
push:
branches: [main]
pull_request:
name: CI # Continuous Integration
jobs:
test:
name: Test Suite
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
- uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --all-targets --features opentelemetry_0_15
rustfmt:
name: Rustfmt
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
components: rustfmt
- name: Check formatting
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
clippy:
name: Clippy
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
components: clippy
- name: Clippy check
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets --features opentelemetry_0_15 --workspace -- -D warnings
docs:
name: Docs
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
- name: Check documentation
env:
RUSTDOCFLAGS: -D warnings
uses: actions-rs/cargo@v1
with:
command: doc
args: --no-deps --document-private-items --features opentelemetry_0_15 --workspace
publish-check:
name: Publish dry run
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
- uses: actions-rs/cargo@v1
with:
command: publish
args: --dry-run --manifest-path reqwest-middleware/Cargo.toml
- uses: actions-rs/cargo@v1
with:
command: publish
args: --dry-run --manifest-path reqwest-retry/Cargo.toml
- uses: actions-rs/cargo@v1
with:
command: publish
args: --dry-run --manifest-path reqwest-tracing/Cargo.toml
coverage:
name: Code coverage
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
- name: Run cargo-tarpaulin
uses: actions-rs/tarpaulin@v0.1
with:
args: '--ignore-tests --out Lcov'
- name: Upload to Coveralls
# upload only if push
if: ${{ github.event_name == 'push' }}
uses: coverallsapp/github-action@master
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: './lcov.info'

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

7
CHANGELOG.md Normal file
View file

@ -0,0 +1,7 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

3
CODE_OF_CONDUCT.md Normal file
View file

@ -0,0 +1,3 @@
# Code of Conduct
This project adheres to the Rust Code of Conduct, which [can be found online](https://www.rust-lang.org/conduct.html).

68
CONTRIBUTING.md Normal file
View file

@ -0,0 +1,68 @@
# Contribution guidelines
First off, thank you for considering contributing to reqwest-middleware.
If your contribution is not straightforward, please first discuss the change you
wish to make by creating a new issue before making the change.
## Reporting issues
Before reporting an issue on the
[issue tracker](https://github.com/TrueLayer/reqwest-middleware/issues),
please check that it has not already been reported by searching for some related
keywords.
## Pull requests
Try to do one pull request per change.
### Updating the changelog
Update the changes you have made in
[CHANGELOG](https://github.com/TrueLayer/reqwest-middleware/blob/master/CHANGELOG.md)
file under the **Unreleased** section.
Add the changes of your pull request to one of the following subsections,
depending on the types of changes defined by
[Keep a changelog](https://keepachangelog.com/en/1.0.0/):
- `Added` for new features.
- `Changed` for changes in existing functionality.
- `Deprecated` for soon-to-be removed features.
- `Removed` for now removed features.
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.
If the required subsection does not exist yet under **Unreleased**, create it!
## Developing
### Set up
This is no different than other Rust projects.
```shell
git clone https://github.com/TrueLayer/reqwest-middleware
cd reqwest-middleware
cargo test
```
### Useful Commands
- Run Clippy:
```shell
cargo clippy
```
- Check to see if there are code formatting issues
```shell
cargo fmt --all -- --check
```
- Format the code in the project
```shell
cargo fmt --all
```

6
Cargo.toml Normal file
View file

@ -0,0 +1,6 @@
[workspace]
members = [
"reqwest-middleware",
"reqwest-tracing",
"reqwest-retry",
]

176
LICENSE-APACHE Normal file
View file

@ -0,0 +1,176 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

21
LICENSE-MIT Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 TrueLayer
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

65
README.md Normal file
View file

@ -0,0 +1,65 @@
# reqwest-middleware
A crate implementing a wrapper around [reqwest](https://crates.io/crates/reqwest)
to allow for client middleware chains.
[![Crates.io](https://img.shields.io/crates/v/reqwest-middleware.svg)](https://crates.io/crates/reqwest-middleware)
[![Docs.rs](https://docs.rs/reqwest-middleware/badge.svg)](https://docs.rs/reqwest-middleware)
[![CI](https://github.com/TrueLayer/reqwest-middleware/workflows/CI/badge.svg)](https://github.com/TrueLayer/reqwest-middleware/actions)
[![Coverage Status](https://coveralls.io/repos/github/TrueLayer/reqwest-middleware/badge.svg?branch=main&t=YKhONc)](https://coveralls.io/github/TrueLayer/reqwest-middleware?branch=main)
## Overview
The `reqwest-middleware` client exposes the same interface as a plain `reqwest` client, but
`ClientBuilder` exposes functionality to attach middleware:
```rust
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use reqwest_tracing::TracingMiddleware;
#[tokio::main]
async fn main() {
    let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
    let client = ClientBuilder::new(reqwest::Client::new())
        .with(TracingMiddleware)
        .with(RetryTransientMiddleware::new_with_policy(retry_policy))
        .build();
    run(client).await;
}
async fn run(client: ClientWithMiddleware) {
    // free retries!
    client
        .get("https://some-external-service.com")
        .header("foo", "bar")
        .send()
        .await
        .unwrap();
}
```
## How to install
Add `reqwest-middleware` to your dependencies
```toml
[dependencies]
# ...
reqwest-middleware = "0.1.0"
```
#### License
<sup>
Licensed under either of <a href="LICENSE-APACHE">Apache License, Version
2.0</a> or <a href="LICENSE-MIT">MIT license</a> at your option.
</sup>
<br>
<sub>
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.
</sub>

View file

@ -0,0 +1,25 @@
[package]
name = "reqwest-middleware"
version = "0.1.0"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Wrapper around reqwest to allow for client middleware chains."
repository = "https://github.com/TrueLayer/reqwest-middleware"
license = "MIT OR Apache-2.0"
keywords = ["reqwest", "http", "middleware"]
categories = ["web-programming::http-client"]
readme = "../README.md"
[dependencies]
anyhow = "1"
async-trait = "0.1.51"
futures = "0.3"
http = "0.2"
reqwest = { version = "0.11", features = ["json", "multipart"] }
serde = "1"
thiserror = "1"
truelayer-extensions = "0.1"
[dev-dependencies]
wiremock = "0.5"
tokio = { version = "1", features = ["macros"] }

View file

@ -0,0 +1,248 @@
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::multipart::Form;
use reqwest::{Body, Client, IntoUrl, Method, Request, Response};
use serde::Serialize;
use std::convert::TryFrom;
use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use truelayer_extensions::Extensions;
use crate::error::Result;
use crate::middleware::{Middleware, Next};
/// A `ClientBuilder` is used to build a [`ClientWithMiddleware`].
///
/// [`ClientWithMiddleware`]: struct.ClientWithMiddleware.html
pub struct ClientBuilder {
client: Client,
middleware_stack: Vec<Arc<dyn Middleware>>,
}
impl ClientBuilder {
pub fn new(client: Client) -> Self {
ClientBuilder {
client,
middleware_stack: Vec::new(),
}
}
/// Convenience method to attach middleware.
///
/// If you need to keep a reference to the middleware after attaching, use [`with_arc`].
///
/// [`with_arc`]: #method.with_arc
pub fn with<M>(self, middleware: M) -> Self
where
M: Middleware,
{
self.with_arc(Arc::new(middleware))
}
/// Add middleware to the chain. [`with`] is more ergonomic if you don't need the `Arc`.
///
/// [`with`]: #method.with
pub fn with_arc(mut self, middleware: Arc<dyn Middleware>) -> Self {
self.middleware_stack.push(middleware);
self
}
/// Returns a `ClientWithMiddleware` using this builder configuration.
pub fn build(self) -> ClientWithMiddleware {
ClientWithMiddleware::new(self.client, self.middleware_stack)
}
}
/// `ClientWithMiddleware` is a wrapper around [`reqwest::Client`] which runs middleware on every
/// request.
///
/// [`reqwest::Client`]: https://docs.rs/reqwest/0.10.8/reqwest/struct.Client.html
#[derive(Clone)]
pub struct ClientWithMiddleware {
inner: reqwest::Client,
middleware_stack: Box<[Arc<dyn Middleware>]>,
}
impl ClientWithMiddleware {
/// See [`ClientBuilder`] for a more ergonomic way to build `ClientWithMiddleware` instances.
///
/// [`ClientBuilder`]: struct.ClientBuilder.html
pub fn new<T>(client: Client, middleware_stack: T) -> Self
where
T: Into<Box<[Arc<dyn Middleware>]>>,
{
ClientWithMiddleware {
inner: client,
middleware_stack: middleware_stack.into(),
}
}
/// See [`Client::get`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.get)
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::GET, url)
}
/// See [`Client::post`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.post)
pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::POST, url)
}
/// See [`Client::put`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.put)
pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::PUT, url)
}
/// See
/// [`Client::patch`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.patch)
pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::PATCH, url)
}
/// See
/// [`Client::delete`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.delete)
pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::DELETE, url)
}
/// See [`Client::head`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.head)
pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::HEAD, url)
}
/// See
/// [`Client::request`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.request)
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
RequestBuilder {
inner: self.inner.request(method, url),
client: self.clone(),
}
}
/// See
/// [`Client::execute`](https://docs.rs/reqwest/latest/reqwest/struct.Client.html#method.execute)
pub async fn execute(&self, req: Request) -> Result<Response> {
let next = Next::new(&self.inner, &self.middleware_stack);
let mut ext = Extensions::new();
next.run(req, &mut ext).await
}
}
/// Create a `ClientWithMiddleware` without any middleware.
impl From<Client> for ClientWithMiddleware {
fn from(client: Client) -> Self {
ClientWithMiddleware {
inner: client,
middleware_stack: Box::new([]),
}
}
}
/// This is a wrapper around [`reqwest::RequestBuilder`] exposing the same API.
///
/// [`reqwest::RequestBuilder`]: https://docs.rs/reqwest/0.10.8/reqwest/struct.RequestBuilder.html
#[must_use = "RequestBuilder does nothing until you 'send' it"]
pub struct RequestBuilder {
inner: reqwest::RequestBuilder,
client: ClientWithMiddleware,
}
impl RequestBuilder {
pub fn header<K, V>(self, key: K, value: V) -> Self
where
HeaderName: TryFrom<K>,
<HeaderName as TryFrom<K>>::Error: Into<http::Error>,
HeaderValue: TryFrom<V>,
<HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
{
RequestBuilder {
inner: self.inner.header(key, value),
client: self.client,
}
}
pub fn headers(self, headers: HeaderMap) -> Self {
RequestBuilder {
inner: self.inner.headers(headers),
client: self.client,
}
}
pub fn basic_auth<U, P>(self, username: U, password: Option<P>) -> Self
where
U: Display,
P: Display,
{
RequestBuilder {
inner: self.inner.basic_auth(username, password),
client: self.client,
}
}
pub fn bearer_auth<T>(self, token: T) -> Self
where
T: Display,
{
RequestBuilder {
inner: self.inner.bearer_auth(token),
client: self.client,
}
}
pub fn body<T: Into<Body>>(self, body: T) -> Self {
RequestBuilder {
inner: self.inner.body(body),
client: self.client,
}
}
pub fn timeout(self, timeout: Duration) -> Self {
RequestBuilder {
inner: self.inner.timeout(timeout),
client: self.client,
}
}
pub fn multipart(self, multipart: Form) -> Self {
RequestBuilder {
inner: self.inner.multipart(multipart),
client: self.client,
}
}
pub fn query<T: Serialize + ?Sized>(self, query: &T) -> Self {
RequestBuilder {
inner: self.inner.query(query),
client: self.client,
}
}
pub fn form<T: Serialize + ?Sized>(self, form: &T) -> Self {
RequestBuilder {
inner: self.inner.form(form),
client: self.client,
}
}
pub fn json<T: Serialize + ?Sized>(self, json: &T) -> Self {
RequestBuilder {
inner: self.inner.json(json),
client: self.client,
}
}
pub fn build(self) -> reqwest::Result<Request> {
self.inner.build()
}
pub async fn send(self) -> Result<Response> {
let req = self.inner.build()?;
self.client.execute(req).await
}
pub fn try_clone(self) -> Option<Self> {
let client = self.client;
self.inner
.try_clone()
.map(|inner| RequestBuilder { inner, client })
}
}

View file

@ -0,0 +1,22 @@
use thiserror::Error;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Error, Debug)]
pub enum Error {
/// There was an error running some middleware
#[error("Middleware error: {0}")]
Middleware(#[from] anyhow::Error),
/// Error from the underlying reqwest client
#[error("Request error: {0}")]
Reqwest(#[from] reqwest::Error),
}
impl Error {
pub fn middleware<E>(err: E) -> Self
where
E: 'static + Send + Sync + std::error::Error,
{
Error::Middleware(err.into())
}
}

View file

@ -0,0 +1,51 @@
//! This crate provides [`ClientWithMiddleware`], a wrapper around [`reqwest::Client`] with the
//! ability to attach middleware which runs on every request.
//!
//! You'll want to instantiate [`ClientWithMiddleware`] using [`ClientBuilder`], then you can
//! attach your middleware using [`with`], finalize it with [`build`] and from then on sending
//! requests is the same as with reqwest:
//!
//! ```
//! use reqwest::{Client, Request, Response};
//! use reqwest_middleware::{ClientBuilder, Middleware, Next, Result};
//! use truelayer_extensions::Extensions;
//!
//! struct LoggingMiddleware;
//!
//! #[async_trait::async_trait]
//! impl Middleware for LoggingMiddleware {
//! async fn handle(
//! &self,
//! req: Request,
//! extensions: &mut Extensions,
//! next: Next<'_>,
//! ) -> Result<Response> {
//! println!("Request started {:?}", req);
//! let res = next.run(req, extensions).await;
//! println!("Result: {:?}", res);
//! res
//! }
//! }
//!
//! async fn run() {
//! let reqwest_client = Client::builder().build().unwrap();
//! let client = ClientBuilder::new(reqwest_client)
//! .with(LoggingMiddleware)
//! .build();
//! let resp = client.get("https://truelayer.com").send().await.unwrap();
//! println!("TrueLayer page HTML: {}", resp.text().await.unwrap());
//! }
//! ```
//!
//! [`build`]: struct.ClientBuilder.html#method.build
//! [`ClientBuilder`]: struct.ClientBuilder.html
//! [`ClientWithMiddleware`]: struct.ClientWithMiddleware.html
//! [`reqwest::Client`]: https://docs.rs/reqwest/0.10.8/reqwest/struct.Client.html
//! [`with`]: struct.ClientBuilder.html#method.with
mod client;
mod error;
mod middleware;
pub use client::{ClientBuilder, ClientWithMiddleware, RequestBuilder};
pub use error::{Error, Result};
pub use middleware::{Middleware, Next};

View file

@ -0,0 +1,100 @@
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use reqwest::{Client, Request, Response};
use std::sync::Arc;
use truelayer_extensions::Extensions;
use crate::error::{Error, Result};
/// When attached to a [`ClientWithMiddleware`] (generally using [`with`]), middleware is run
/// whenever the client issues a request, in the order it was attached.
///
/// # Example
///
/// ```
/// use reqwest::{Client, Request, Response};
/// use reqwest_middleware::{ClientBuilder, Middleware, Next, Result};
/// use truelayer_extensions::Extensions;
///
/// struct TransparentMiddleware;
///
/// #[async_trait::async_trait]
/// impl Middleware for TransparentMiddleware {
/// async fn handle(
/// &self,
/// req: Request,
/// extensions: &mut Extensions,
/// next: Next<'_>,
/// ) -> Result<Response> {
/// next.run(req, extensions).await
/// }
/// }
/// ```
///
/// [`ClientWithMiddleware`]: struct.ClientWithMiddleware.html
/// [`Extensions`]: TODO
/// [`with`]: struct.ClientBuilder.html#method.with
#[async_trait::async_trait]
pub trait Middleware: 'static + Send + Sync {
/// Invoked with a request before sending it. If you want to continue processing the request,
/// you should explicitly call `next.run(req, extensions)`.
///
/// If you need to forward data down the middleware stack, you can use the `extensions`
/// argument.
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response>;
}
#[async_trait::async_trait]
impl<F> Middleware for F
where
F: Send
+ Sync
+ 'static
+ for<'a> Fn(Request, &'a mut Extensions, Next<'a>) -> BoxFuture<'a, Result<Response>>,
{
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response> {
(self)(req, extensions, next).await
}
}
/// Next encapsulates the remaining middleware chain to run in [`Middleware::handle`]. You can
/// forward the request down the chain with [`run`].
///
/// [`Middleware::handle`]: trait.Middleware.html#tymethod.handle
/// [`run`]: #method.run
#[derive(Clone)]
pub struct Next<'a> {
client: &'a Client,
middlewares: &'a [Arc<dyn Middleware>],
}
impl<'a> Next<'a> {
pub(crate) fn new(client: &'a Client, middlewares: &'a [Arc<dyn Middleware>]) -> Self {
Next {
client,
middlewares,
}
}
pub fn run(
mut self,
req: Request,
extensions: &'a mut Extensions,
) -> BoxFuture<'a, Result<Response>> {
if let Some((current, rest)) = self.middlewares.split_first() {
self.middlewares = rest;
current.handle(req, extensions, self).boxed()
} else {
self.client.execute(req).map_err(Error::from).boxed()
}
}
}

2
reqwest-retry/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

View file

@ -0,0 +1,7 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

29
reqwest-retry/Cargo.toml Normal file
View file

@ -0,0 +1,29 @@
[package]
name = "reqwest-retry"
version = "0.1.0"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Retry middleware for reqwest."
repository = "https://github.com/TrueLayer/reqwest-middleware"
license = "MIT OR Apache-2.0"
keywords = ["reqwest", "http", "middleware", "retry"]
categories = ["web-programming::http-client"]
[dependencies]
reqwest-middleware = { version = "0.1", path = "../reqwest-middleware" }
anyhow = "1"
async-trait = "0.1.51"
chrono = "0.4"
futures = "0.3"
http = "0.2"
retry-policies = "0.1"
reqwest = "0.11"
tokio = { version = "1.6", features = ["time"] }
tracing = "0.1.26"
truelayer-extensions = "0.1"
[dev-dependencies]
wiremock = "0.5"
tokio = { version = "1", features = ["macros"] }
paste = "1"

41
reqwest-retry/README.md Normal file
View file

@ -0,0 +1,41 @@
# reqwest-retry
Retry middleware implementation for
[`reqwest-middleware`](https://crates.io/crates/reqwest-middleware).
[![Crates.io](https://img.shields.io/crates/v/reqwest-retry.svg)](https://crates.io/crates/reqwest-retry)
[![Docs.rs](https://docs.rs/reqwest-retry/badge.svg)](https://docs.rs/reqwest-retry)
[![CI](https://github.com/TrueLayer/reqwest-middleware/workflows/CI/badge.svg)](https://github.com/TrueLayer/reqwest-middleware/actions)
[![Coverage Status](https://coveralls.io/repos/github/TrueLayer/reqwest-middleware/badge.svg?branch=main&t=UWgSpm)](https://coveralls.io/github/TrueLayer/reqwest-middleware?branch=main)
## Overview
Build `RetryTransientMiddleware` from a `RetryPolicy`, then attach it to a
`reqwest_middleware::ClientBuilder`.
[`retry-policies::policies`](https://crates.io/crates/retry-policies) is reexported under
`reqwest_retry::policies` for convenience.
## How to install
Add `reqwest-retry` to your dependencies
```toml
[dependencies]
# ...
reqwest-retry = "0.1.0"
```
#### License
<sup>
Licensed under either of <a href="LICENSE-APACHE">Apache License, Version
2.0</a> or <a href="LICENSE-MIT">MIT license</a> at your option.
</sup>
<br>
<sub>
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.
</sub>

7
reqwest-retry/src/lib.rs Normal file
View file

@ -0,0 +1,7 @@
mod middleware;
mod retryable;
pub use retry_policies::policies;
pub use middleware::RetryTransientMiddleware;
pub use retryable::Retryable;

View file

@ -0,0 +1,139 @@
//! `RetryTransientMiddleware` implements retrying requests on transient errors.
use crate::retryable::Retryable;
use anyhow::anyhow;
use chrono::Utc;
use reqwest::{Request, Response};
use reqwest_middleware::{Error, Middleware, Next, Result};
use retry_policies::RetryPolicy;
use truelayer_extensions::Extensions;
/// We limit the number of retries to a maximum of `10` to avoid stack-overflow issues due to the recursion.
static MAXIMUM_NUMBER_OF_RETRIES: u32 = 10;
/// `RetryTransientMiddleware` offers retry logic for requests that fail in a transient manner
/// and can be safely executed again.
///
/// Currently, it allows setting a [RetryPolicy][retry_policies::RetryPolicy] algorithm for calculating the __wait_time__
/// between each request retry.
///
///```rust
/// use reqwest_middleware::ClientBuilder;
/// use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
/// use reqwest::Client;
///
/// // We create a ExponentialBackoff retry policy which implements `RetryPolicy`.
/// let retry_policy = ExponentialBackoff {
/// /// How many times the policy will tell the middleware to retry the request.
/// max_n_retries: 3,
/// max_retry_interval: std::time::Duration::from_millis(30),
/// min_retry_interval: std::time::Duration::from_millis(100),
/// backoff_exponent: 2,
/// };
///
/// let retry_transient_middleware = RetryTransientMiddleware::new_with_policy(retry_policy);
/// let client = ClientBuilder::new(Client::new()).with(retry_transient_middleware).build();
///```
///
pub struct RetryTransientMiddleware<T: RetryPolicy + Send + Sync + 'static> {
retry_policy: T,
}
impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
/// Construct `RetryTransientMiddleware` with a [retry_policy][retry_policies::RetryPolicy].
pub fn new_with_policy(retry_policy: T) -> Self {
Self { retry_policy }
}
}
#[async_trait::async_trait]
impl<T: RetryPolicy + Send + Sync> Middleware for RetryTransientMiddleware<T> {
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response> {
// TODO: Ideally we should create a new instance of the `Extensions` map to pass
// downstream. This will guard against previous retries poluting `Extensions`.
// That is, we only return what's populated in the typemap for the last retry attempt
// and copy those into the the `global` Extensions map.
self.execute_with_retry_recursive(req, next, extensions, 0)
.await
}
}
impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
/// **RECURSIVE**.
///
/// SAFETY: The condition for termination is the number of retries
/// set on the `RetryOption` object which is capped to 10 therefore
/// we can know that this will not cause a overflow of the stack.
///
/// This function will try to execute the request, if it fails
/// with an error classified as transient it will call itself
/// to retry the request.
///
/// NOTE: This function is not async because calling an async function
/// recursively is not allowed.
///
fn execute_with_retry_recursive<'a>(
&'a self,
req: Request,
next: Next<'a>,
mut ext: &'a mut Extensions,
n_past_retries: u32,
) -> futures::future::BoxFuture<'a, Result<Response>> {
Box::pin(async move {
// Cloning the request object before-the-fact is not ideal..
// However, if the body of the request is not static, e.g of type `Bytes`,
// the Clone operation should be of constant complexity and not O(N)
// since the byte abstraction is a shared pointer over a buffer.
let duplicate_request = req.try_clone().ok_or_else(|| {
Error::Middleware(anyhow!(
"Request object is not clonable. Are you passing a streaming body?".to_string()
))
})?;
let cloned_next = next.clone();
let result = next.run(req, &mut ext).await;
// We classify the response which will return None if not
// errors were returned.
match Retryable::from_reqwest_response(&result) {
Some(retryable)
if retryable == Retryable::Transient
&& n_past_retries < MAXIMUM_NUMBER_OF_RETRIES =>
{
// If the response failed and the error type was transient
// we can safely try to retry the request.
let retry_decicion = self.retry_policy.should_retry(n_past_retries);
if let retry_policies::RetryDecision::Retry { execute_after } = retry_decicion {
let duration = (execute_after - Utc::now())
.to_std()
.map_err(Error::middleware)?;
// Sleep the requested amount before we try again.
tracing::warn!(
"Retry attempt #{}. Sleeping {:?} before the next attempt",
n_past_retries,
duration
);
tokio::time::sleep(duration).await;
self.execute_with_retry_recursive(
duplicate_request,
cloned_next,
ext,
n_past_retries + 1,
)
.await
} else {
result
}
}
Some(_) | None => result,
}
})
}
}

View file

@ -0,0 +1,68 @@
use http::StatusCode;
use reqwest_middleware::Error;
/// Classification of an error/status returned by request.
#[derive(PartialEq, Eq)]
pub enum Retryable {
/// The failure was due to something tha might resolve in the future.
Transient,
/// Unresolvable error.
Fatal,
}
impl Retryable {
/// Try to map a `reqwest` response into `Retryable`.
///
/// Returns `None` if the response object does not contain any errors.
///
pub fn from_reqwest_response(res: &Result<reqwest::Response, Error>) -> Option<Self> {
match res {
Ok(success) => {
let status = success.status();
if status.is_server_error() {
Some(Retryable::Transient)
} else if status.is_client_error()
&& status != StatusCode::REQUEST_TIMEOUT
&& status != StatusCode::TOO_MANY_REQUESTS
{
Some(Retryable::Fatal)
} else if status.is_success() {
None
} else if status == StatusCode::REQUEST_TIMEOUT
|| status == StatusCode::TOO_MANY_REQUESTS
{
Some(Retryable::Transient)
} else {
Some(Retryable::Fatal)
}
}
Err(error) => match error {
// If something fails in the middleware we're screwed.
Error::Middleware(_) => Some(Retryable::Fatal),
Error::Reqwest(error) => {
if error.is_timeout() || error.is_connect() {
Some(Retryable::Transient)
} else if error.is_body()
|| error.is_decode()
|| error.is_request()
|| error.is_builder()
|| error.is_redirect()
{
Some(Retryable::Fatal)
} else {
// We omit checking if error.is_status() since we check that already.
// However, if Response::error_for_status is used the status will still
// remain in the response object.
None
}
}
},
}
}
}
impl From<&reqwest::Error> for Retryable {
fn from(_status: &reqwest::Error) -> Retryable {
Retryable::Transient
}
}

View file

@ -0,0 +1,215 @@
use paste::paste;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest_middleware::ClientBuilder;
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Respond, ResponseTemplate};
pub struct RetryResponder(Arc<AtomicU32>, u32, u16);
impl RetryResponder {
fn new(retries: u32, status_code: u16) -> Self {
Self(Arc::new(AtomicU32::new(0)), retries, status_code)
}
}
impl Respond for RetryResponder {
fn respond(&self, _request: &wiremock::Request) -> ResponseTemplate {
let mut retries = self.0.load(Ordering::SeqCst);
retries += 1;
self.0.store(retries, Ordering::SeqCst);
if retries + 1 >= self.1 {
ResponseTemplate::new(200)
} else {
ResponseTemplate::new(self.2)
}
}
}
macro_rules! assert_retry_succeeds_inner {
($x:tt, $name:ident, $status:expr, $retry:tt, $exact:tt, $responder:expr) => {
#[tokio::test]
async fn $name() {
let server = MockServer::start().await;
let retry_amount: u32 = $retry;
Mock::given(method("GET"))
.and(path("/foo"))
.respond_with($responder)
.expect($exact)
.mount(&server)
.await;
let reqwest_client = Client::builder().build().unwrap();
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff {
max_n_retries: retry_amount,
max_retry_interval: std::time::Duration::from_millis(30),
min_retry_interval: std::time::Duration::from_millis(100),
backoff_exponent: 2,
},
))
.build();
let resp = client
.get(&format!("{}/foo", server.uri()))
.send()
.await
.expect("call failed");
assert_eq!(resp.status(), $status);
}
};
}
macro_rules! assert_retry_succeeds {
($x:tt, $status:expr) => {
paste! {
assert_retry_succeeds_inner!($x, [<assert_retry_succeds_on_ $x>], $status, 3, 2, RetryResponder::new(3 as u32, $x));
}
};
}
macro_rules! assert_no_retry {
($x:tt, $status:expr) => {
paste! {
assert_retry_succeeds_inner!($x, [<assert_no_retry_on_ $x>], $status, 1, 1, ResponseTemplate::new($x));
}
};
}
// 2xx.
assert_no_retry!(200, StatusCode::OK);
assert_no_retry!(201, StatusCode::CREATED);
assert_no_retry!(202, StatusCode::ACCEPTED);
assert_no_retry!(203, StatusCode::NON_AUTHORITATIVE_INFORMATION);
assert_no_retry!(204, StatusCode::NO_CONTENT);
assert_no_retry!(205, StatusCode::RESET_CONTENT);
assert_no_retry!(206, StatusCode::PARTIAL_CONTENT);
assert_no_retry!(207, StatusCode::MULTI_STATUS);
assert_no_retry!(226, StatusCode::IM_USED);
// 3xx.
assert_no_retry!(300, StatusCode::MULTIPLE_CHOICES);
assert_no_retry!(301, StatusCode::MOVED_PERMANENTLY);
assert_no_retry!(302, StatusCode::FOUND);
assert_no_retry!(303, StatusCode::SEE_OTHER);
assert_no_retry!(304, StatusCode::NOT_MODIFIED);
assert_no_retry!(307, StatusCode::TEMPORARY_REDIRECT);
assert_no_retry!(308, StatusCode::PERMANENT_REDIRECT);
// 5xx.
assert_retry_succeeds!(500, StatusCode::OK);
assert_retry_succeeds!(501, StatusCode::OK);
assert_retry_succeeds!(502, StatusCode::OK);
assert_retry_succeeds!(503, StatusCode::OK);
assert_retry_succeeds!(504, StatusCode::OK);
assert_retry_succeeds!(505, StatusCode::OK);
assert_retry_succeeds!(506, StatusCode::OK);
assert_retry_succeeds!(507, StatusCode::OK);
assert_retry_succeeds!(508, StatusCode::OK);
assert_retry_succeeds!(510, StatusCode::OK);
assert_retry_succeeds!(511, StatusCode::OK);
// 4xx.
assert_no_retry!(400, StatusCode::BAD_REQUEST);
assert_no_retry!(401, StatusCode::UNAUTHORIZED);
assert_no_retry!(402, StatusCode::PAYMENT_REQUIRED);
assert_no_retry!(403, StatusCode::FORBIDDEN);
assert_no_retry!(404, StatusCode::NOT_FOUND);
assert_no_retry!(405, StatusCode::METHOD_NOT_ALLOWED);
assert_no_retry!(406, StatusCode::NOT_ACCEPTABLE);
assert_no_retry!(407, StatusCode::PROXY_AUTHENTICATION_REQUIRED);
assert_retry_succeeds!(408, StatusCode::OK);
assert_no_retry!(409, StatusCode::CONFLICT);
assert_no_retry!(410, StatusCode::GONE);
assert_no_retry!(411, StatusCode::LENGTH_REQUIRED);
assert_no_retry!(412, StatusCode::PRECONDITION_FAILED);
assert_no_retry!(413, StatusCode::PAYLOAD_TOO_LARGE);
assert_no_retry!(414, StatusCode::URI_TOO_LONG);
assert_no_retry!(415, StatusCode::UNSUPPORTED_MEDIA_TYPE);
assert_no_retry!(416, StatusCode::RANGE_NOT_SATISFIABLE);
assert_no_retry!(417, StatusCode::EXPECTATION_FAILED);
assert_no_retry!(418, StatusCode::IM_A_TEAPOT);
assert_no_retry!(421, StatusCode::MISDIRECTED_REQUEST);
assert_no_retry!(422, StatusCode::UNPROCESSABLE_ENTITY);
assert_no_retry!(423, StatusCode::LOCKED);
assert_no_retry!(424, StatusCode::FAILED_DEPENDENCY);
assert_no_retry!(426, StatusCode::UPGRADE_REQUIRED);
assert_no_retry!(428, StatusCode::PRECONDITION_REQUIRED);
assert_retry_succeeds!(429, StatusCode::OK);
assert_no_retry!(431, StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE);
assert_no_retry!(451, StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS);
// We assert that we cap retries at 10, which means that we will
// get 11 calls to the RetryResponder.
assert_retry_succeeds_inner!(
500,
assert_maximum_retries_is_not_exceeded,
StatusCode::INTERNAL_SERVER_ERROR,
100,
11,
RetryResponder::new(100_u32, 500)
);
pub struct RetryTimeoutResponder(Arc<AtomicU32>, u32, std::time::Duration);
impl RetryTimeoutResponder {
fn new(retries: u32, initial_timeout: std::time::Duration) -> Self {
Self(Arc::new(AtomicU32::new(0)), retries, initial_timeout)
}
}
impl Respond for RetryTimeoutResponder {
fn respond(&self, _request: &wiremock::Request) -> ResponseTemplate {
let mut retries = self.0.load(Ordering::SeqCst);
retries += 1;
self.0.store(retries, Ordering::SeqCst);
if retries + 1 >= self.1 {
ResponseTemplate::new(200)
} else {
ResponseTemplate::new(500).set_delay(self.2)
}
}
}
#[tokio::test]
async fn assert_retry_on_request_timeout() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/foo"))
.respond_with(RetryTimeoutResponder::new(
3,
std::time::Duration::from_millis(1000),
))
.expect(2)
.mount(&server)
.await;
let reqwest_client = Client::builder().build().unwrap();
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff {
max_n_retries: 3,
max_retry_interval: std::time::Duration::from_millis(30),
min_retry_interval: std::time::Duration::from_millis(100),
backoff_exponent: 2,
},
))
.build();
let resp = client
.get(&format!("{}/foo", server.uri()))
.timeout(std::time::Duration::from_millis(10))
.send()
.await
.expect("call failed");
assert_eq!(resp.status(), 200);
}

View file

@ -0,0 +1,7 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

View file

@ -0,0 +1,35 @@
[package]
name = "reqwest-tracing"
version = "0.1.0"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018"
description = "Opentracing middleware for reqwest."
repository = "https://github.com/TrueLayer/reqwest-middleware"
license = "MIT OR Apache-2.0"
keywords = ["reqwest", "http", "middleware", "opentracing", "tracing"]
categories = ["web-programming::http-client"]
[features]
opentelemetry_0_13 = ["opentelemetry_0_13_pkg", "tracing-opentelemetry_0_12_pkg"]
opentelemetry_0_14 = ["opentelemetry_0_14_pkg", "tracing-opentelemetry_0_13_pkg"]
opentelemetry_0_15 = ["opentelemetry_0_15_pkg", "tracing-opentelemetry_0_14_pkg"]
[dependencies]
reqwest-middleware = { version = "0.1", path = "../reqwest-middleware" }
async-trait = "0.1.51"
reqwest = "0.11"
tokio = { version = "1.6", features = ["time"] }
tracing = "0.1.26"
truelayer-extensions = "0.1"
opentelemetry_0_13_pkg = { package = "opentelemetry", version = "0.13", optional = true }
opentelemetry_0_14_pkg = { package = "opentelemetry", version = "0.14", optional = true }
opentelemetry_0_15_pkg = { package = "opentelemetry", version = "0.15", optional = true }
tracing-opentelemetry_0_12_pkg = { package = "tracing-opentelemetry",version = "0.12", optional = true }
tracing-opentelemetry_0_13_pkg = { package = "tracing-opentelemetry", version = "0.13", optional = true }
tracing-opentelemetry_0_14_pkg = { package = "tracing-opentelemetry",version = "0.14", optional = true }
[dev-dependencies]
wiremock = "0.5"
tokio = { version = "1", features = ["macros"] }

71
reqwest-tracing/README.md Normal file
View file

@ -0,0 +1,71 @@
# reqwest-tracing
Opentracing middleware implementation for
[`reqwest-middleware`](https://crates.io/crates/reqwest-middleware).
[![Crates.io](https://img.shields.io/crates/v/reqwest-tracing.svg)](https://crates.io/crates/reqwest-tracing)
[![Docs.rs](https://docs.rs/reqwest-tracing/badge.svg)](https://docs.rs/reqwest-tracing)
[![CI](https://github.com/TrueLayer/reqwest-middleware/workflows/CI/badge.svg)](https://github.com/TrueLayer/reqwest-middleware/actions)
[![Coverage Status](https://coveralls.io/repos/github/TrueLayer/reqwest-middleware/badge.svg?branch=main&t=UWgSpm)](https://coveralls.io/github/TrueLayer/reqwest-middleware?branch=main)
## Overview
Attach `TracingMiddleware` to your client to automatically trace HTTP requests:
```rust
use opentelemetry::exporter::trace::stdout;
use reqwest_middleware::ClientBuilder;
use reqwest_tracing::TracingMiddleware;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;
#[tokio::main]
async fn main() {
let (tracer, _) = stdout::new_pipeline().install();
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default().with(telemetry);
tracing::subscriber::set_global_default(subscriber).unwrap();
run().await;
}
async fun run() {
let client = ClientBuilder::new(reqwest::Client::new())
.with(TracingMiddleware)
.build();`
client.get("https://truelayer.com").send().await.unwrap();
}
```
See the [`tracing`](https://crates.io/crates/tracing) crate for more information on how to set up a
tracing subscriber to make use of the spans.
## How to install
Add `reqwest-tracing` to your dependencies. Optionally enable opentelemetry integration by enabling
an opentelemetry version feature:
```toml
[dependencies]
# ...
reqwest-tracing = { version = "0.1.0", features = ["opentelemetry_0_15"] }
```
Available opentelemetry features are `opentelemetry_0_15`, `opentelemetry_0_14` and
`opentelemetry_0_13`.
#### License
<sup>
Licensed under either of <a href="LICENSE-APACHE">Apache License, Version
2.0</a> or <a href="LICENSE-MIT">MIT license</a> at your option.
</sup>
<br>
<sub>
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.
</sub>

View file

@ -0,0 +1,9 @@
mod middleware;
#[cfg(any(
feature = "opentelemetry_0_13",
feature = "opentelemetry_0_14",
feature = "opentelemetry_0_15"
))]
mod otel;
pub use middleware::TracingMiddleware;

View file

@ -0,0 +1,129 @@
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Request, Response, StatusCode as RequestStatusCode};
use reqwest_middleware::{Error, Middleware, Next, Result};
use truelayer_extensions::Extensions;
/// Middleware for tracing requests using the current Opentelemetry Context.
pub struct TracingMiddleware;
#[async_trait::async_trait]
impl Middleware for TracingMiddleware {
async fn handle(
&self,
req: Request,
extensions: &mut Extensions,
next: Next<'_>,
) -> Result<Response> {
let request_span = {
let method = req.method();
let scheme = req.url().scheme();
let host = req.url().host_str().unwrap_or("");
let host_port = req.url().port().unwrap_or(0) as i64;
let path = req.url().path();
let otel_name = format!("{} {}", method, path);
tracing::info_span!(
"HTTP request",
http.method = %method,
http.scheme = %scheme,
http.host = %host,
net.host.port = %host_port,
otel.kind = "client",
otel.name = %otel_name,
otel.status_code = tracing::field::Empty,
http.user_agent = tracing::field::Empty,
http.status_code = tracing::field::Empty,
error.message = tracing::field::Empty,
error.cause_chain = tracing::field::Empty,
)
};
// Adds tracing headers to the given request to propagate the OpenTracing context to downstream revivers of the request.
// Spans added by downstream consumers will be part of the same trace.
#[cfg(any(
feature = "opentelemetry_0_13",
feature = "opentelemetry_0_14",
feature = "opentelemetry_0_15"
))]
let req = crate::otel::inject_opentracing_context_into_request(&request_span, req);
// Run the request
let outcome = next.run(req, extensions).await;
match &outcome {
Ok(response) => {
// The request ran successfully
let span_status = get_span_status(response.status());
let status_code = response.status().as_u16() as i64;
let user_agent = get_header_value("user_agent", response.headers());
if let Some(span_status) = span_status {
request_span.record("otel.status_code", &span_status);
}
request_span.record("http.status_code", &status_code);
request_span.record("http.user_agent", &user_agent.as_str());
}
Err(e) => {
// The request didn't run successfully
let error_message = e.to_string();
let error_cause_chain = format!("{:?}", e);
request_span.record("otel.status_code", &"ERROR");
request_span.record("error.message", &error_message.as_str());
request_span.record("error.cause_chain", &error_cause_chain.as_str());
if let Error::Reqwest(e) = e {
request_span.record(
"http.status_code",
&e.status()
.map(|s| s.to_string())
.unwrap_or_else(|| "".to_string())
.as_str(),
);
}
}
}
outcome
}
}
fn get_header_value(key: &str, headers: &HeaderMap) -> String {
let header_default = &HeaderValue::from_static("");
format!("{:?}", headers.get(key).unwrap_or(header_default)).replace("\"", "")
}
/// HTTP Mapping <https://github.com/open-telemetry/opentelemetry-specification/blob/c4b7f4307de79009c97b3a98563e91fee39b7ba3/work_in_progress/opencensus/HTTP.md#status>
// | HTTP code | Span status code |
// |-------------------------|-----------------------|
// | 100...299 | `Ok` |
// | 3xx redirect codes | `DeadlineExceeded` in case of loop (see above) [1], otherwise `Ok` |
// | 401 Unauthorized ⚠ | `Unauthenticated` ⚠ (Unauthorized actually means unauthenticated according to [RFC 7235][rfc-unauthorized]) |
// | 403 Forbidden | `PermissionDenied` |
// | 404 Not Found | `NotFound` |
// | 429 Too Many Requests | `ResourceExhausted` |
// | Other 4xx code | `InvalidArgument` [1] |
// | 501 Not Implemented | `Unimplemented` |
// | 503 Service Unavailable | `Unavailable` |
// | 504 Gateway Timeout | `DeadlineExceeded` |
// | Other 5xx code | `InternalError` [1] |
// | Any status code the client fails to interpret (e.g., 093 or 573) | `UnknownError` |
///
/// Maps the the http status to an Opentelemetry span status following the the specified convention above.
fn get_span_status(request_status: RequestStatusCode) -> Option<&'static str> {
match request_status.as_u16() {
100..=399 => Some("OK"),
400..=599 => Some("ERROR"),
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn get_header_value_for_span_attribute() {
let expect = "IMPORTANT_HEADER";
let mut header_map = HeaderMap::new();
header_map.insert("test", expect.parse().unwrap());
let value = get_header_value("test", &header_map);
assert_eq!(value, expect);
}
}

View file

@ -0,0 +1,62 @@
use reqwest::header::{HeaderName, HeaderValue};
use reqwest::Request;
use std::str::FromStr;
use tracing::Span;
#[cfg(feature = "opentelemetry_0_13")]
use opentelemetry_0_13_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_14")]
use opentelemetry_0_14_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_15")]
use opentelemetry_0_15_pkg as opentelemetry;
#[cfg(feature = "opentelemetry_0_13")]
pub use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_14")]
pub use tracing_opentelemetry_0_13_pkg as tracing_opentelemetry;
#[cfg(feature = "opentelemetry_0_15")]
pub use tracing_opentelemetry_0_14_pkg as tracing_opentelemetry;
use opentelemetry::global;
use opentelemetry::propagation::Injector;
use tracing_opentelemetry::OpenTelemetrySpanExt;
/// Injects the given Opentelemetry Context into a reqwest::Request headers to allow propagation downstream.
pub fn inject_opentracing_context_into_request(span: &Span, request: Request) -> Request {
let context = span.context();
let mut request = request;
global::get_text_map_propagator(|injector| {
injector.inject_context(&context, &mut RequestCarrier::new(&mut request))
});
request
}
// "traceparent" => https://www.w3.org/TR/trace-context/#trace-context-http-headers-format
/// Injector used via opentelemetry propagator to tell the extractor how to insert the "traceparent" header value
/// This will allow the propagator to inject opentelemetry context into a standard data structure. Will basically
/// insert a "traceparent" string value "{version}-{trace_id}-{span_id}-{trace-flags}" of the spans context into the headers.
/// Listeners can then re-hydrate the context to add additional spans to the same trace.
struct RequestCarrier<'a> {
request: &'a mut Request,
}
impl<'a> RequestCarrier<'a> {
pub fn new(request: &'a mut Request) -> Self {
RequestCarrier { request }
}
}
impl<'a> Injector for RequestCarrier<'a> {
fn set(&mut self, key: &str, value: String) {
let header_name = HeaderName::from_str(key).expect("Must be header name");
let header_value = HeaderValue::from_str(&value).expect("Must be a header value");
self.request.headers_mut().insert(header_name, header_value);
}
}