feat: wasm32-unknown-unknown support (#79)

* feat: wasm32-unknown-unknown support

This replaces task-local-extensions with http's extensions, as http was
already in the dependency closure anyway and the other features of
task-local-extensions (that required an incompatible-with-wasm part of
tokio) were not used anyway.

* feat: have ci check that wasm32-unknown-unknown keeps compiling

* revert back to task-local-extensions

* fix ci

* fix random on wasm

* fix ci again

* bump

---------

Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
This commit is contained in:
Léo Gaspard 2023-03-09 12:33:13 +01:00 committed by GitHub
parent f854725791
commit fef18b3506
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 100 additions and 62 deletions

View file

@ -32,6 +32,33 @@ jobs:
command: test command: test
args: --workspace --all-targets --features ${{ matrix.otel_version }} args: --workspace --all-targets --features ${{ matrix.otel_version }}
wasm:
name: Compiles for the browser
runs-on: ubuntu-latest
strategy:
matrix:
otel_version:
- opentelemetry_0_13
- opentelemetry_0_14
- opentelemetry_0_15
- opentelemetry_0_16
- opentelemetry_0_17
- opentelemetry_0_18
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
target: wasm32-unknown-unknown
toolchain: stable
profile: minimal
override: true
- uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --features ${{ matrix.otel_version }} --target wasm32-unknown-unknown
rustfmt: rustfmt:
name: Rustfmt name: Rustfmt
runs-on: ubuntu-latest runs-on: ubuntu-latest
@ -131,27 +158,3 @@ jobs:
with: with:
command: publish command: publish
args: --dry-run --manifest-path reqwest-tracing/Cargo.toml args: --dry-run --manifest-path reqwest-tracing/Cargo.toml
coverage:
name: Code coverage
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Install stable toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
- name: Run cargo-tarpaulin
uses: actions-rs/tarpaulin@v0.1
with:
args: '--ignore-tests --out Lcov'
- name: Upload to Coveralls
# upload only if push
if: ${{ github.event_name == 'push' }}
uses: coverallsapp/github-action@master
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: './lcov.info'

View file

@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.2.1] - 2023-03-09
### Added
- Support for `wasm32-unknown-unknown`
## [0.2.0] - 2022-11-15 ## [0.2.0] - 2022-11-15
### Changed ### Changed

View file

@ -1,6 +1,6 @@
[package] [package]
name = "reqwest-middleware" name = "reqwest-middleware"
version = "0.2.0" version = "0.2.1"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"] authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018" edition = "2018"
description = "Wrapper around reqwest to allow for client middleware chains." description = "Wrapper around reqwest to allow for client middleware chains."
@ -16,7 +16,7 @@ async-trait = "0.1.51"
http = "0.2" http = "0.2"
reqwest = { version = "0.11", default-features = false, features = ["json", "multipart"] } reqwest = { version = "0.11", default-features = false, features = ["json", "multipart"] }
serde = "1" serde = "1"
task-local-extensions = "0.1.1" task-local-extensions = "0.1.4"
thiserror = "1" thiserror = "1"
[dev-dependencies] [dev-dependencies]

View file

@ -5,7 +5,6 @@ use serde::Serialize;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use task_local_extensions::Extensions; use task_local_extensions::Extensions;
use crate::error::Result; use crate::error::Result;
@ -239,7 +238,8 @@ impl RequestBuilder {
} }
} }
pub fn timeout(self, timeout: Duration) -> Self { #[cfg(not(target_arch = "wasm32"))]
pub fn timeout(self, timeout: std::time::Duration) -> Self {
RequestBuilder { RequestBuilder {
inner: self.inner.timeout(timeout), inner: self.inner.timeout(timeout),
..self ..self

View file

@ -31,7 +31,8 @@ use crate::error::{Error, Result};
/// ///
/// [`ClientWithMiddleware`]: crate::ClientWithMiddleware /// [`ClientWithMiddleware`]: crate::ClientWithMiddleware
/// [`with`]: crate::ClientBuilder::with /// [`with`]: crate::ClientBuilder::with
#[async_trait::async_trait] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait Middleware: 'static + Send + Sync { pub trait Middleware: 'static + Send + Sync {
/// Invoked with a request before sending it. If you want to continue processing the request, /// Invoked with a request before sending it. If you want to continue processing the request,
/// you should explicitly call `next.run(req, extensions)`. /// you should explicitly call `next.run(req, extensions)`.
@ -46,7 +47,8 @@ pub trait Middleware: 'static + Send + Sync {
) -> Result<Response>; ) -> Result<Response>;
} }
#[async_trait::async_trait] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<F> Middleware for F impl<F> Middleware for F
where where
F: Send F: Send
@ -75,7 +77,10 @@ pub struct Next<'a> {
middlewares: &'a [Arc<dyn Middleware>], middlewares: &'a [Arc<dyn Middleware>],
} }
#[cfg(not(target_arch = "wasm32"))]
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>; pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
#[cfg(target_arch = "wasm32")]
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
impl<'a> Next<'a> { impl<'a> Next<'a> {
pub(crate) fn new(client: &'a Client, middlewares: &'a [Arc<dyn Middleware>]) -> Self { pub(crate) fn new(client: &'a Client, middlewares: &'a [Arc<dyn Middleware>]) -> Self {

View file

@ -1,6 +1,6 @@
[package] [package]
name = "reqwest-retry" name = "reqwest-retry"
version = "0.2.1" version = "0.2.2"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"] authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018" edition = "2018"
description = "Retry middleware for reqwest." description = "Retry middleware for reqwest."
@ -17,16 +17,22 @@ async-trait = "0.1.51"
chrono = { version = "0.4.19", features = ["clock"], default-features = false } chrono = { version = "0.4.19", features = ["clock"], default-features = false }
futures = "0.3" futures = "0.3"
http = "0.2" http = "0.2"
hyper = "0.14"
reqwest = { version = "0.11", default-features = false } reqwest = { version = "0.11", default-features = false }
retry-policies = "0.1" retry-policies = "0.1"
task-local-extensions = "0.1.1" task-local-extensions = "0.1.4"
tokio = { version = "1.6", features = ["time"] }
tracing = "0.1.26" tracing = "0.1.26"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
hyper = "0.14"
tokio = { version = "1.6", features = ["time"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
parking_lot = { version = "0.11.2", features = ["wasm-bindgen"] } # work around https://github.com/tomaka/wasm-timer/issues/14
wasm-timer = "0.2.5"
getrandom = { version = "0.2", features = ["js"] }
[dev-dependencies] [dev-dependencies]
async-std = { version = "1.10"}
paste = "1" paste = "1"
tokio = { version = "1", features = ["macros"] } tokio = { version = "1", features = ["full"] }
wiremock = "0.5" wiremock = "0.5"
futures = "0.3" futures = "0.3"

View file

@ -58,7 +58,8 @@ impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
} }
} }
#[async_trait::async_trait] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<T: RetryPolicy + Send + Sync> Middleware for RetryTransientMiddleware<T> { impl<T: RetryPolicy + Send + Sync> Middleware for RetryTransientMiddleware<T> {
async fn handle( async fn handle(
&self, &self,
@ -118,7 +119,12 @@ impl<T: RetryPolicy + Send + Sync> RetryTransientMiddleware<T> {
n_past_retries, n_past_retries,
duration duration
); );
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(duration).await; tokio::time::sleep(duration).await;
#[cfg(target_arch = "wasm32")]
wasm_timer::Delay::new(duration)
.await
.expect("failed sleeping");
n_past_retries += 1; n_past_retries += 1;
continue; continue;

View file

@ -1,5 +1,3 @@
use std::io;
use http::StatusCode; use http::StatusCode;
use reqwest_middleware::Error; use reqwest_middleware::Error;
@ -42,7 +40,11 @@ impl Retryable {
// If something fails in the middleware we're screwed. // If something fails in the middleware we're screwed.
Error::Middleware(_) => Some(Retryable::Fatal), Error::Middleware(_) => Some(Retryable::Fatal),
Error::Reqwest(error) => { Error::Reqwest(error) => {
if error.is_timeout() || error.is_connect() { #[cfg(not(target_arch = "wasm32"))]
let is_connect = error.is_connect();
#[cfg(target_arch = "wasm32")]
let is_connect = false;
if error.is_timeout() || is_connect {
Some(Retryable::Transient) Some(Retryable::Transient)
} else if error.is_body() } else if error.is_body()
|| error.is_decode() || error.is_decode()
@ -53,6 +55,7 @@ impl Retryable {
} else if error.is_request() { } else if error.is_request() {
// It seems that hyper::Error(IncompleteMessage) is not correctly handled by reqwest. // It seems that hyper::Error(IncompleteMessage) is not correctly handled by reqwest.
// Here we check if the Reqwest error was originated by hyper and map it consistently. // Here we check if the Reqwest error was originated by hyper and map it consistently.
#[cfg(not(target_arch = "wasm32"))]
if let Some(hyper_error) = get_source_error_type::<hyper::Error>(&error) { if let Some(hyper_error) = get_source_error_type::<hyper::Error>(&error) {
// The hyper::Error(IncompleteMessage) is raised if the HTTP response is well formatted but does not contain all the bytes. // The hyper::Error(IncompleteMessage) is raised if the HTTP response is well formatted but does not contain all the bytes.
// This can happen when the server has started sending back the response but the connection is cut halfway thorugh. // This can happen when the server has started sending back the response but the connection is cut halfway thorugh.
@ -65,7 +68,7 @@ impl Retryable {
// Try and downcast the hyper error to io::Error if that is the // Try and downcast the hyper error to io::Error if that is the
// underlying error, and try and classify it. // underlying error, and try and classify it.
} else if let Some(io_error) = } else if let Some(io_error) =
get_source_error_type::<io::Error>(hyper_error) get_source_error_type::<std::io::Error>(hyper_error)
{ {
Some(classify_io_error(io_error)) Some(classify_io_error(io_error))
} else { } else {
@ -74,6 +77,8 @@ impl Retryable {
} else { } else {
Some(Retryable::Fatal) Some(Retryable::Fatal)
} }
#[cfg(target_arch = "wasm32")]
Some(Retryable::Fatal)
} else { } else {
// We omit checking if error.is_status() since we check that already. // We omit checking if error.is_status() since we check that already.
// However, if Response::error_for_status is used the status will still // However, if Response::error_for_status is used the status will still
@ -92,14 +97,18 @@ impl From<&reqwest::Error> for Retryable {
} }
} }
fn classify_io_error(error: &io::Error) -> Retryable { #[cfg(not(target_arch = "wasm32"))]
fn classify_io_error(error: &std::io::Error) -> Retryable {
match error.kind() { match error.kind() {
io::ErrorKind::ConnectionReset | io::ErrorKind::ConnectionAborted => Retryable::Transient, std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::ConnectionAborted => {
Retryable::Transient
}
_ => Retryable::Fatal, _ => Retryable::Fatal,
} }
} }
/// Downcasts the given err source into T. /// Downcasts the given err source into T.
#[cfg(not(target_arch = "wasm32"))]
fn get_source_error_type<T: std::error::Error + 'static>( fn get_source_error_type<T: std::error::Error + 'static>(
err: &dyn std::error::Error, err: &dyn std::error::Error,
) -> Option<&T> { ) -> Option<&T> {

View file

@ -1,10 +1,8 @@
use async_std::io::ReadExt;
use async_std::io::WriteExt;
use async_std::net::{TcpListener, TcpStream};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::stream::StreamExt;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
type CustomMessageHandler = Box< type CustomMessageHandler = Box<
dyn Fn(TcpStream) -> BoxFuture<'static, Result<(), Box<dyn std::error::Error>>> + Send + Sync, dyn Fn(TcpStream) -> BoxFuture<'static, Result<(), Box<dyn std::error::Error>>> + Send + Sync,
@ -74,9 +72,9 @@ impl SimpleServer {
/// Starts the TcpListener and handles the requests. /// Starts the TcpListener and handles the requests.
pub async fn start(mut self) { pub async fn start(mut self) {
while let Some(stream) = self.listener.incoming().next().await { loop {
match stream { match self.listener.accept().await {
Ok(stream) => { Ok((stream, _)) => {
match self.handle_connection(stream).await { match self.handle_connection(stream).await {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {
@ -103,9 +101,9 @@ impl SimpleServer {
let mut buffer = vec![0; 1024]; let mut buffer = vec![0; 1024];
stream.read(&mut buffer).await.unwrap(); let n = stream.read(&mut buffer).await.unwrap();
let request = String::from_utf8_lossy(&buffer[..]); let request = String::from_utf8_lossy(&buffer[..n]);
let request_line = request.lines().next().unwrap(); let request_line = request.lines().next().unwrap();
let response = match Self::parse_request_line(request_line) { let response = match Self::parse_request_line(request_line) {
@ -120,7 +118,7 @@ impl SimpleServer {
}; };
println!("-- Response --\n{}\n--------------", response.clone()); println!("-- Response --\n{}\n--------------", response.clone());
stream.write(response.as_bytes()).await.unwrap(); stream.write_all(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap(); stream.flush().await.unwrap();
Ok(()) Ok(())

View file

@ -1,5 +1,3 @@
use async_std::io::ReadExt;
use futures::AsyncWriteExt;
use futures::FutureExt; use futures::FutureExt;
use paste::paste; use paste::paste;
use reqwest::Client; use reqwest::Client;
@ -11,6 +9,8 @@ use std::sync::{
atomic::{AtomicU32, Ordering}, atomic::{AtomicU32, Ordering},
Arc, Arc,
}; };
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use wiremock::matchers::{method, path}; use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Respond, ResponseTemplate}; use wiremock::{Mock, MockServer, Respond, ResponseTemplate};
@ -284,10 +284,13 @@ async fn assert_retry_on_hyper_canceled() {
let counter = counter.clone(); let counter = counter.clone();
async move { async move {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
stream.read(&mut buffer).await.unwrap(); stream.read_buf(&mut buffer).await.unwrap();
if counter.fetch_add(1, Ordering::SeqCst) > 1 { if counter.fetch_add(1, Ordering::SeqCst) > 1 {
// This triggeres hyper:Error(Canceled). // This triggeres hyper:Error(Canceled).
let _res = stream.shutdown(std::net::Shutdown::Both); let _res = stream
.into_std()
.unwrap()
.shutdown(std::net::Shutdown::Both);
} else { } else {
let _res = stream.write("HTTP/1.1 200 OK\r\n\r\n".as_bytes()).await; let _res = stream.write("HTTP/1.1 200 OK\r\n\r\n".as_bytes()).await;
} }
@ -332,7 +335,7 @@ async fn assert_retry_on_connection_reset_by_peer() {
let counter = counter.clone(); let counter = counter.clone();
async move { async move {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
stream.read(&mut buffer).await.unwrap(); stream.read_buf(&mut buffer).await.unwrap();
if counter.fetch_add(1, Ordering::SeqCst) > 1 { if counter.fetch_add(1, Ordering::SeqCst) > 1 {
// This triggeres hyper:Error(Io, io::Error(ConnectionReset)). // This triggeres hyper:Error(Io, io::Error(ConnectionReset)).
drop(stream); drop(stream);

View file

@ -1,6 +1,6 @@
[package] [package]
name = "reqwest-tracing" name = "reqwest-tracing"
version = "0.4.0" version = "0.4.1"
authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"] authors = ["Rodrigo Gryzinski <rodrigo.gryzinski@truelayer.com>"]
edition = "2018" edition = "2018"
description = "Opentracing middleware for reqwest." description = "Opentracing middleware for reqwest."
@ -23,7 +23,7 @@ reqwest-middleware = { version = "0.2.0", path = "../reqwest-middleware" }
async-trait = "0.1.51" async-trait = "0.1.51"
reqwest = { version = "0.11", default-features = false } reqwest = { version = "0.11", default-features = false }
task-local-extensions = "0.1.1" task-local-extensions = "0.1.4"
tracing = "0.1.26" tracing = "0.1.26"
opentelemetry_0_13_pkg = { package = "opentelemetry", version = "0.13", optional = true } opentelemetry_0_13_pkg = { package = "opentelemetry", version = "0.13", optional = true }
@ -39,6 +39,8 @@ tracing-opentelemetry_0_16_pkg = { package = "tracing-opentelemetry",version = "
tracing-opentelemetry_0_17_pkg = { package = "tracing-opentelemetry",version = "0.17", optional = true } tracing-opentelemetry_0_17_pkg = { package = "tracing-opentelemetry",version = "0.17", optional = true }
tracing-opentelemetry_0_18_pkg = { package = "tracing-opentelemetry",version = "0.18", optional = true } tracing-opentelemetry_0_18_pkg = { package = "tracing-opentelemetry",version = "0.18", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["macros"] } tokio = { version = "1", features = ["macros"] }

View file

@ -25,7 +25,7 @@ tokio = { version = "1.12.0", features = ["macros", "rt-multi-thread"] }
tracing = "0.1" tracing = "0.1"
tracing-opentelemetry = "0.18" tracing-opentelemetry = "0.18"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
task-local-extensions = "0.1.0" task-local-extensions = "0.1.4"
``` ```
```rust,skip ```rust,skip

View file

@ -30,7 +30,8 @@ impl Default for TracingMiddleware<DefaultSpanBackend> {
} }
} }
#[async_trait::async_trait] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<ReqwestOtelSpan> Middleware for TracingMiddleware<ReqwestOtelSpan> impl<ReqwestOtelSpan> Middleware for TracingMiddleware<ReqwestOtelSpan>
where where
ReqwestOtelSpan: ReqwestOtelSpanBackend + Sync + Send + 'static, ReqwestOtelSpan: ReqwestOtelSpanBackend + Sync + Send + 'static,