From 5402bc9c195f7c48431fdb7a38c99ee8f341cc79 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 9 Apr 2024 10:38:08 +0200 Subject: [PATCH 01/25] Retry activity send in case of timeout or rate limit (#102) --- src/activity_sending.rs | 77 +++++++++++++++++++++++++++++++++++------ src/fetch/object_id.rs | 2 +- src/traits.rs | 2 +- 3 files changed, 68 insertions(+), 13 deletions(-) diff --git a/src/activity_sending.rs b/src/activity_sending.rs index f16ef37..ce53f94 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -12,14 +12,17 @@ use crate::{ }; use bytes::Bytes; use futures::StreamExt; +use http::StatusCode; use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use reqwest::{ + header::{HeaderMap, HeaderName, HeaderValue}, + Response, +}; use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ - self, fmt::{Debug, Display}, time::{Duration, SystemTime}, }; @@ -90,20 +93,30 @@ impl SendActivityTask { ) .await?; let response = client.execute(request).await?; + self.handle_response(response).await + } - match response { - o if o.status().is_success() => { + /// Based on the HTTP status code determines if an activity was delivered successfully. In that case + /// Ok is returned. Otherwise it returns Err and the activity send should be retried later. + /// + /// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217 + async fn handle_response(&self, response: Response) -> Result<(), Error> { + match response.status() { + status if status.is_success() => { debug!("Activity {self} delivered successfully"); Ok(()) } - o if o.status().is_client_error() => { - let text = o.text_limited().await?; + status + if status.is_client_error() + && status != StatusCode::REQUEST_TIMEOUT + && status != StatusCode::TOO_MANY_REQUESTS => + { + let text = response.text_limited().await?; debug!("Activity {self} was rejected, aborting: {text}"); Ok(()) } - o => { - let status = o.status(); - let text = o.text_limited().await?; + status => { + let text = response.text_limited().await?; Err(Error::Other(format!( "Activity {self} failure with status {status}: {text}", @@ -214,8 +227,6 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { mod tests { use super::*; use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; - use bytes::Bytes; - use http::StatusCode; use std::{ sync::{atomic::AtomicUsize, Arc}, time::Instant, @@ -289,4 +300,48 @@ mod tests { info!("Queue Sent: {:?}", start.elapsed()); Ok(()) } + + #[tokio::test] + async fn test_handle_response() { + let keypair = generate_actor_keypair().unwrap(); + let message = SendActivityTask { + actor_id: "http://localhost:8001".parse().unwrap(), + activity_id: "http://localhost:8001/activity".parse().unwrap(), + activity: "{}".into(), + inbox: "http://localhost:8001".parse().unwrap(), + private_key: keypair.private_key().unwrap(), + http_signature_compat: true, + }; + + let res = |status| { + http::Response::builder() + .status(status) + .body(vec![]) + .unwrap() + .into() + }; + + assert!(message.handle_response(res(StatusCode::OK)).await.is_ok()); + assert!(message + .handle_response(res(StatusCode::BAD_REQUEST)) + .await + .is_ok()); + + assert!(message + .handle_response(res(StatusCode::MOVED_PERMANENTLY)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::REQUEST_TIMEOUT)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::TOO_MANY_REQUESTS)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::INTERNAL_SERVER_ERROR)) + .await + .is_err()); + } } diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index f3fa560..061a9f5 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -347,7 +347,7 @@ const _IMPL_DIESEL_NEW_TYPE_FOR_OBJECT_ID: () = { #[cfg(test)] pub mod tests { use super::*; - use crate::{fetch::object_id::should_refetch_object, traits::tests::DbUser}; + use crate::traits::tests::DbUser; #[test] fn test_deserialize() { diff --git a/src/traits.rs b/src/traits.rs index 9fdec27..720f731 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -343,7 +343,7 @@ pub mod tests { error::Error, fetch::object_id::ObjectId, http_signatures::{generate_actor_keypair, Keypair}, - protocol::{public_key::PublicKey, verification::verify_domains_match}, + protocol::verification::verify_domains_match, }; use activitystreams_kinds::{activity::FollowType, actor::PersonType}; use once_cell::sync::Lazy; From a2ac97db98c82b9bdfddcec73fcc84451ffea8aa Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 9 Apr 2024 11:28:22 +0200 Subject: [PATCH 02/25] Allow fetching from local domain in case it redirects to remote (#104) * Allow fetching from local domain in case it redirects to remote * clippy * fix lemmy tests --- examples/local_federation/instance.rs | 1 + src/fetch/mod.rs | 11 ++++++++--- src/fetch/object_id.rs | 12 +++--------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/examples/local_federation/instance.rs b/examples/local_federation/instance.rs index f377f31..ad1fd12 100644 --- a/examples/local_federation/instance.rs +++ b/examples/local_federation/instance.rs @@ -28,6 +28,7 @@ pub async fn new_instance( .domain(hostname) .signed_fetch_actor(&system_user) .app_data(database) + .url_verifier(Box::new(MyUrlVerifier())) .debug(true) .build() .await?; diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 13ea16c..4d563b6 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -69,10 +69,17 @@ pub async fn fetch_object_http( return Err(Error::FetchInvalidContentType(res.url)); } - // Ensure id field matches final url + // Ensure id field matches final url after redirect if res.object_id.as_ref() != Some(&res.url) { return Err(Error::FetchWrongId(res.url)); } + + // Dont allow fetching local object. Only check this after the request as a local url + // may redirect to a remote object. + if data.config.is_local_url(&res.url) { + return Err(Error::NotFound); + } + Ok(res) } @@ -84,8 +91,6 @@ async fn fetch_object_http_with_accept( content_type: &HeaderValue, ) -> Result, Error> { let config = &data.config; - // dont fetch local objects this way - debug_assert!(url.domain() != Some(&config.domain)); config.verify_url_valid(url).await?; info!("Fetching remote object {}", url.to_string()); diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index 061a9f5..50e75bc 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -88,19 +88,13 @@ where ::Error: From, { let db_object = self.dereference_from_db(data).await?; - // if its a local object, only fetch it from the database and not over http - if data.config.is_local_url(&self.0) { - return match db_object { - None => Err(Error::NotFound.into()), - Some(o) => Ok(o), - }; - } // object found in database if let Some(object) = db_object { - // object is old and should be refetched if let Some(last_refreshed_at) = object.last_refreshed_at() { - if should_refetch_object(last_refreshed_at) { + let is_local = data.config.is_local_url(&self.0); + if !is_local && should_refetch_object(last_refreshed_at) { + // object is outdated and should be refetched return self.dereference_from_http(data, Some(object)).await; } } From 7def01a19af567c668da663d4a4c88e0b631a57e Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 9 Apr 2024 11:28:57 +0200 Subject: [PATCH 03/25] Avoid running ci checks twice (#105) * Avoid running ci checks twice * upgrade rust * move clippy config to cargo.toml --- .woodpecker.yml | 42 ++++++++++++++++--------------- Cargo.toml | 17 +++++++++++++ examples/live_federation/main.rs | 2 ++ examples/local_federation/main.rs | 2 ++ src/activity_queue.rs | 1 + src/activity_sending.rs | 1 + src/actix_web/inbox.rs | 1 + src/fetch/object_id.rs | 1 + src/fetch/webfinger.rs | 1 + src/http_signatures.rs | 1 + 10 files changed, 49 insertions(+), 20 deletions(-) diff --git a/.woodpecker.yml b/.woodpecker.yml index b563544..d14bc3d 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -1,54 +1,56 @@ -pipeline: +variables: + - &rust_image "rust:1.77-bullseye" + +steps: cargo_fmt: image: rustdocker/rust:nightly commands: - /root/.cargo/bin/cargo fmt -- --check - - cargo_check: - image: rust:1.70-bullseye - environment: - CARGO_HOME: .cargo - commands: - - cargo check --all-features --all-targets + when: + - event: pull_request cargo_clippy: - image: rust:1.70-bullseye + image: *rust_image environment: CARGO_HOME: .cargo commands: - rustup component add clippy - - cargo clippy --all-targets --all-features -- - -D warnings -D deprecated -D clippy::perf -D clippy::complexity - -D clippy::dbg_macro -D clippy::inefficient_to_string - -D clippy::items-after-statements -D clippy::implicit_clone - -D clippy::wildcard_imports -D clippy::cast_lossless - -D clippy::manual_string_new -D clippy::redundant_closure_for_method_calls - - cargo clippy --all-features -- -D clippy::unwrap_used + - cargo clippy --all-targets --all-features + when: + - event: pull_request cargo_test: - image: rust:1.70-bullseye + image: *rust_image environment: CARGO_HOME: .cargo commands: - cargo test --all-features --no-fail-fast + when: + - event: pull_request cargo_doc: - image: rust:1.70-bullseye + image: *rust_image environment: CARGO_HOME: .cargo commands: - cargo doc --all-features + when: + - event: pull_request cargo_run_actix_example: - image: rust:1.70-bullseye + image: *rust_image environment: CARGO_HOME: .cargo commands: - cargo run --example local_federation actix-web + when: + - event: pull_request cargo_run_axum_example: - image: rust:1.70-bullseye + image: *rust_image environment: CARGO_HOME: .cargo commands: - cargo run --example local_federation axum + when: + - event: pull_request diff --git a/Cargo.toml b/Cargo.toml index 711ffc2..caef7d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,23 @@ actix-web = ["dep:actix-web"] axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"] diesel = ["dep:diesel"] +[lints.rust] +warnings = "deny" +deprecated = "deny" + +[lints.clippy] +perf = "deny" +complexity = "deny" +dbg_macro = "deny" +inefficient_to_string = "deny" +items-after-statements = "deny" +implicit_clone = "deny" +wildcard_imports = "deny" +cast_lossless = "deny" +manual_string_new = "deny" +redundant_closure_for_method_calls = "deny" +unwrap_used = "deny" + [dependencies] chrono = { version = "0.4.34", features = ["clock"], default-features = false } serde = { version = "1.0.197", features = ["derive"] } diff --git a/examples/live_federation/main.rs b/examples/live_federation/main.rs index 4326226..3fa0b18 100644 --- a/examples/live_federation/main.rs +++ b/examples/live_federation/main.rs @@ -1,3 +1,5 @@ +#![allow(clippy::unwrap_used)] + use crate::{ database::Database, http::{http_get_user, http_post_user_inbox, webfinger}, diff --git a/examples/local_federation/main.rs b/examples/local_federation/main.rs index 1597668..d23a594 100644 --- a/examples/local_federation/main.rs +++ b/examples/local_federation/main.rs @@ -1,3 +1,5 @@ +#![allow(clippy::unwrap_used)] + use crate::{ instance::{listen, new_instance, Webserver}, objects::post::DbPost, diff --git a/src/activity_queue.rs b/src/activity_queue.rs index abb5800..20852bd 100644 --- a/src/activity_queue.rs +++ b/src/activity_queue.rs @@ -416,6 +416,7 @@ async fn retry>, A: FnMut } #[cfg(test)] +#[allow(clippy::unwrap_used)] mod tests { use super::*; use crate::http_signatures::generate_actor_keypair; diff --git a/src/activity_sending.rs b/src/activity_sending.rs index ce53f94..4af8439 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -224,6 +224,7 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { } #[cfg(test)] +#[allow(clippy::unwrap_used)] mod tests { use super::*; use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; diff --git a/src/actix_web/inbox.rs b/src/actix_web/inbox.rs index b9c6379..7c10659 100644 --- a/src/actix_web/inbox.rs +++ b/src/actix_web/inbox.rs @@ -45,6 +45,7 @@ where } #[cfg(test)] +#[allow(clippy::unwrap_used)] mod test { use super::*; use crate::{ diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index 50e75bc..d30fe0a 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -339,6 +339,7 @@ const _IMPL_DIESEL_NEW_TYPE_FOR_OBJECT_ID: () = { }; #[cfg(test)] +#[allow(clippy::unwrap_used)] pub mod tests { use super::*; use crate::traits::tests::DbUser; diff --git a/src/fetch/webfinger.rs b/src/fetch/webfinger.rs index f065618..7ca4903 100644 --- a/src/fetch/webfinger.rs +++ b/src/fetch/webfinger.rs @@ -245,6 +245,7 @@ pub struct WebfingerLink { } #[cfg(test)] +#[allow(clippy::unwrap_used)] mod tests { use super::*; use crate::{ diff --git a/src/http_signatures.rs b/src/http_signatures.rs index bc27ee5..7e26e8d 100644 --- a/src/http_signatures.rs +++ b/src/http_signatures.rs @@ -275,6 +275,7 @@ pub(crate) fn verify_body_hash( } #[cfg(test)] +#[allow(clippy::unwrap_used)] pub mod test { use super::*; use crate::activity_sending::generate_request_headers; From 779313ac22b804777a569738b1828cc4ebaa1032 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Tue, 9 Apr 2024 11:30:43 +0200 Subject: [PATCH 04/25] Version 0.5.3 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index caef7d1..e4cb6ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.2" +version = "0.5.3" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] From 54e8a1145f3b3d6785d31a33f715b26b4ef69766 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Wed, 10 Apr 2024 11:31:55 +0200 Subject: [PATCH 05/25] Add function ObjectId.is_local (#106) * Add function ObjectId.is_local * add test * add test --- examples/live_federation/objects/post.rs | 12 ++++++++--- src/config.rs | 27 ++++++++++++++++++++++++ src/fetch/object_id.rs | 5 +++++ 3 files changed, 41 insertions(+), 3 deletions(-) diff --git a/examples/live_federation/objects/post.rs b/examples/live_federation/objects/post.rs index 9a08b9d..1b19fac 100644 --- a/examples/live_federation/objects/post.rs +++ b/examples/live_federation/objects/post.rs @@ -21,7 +21,6 @@ pub struct DbPost { pub text: String, pub ap_id: ObjectId, pub creator: ObjectId, - pub local: bool, } #[derive(Deserialize, Serialize, Debug)] @@ -59,7 +58,15 @@ impl Object for DbPost { } async fn into_json(self, _data: &Data) -> Result { - unimplemented!() + Ok(Note { + kind: NoteType::Note, + id: self.ap_id, + content: self.text, + attributed_to: self.creator, + to: vec![public()], + tag: vec![], + in_reply_to: None, + }) } async fn verify( @@ -81,7 +88,6 @@ impl Object for DbPost { text: json.content, ap_id: json.id.clone(), creator: json.attributed_to.clone(), - local: false, }; let mention = Mention { diff --git a/src/config.rs b/src/config.rs index b7aea9c..6142cfd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -341,3 +341,30 @@ impl FederationMiddleware { FederationMiddleware(config) } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod test { + use super::*; + + async fn config() -> FederationConfig { + FederationConfig::builder() + .domain("example.com") + .app_data(1) + .build() + .await + .unwrap() + } + #[tokio::test] + async fn test_url_is_local() -> Result<(), Error> { + let config = config().await; + assert!(config.is_local_url(&Url::parse("http://example.com")?)); + assert!(!config.is_local_url(&Url::parse("http://other.com")?)); + Ok(()) + } + #[tokio::test] + async fn test_get_domain() { + let config = config().await; + assert_eq!("example.com", config.domain()); + } +} diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index d30fe0a..ce52c43 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -169,6 +169,11 @@ where Kind::verify(&res.object, redirect_url, data).await?; Kind::from_json(res.object, data).await } + + /// Returns true if the object's domain matches the one defined in [[FederationConfig.domain]]. + pub fn is_local(&self, data: &Data<::DataType>) -> bool { + data.config.is_local_url(&self.0) + } } /// Need to implement clone manually, to avoid requiring Kind to be Clone From ee268405f71a74051fbb0b5bd10426b0f5f1a2ac Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 10 Apr 2024 11:32:14 +0200 Subject: [PATCH 06/25] Version 0.5.4 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e4cb6ad..419c6c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.3" +version = "0.5.4" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] From ddc455510b8b294e75907b747068a6c9e182d511 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Thu, 2 May 2024 10:58:33 +0200 Subject: [PATCH 07/25] Dont crash when calling is_local_url() without domain (#108) --- src/config.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index 6142cfd..f5ccfff 100644 --- a/src/config.rs +++ b/src/config.rs @@ -174,11 +174,17 @@ impl FederationConfig { /// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for /// local debugging. pub(crate) fn is_local_url(&self, url: &Url) -> bool { - let mut domain = url.host_str().expect("id has domain").to_string(); - if let Some(port) = url.port() { - domain = format!("{}:{}", domain, port); + match url.host_str() { + Some(domain) => { + let domain = if let Some(port) = url.port() { + format!("{}:{}", domain, port) + } else { + domain.to_string() + }; + domain == self.domain + } + None => false, } - domain == self.domain } /// Returns the local domain @@ -355,13 +361,17 @@ mod test { .await .unwrap() } + #[tokio::test] async fn test_url_is_local() -> Result<(), Error> { let config = config().await; assert!(config.is_local_url(&Url::parse("http://example.com")?)); assert!(!config.is_local_url(&Url::parse("http://other.com")?)); + // ensure that missing domain doesnt cause crash + assert!(!config.is_local_url(&Url::parse("http://127.0.0.1")?)); Ok(()) } + #[tokio::test] async fn test_get_domain() { let config = config().await; From be69efdee36652bcdead12f39ea0f1193085532f Mon Sep 17 00:00:00 2001 From: Nutomic Date: Thu, 2 May 2024 10:58:56 +0200 Subject: [PATCH 08/25] Require signed digest when verifying signatures (#109) --- src/http_signatures.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/http_signatures.rs b/src/http_signatures.rs index 7e26e8d..1f4e15b 100644 --- a/src/http_signatures.rs +++ b/src/http_signatures.rs @@ -189,8 +189,11 @@ fn verify_signature_inner( uri: &Uri, public_key: &str, ) -> Result<(), Error> { - static CONFIG: Lazy = - Lazy::new(|| http_signature_normalization::Config::new().set_expiration(EXPIRES_AFTER)); + static CONFIG: Lazy = Lazy::new(|| { + http_signature_normalization::Config::new() + .set_expiration(EXPIRES_AFTER) + .require_digest() + }); let path_and_query = uri.path_and_query().map(PathAndQuery::as_str).unwrap_or(""); From c48de9e9448a558a592346e6d88f90b6855ba9ec Mon Sep 17 00:00:00 2001 From: Nutomic Date: Thu, 2 May 2024 12:58:08 +0200 Subject: [PATCH 09/25] Upgrade dependencies (#110) --- Cargo.toml | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 419c6c9..5173318 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,21 +32,21 @@ redundant_closure_for_method_calls = "deny" unwrap_used = "deny" [dependencies] -chrono = { version = "0.4.34", features = ["clock"], default-features = false } -serde = { version = "1.0.197", features = ["derive"] } -async-trait = "0.1.77" +chrono = { version = "0.4.38", features = ["clock"], default-features = false } +serde = { version = "1.0.200", features = ["derive"] } +async-trait = "0.1.80" url = { version = "2.5.0", features = ["serde"] } -serde_json = { version = "1.0.114", features = ["preserve_order"] } -reqwest = { version = "0.11.24", features = ["json", "stream"] } -reqwest-middleware = "0.2.4" +serde_json = { version = "1.0.116", features = ["preserve_order"] } +reqwest = { version = "0.11.27", features = ["json", "stream"] } +reqwest-middleware = "0.2.5" tracing = "0.1.40" -base64 = "0.21.7" +base64 = "0.22.1" openssl = "0.10.64" once_cell = "1.19.0" -http = "0.2.11" +http = "0.2.12" sha2 = "0.10.8" -thiserror = "1.0.57" -derive_builder = "0.12.0" +thiserror = "1.0.59" +derive_builder = "0.20.0" itertools = "0.12.1" dyn-clone = "1.0.17" enum_delegate = "0.2.0" @@ -57,20 +57,20 @@ http-signature-normalization-reqwest = { version = "0.10.0", default-features = "default-spawner", ] } http-signature-normalization = "0.7.0" -bytes = "1.5.0" +bytes = "1.6.0" futures-core = { version = "0.3.30", default-features = false } -pin-project-lite = "0.2.13" +pin-project-lite = "0.2.14" activitystreams-kinds = "0.3.0" -regex = { version = "1.10.3", default-features = false, features = ["std", "unicode-case"] } -tokio = { version = "1.36.0", features = [ +regex = { version = "1.10.4", default-features = false, features = ["std", "unicode-case"] } +tokio = { version = "1.37.0", features = [ "sync", "rt", "rt-multi-thread", "time", ] } -diesel = { version = "2.1.4", features = ["postgres"], default-features = false, optional = true } +diesel = { version = "2.1.6", features = ["postgres"], default-features = false, optional = true } futures = "0.3.30" -moka = { version = "0.12.5", features = ["future"] } +moka = { version = "0.12.7", features = ["future"] } # Actix-web actix-web = { version = "4.5.1", default-features = false, optional = true } @@ -82,12 +82,12 @@ axum = { version = "0.6.20", features = [ ], default-features = false, optional = true } tower = { version = "0.4.13", optional = true } hyper = { version = "0.14", optional = true } -http-body-util = {version = "0.1.0", optional = true } +http-body-util = {version = "0.1.1", optional = true } [dev-dependencies] -anyhow = "1.0.80" +anyhow = "1.0.82" rand = "0.8.5" -env_logger = "0.10.2" +env_logger = "0.11.3" tower-http = { version = "0.5.2", features = ["map-request-body", "util"] } axum = { version = "0.6.20", features = [ "http1", @@ -95,7 +95,7 @@ axum = { version = "0.6.20", features = [ "query", ], default-features = false } axum-macros = "0.3.8" -tokio = { version = "1.36.0", features = ["full"] } +tokio = { version = "1.37.0", features = ["full"] } [profile.dev] strip = "symbols" From 24afad7abc8361bbc45f794e5d2e8010404501d6 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 2 May 2024 13:06:22 +0200 Subject: [PATCH 10/25] Version 0.5.5 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5173318..ba11e32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.4" +version = "0.5.5" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] From cf1f84993b984051581b2b76be0a0cc595378354 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Mon, 6 May 2024 11:09:23 +0200 Subject: [PATCH 11/25] Make response content-type check case insensitive (#111) * Make response content-type check case insensitive For wordpress compat * cleaner * clippy * fmt * fmt --- src/fetch/mod.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 4d563b6..67165a9 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -53,19 +53,21 @@ pub async fn fetch_object_http( url: &Url, data: &Data, ) -> Result, Error> { - static CONTENT_TYPE: HeaderValue = HeaderValue::from_static(FEDERATION_CONTENT_TYPE); - static ALT_CONTENT_TYPE: HeaderValue = HeaderValue::from_static( - r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#, - ); - static ALT_CONTENT_TYPE_MASTODON: HeaderValue = - HeaderValue::from_static(r#"application/activity+json; charset=utf-8"#); - let res = fetch_object_http_with_accept(url, data, &CONTENT_TYPE).await?; + static FETCH_CONTENT_TYPE: HeaderValue = HeaderValue::from_static(FEDERATION_CONTENT_TYPE); + const VALID_RESPONSE_CONTENT_TYPES: [&str; 3] = [ + FEDERATION_CONTENT_TYPE, // lemmy + r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#, // activitypub standard + r#"application/activity+json; charset=utf-8"#, // mastodon + ]; + let res = fetch_object_http_with_accept(url, data, &FETCH_CONTENT_TYPE).await?; - // Ensure correct content-type to prevent vulnerabilities. - if res.content_type.as_ref() != Some(&CONTENT_TYPE) - && res.content_type.as_ref() != Some(&ALT_CONTENT_TYPE) - && res.content_type.as_ref() != Some(&ALT_CONTENT_TYPE_MASTODON) - { + // Ensure correct content-type to prevent vulnerabilities, with case insensitive comparison. + let content_type = res + .content_type + .as_ref() + .and_then(|c| c.to_str().ok()) + .ok_or(Error::FetchInvalidContentType(res.url.clone()))?; + if !VALID_RESPONSE_CONTENT_TYPES.contains(&content_type) { return Err(Error::FetchInvalidContentType(res.url)); } From 16844f048afeeeb5d7d0df3a559505cbcd684a61 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Mon, 6 May 2024 11:09:47 +0200 Subject: [PATCH 12/25] Version 0.5.6 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ba11e32..eabcee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.5" +version = "0.5.6" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] From 32da1b747c08d824e84874617f232f2664bf3588 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 5 Jun 2024 23:05:26 +0200 Subject: [PATCH 13/25] Revert "Retry activity send in case of timeout or rate limit (#102)" This reverts commit 5402bc9c195f7c48431fdb7a38c99ee8f341cc79. --- src/activity_sending.rs | 77 ++++++----------------------------------- src/fetch/object_id.rs | 2 +- src/traits.rs | 2 +- 3 files changed, 13 insertions(+), 68 deletions(-) diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 4af8439..f7daac4 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -12,17 +12,14 @@ use crate::{ }; use bytes::Bytes; use futures::StreamExt; -use http::StatusCode; use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::{ - header::{HeaderMap, HeaderName, HeaderValue}, - Response, -}; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ + self, fmt::{Debug, Display}, time::{Duration, SystemTime}, }; @@ -93,30 +90,20 @@ impl SendActivityTask { ) .await?; let response = client.execute(request).await?; - self.handle_response(response).await - } - /// Based on the HTTP status code determines if an activity was delivered successfully. In that case - /// Ok is returned. Otherwise it returns Err and the activity send should be retried later. - /// - /// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217 - async fn handle_response(&self, response: Response) -> Result<(), Error> { - match response.status() { - status if status.is_success() => { + match response { + o if o.status().is_success() => { debug!("Activity {self} delivered successfully"); Ok(()) } - status - if status.is_client_error() - && status != StatusCode::REQUEST_TIMEOUT - && status != StatusCode::TOO_MANY_REQUESTS => - { - let text = response.text_limited().await?; + o if o.status().is_client_error() => { + let text = o.text_limited().await?; debug!("Activity {self} was rejected, aborting: {text}"); Ok(()) } - status => { - let text = response.text_limited().await?; + o => { + let status = o.status(); + let text = o.text_limited().await?; Err(Error::Other(format!( "Activity {self} failure with status {status}: {text}", @@ -228,6 +215,8 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { mod tests { use super::*; use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; + use bytes::Bytes; + use http::StatusCode; use std::{ sync::{atomic::AtomicUsize, Arc}, time::Instant, @@ -301,48 +290,4 @@ mod tests { info!("Queue Sent: {:?}", start.elapsed()); Ok(()) } - - #[tokio::test] - async fn test_handle_response() { - let keypair = generate_actor_keypair().unwrap(); - let message = SendActivityTask { - actor_id: "http://localhost:8001".parse().unwrap(), - activity_id: "http://localhost:8001/activity".parse().unwrap(), - activity: "{}".into(), - inbox: "http://localhost:8001".parse().unwrap(), - private_key: keypair.private_key().unwrap(), - http_signature_compat: true, - }; - - let res = |status| { - http::Response::builder() - .status(status) - .body(vec![]) - .unwrap() - .into() - }; - - assert!(message.handle_response(res(StatusCode::OK)).await.is_ok()); - assert!(message - .handle_response(res(StatusCode::BAD_REQUEST)) - .await - .is_ok()); - - assert!(message - .handle_response(res(StatusCode::MOVED_PERMANENTLY)) - .await - .is_err()); - assert!(message - .handle_response(res(StatusCode::REQUEST_TIMEOUT)) - .await - .is_err()); - assert!(message - .handle_response(res(StatusCode::TOO_MANY_REQUESTS)) - .await - .is_err()); - assert!(message - .handle_response(res(StatusCode::INTERNAL_SERVER_ERROR)) - .await - .is_err()); - } } diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index ce52c43..7c6abb8 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -347,7 +347,7 @@ const _IMPL_DIESEL_NEW_TYPE_FOR_OBJECT_ID: () = { #[allow(clippy::unwrap_used)] pub mod tests { use super::*; - use crate::traits::tests::DbUser; + use crate::{fetch::object_id::should_refetch_object, traits::tests::DbUser}; #[test] fn test_deserialize() { diff --git a/src/traits.rs b/src/traits.rs index 720f731..9fdec27 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -343,7 +343,7 @@ pub mod tests { error::Error, fetch::object_id::ObjectId, http_signatures::{generate_actor_keypair, Keypair}, - protocol::verification::verify_domains_match, + protocol::{public_key::PublicKey, verification::verify_domains_match}, }; use activitystreams_kinds::{activity::FollowType, actor::PersonType}; use once_cell::sync::Lazy; From a25114095271d1e22657adbc63c4dd77bcf8cf46 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Wed, 5 Jun 2024 23:09:53 +0200 Subject: [PATCH 14/25] Version 0.5.7 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index eabcee6..8d70ca4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.6" +version = "0.5.7" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] From e118e4f24099736ef6eaf827ca94c98f9f6df166 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 6 Jun 2024 00:02:36 +0200 Subject: [PATCH 15/25] Reapply "Retry activity send in case of timeout or rate limit (#102)" This reverts commit 32da1b747c08d824e84874617f232f2664bf3588. --- src/activity_sending.rs | 77 +++++++++++++++++++++++++++++++++++------ src/fetch/object_id.rs | 2 +- src/traits.rs | 2 +- 3 files changed, 68 insertions(+), 13 deletions(-) diff --git a/src/activity_sending.rs b/src/activity_sending.rs index f7daac4..4af8439 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -12,14 +12,17 @@ use crate::{ }; use bytes::Bytes; use futures::StreamExt; +use http::StatusCode; use httpdate::fmt_http_date; use itertools::Itertools; use openssl::pkey::{PKey, Private}; -use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use reqwest::{ + header::{HeaderMap, HeaderName, HeaderValue}, + Response, +}; use reqwest_middleware::ClientWithMiddleware; use serde::Serialize; use std::{ - self, fmt::{Debug, Display}, time::{Duration, SystemTime}, }; @@ -90,20 +93,30 @@ impl SendActivityTask { ) .await?; let response = client.execute(request).await?; + self.handle_response(response).await + } - match response { - o if o.status().is_success() => { + /// Based on the HTTP status code determines if an activity was delivered successfully. In that case + /// Ok is returned. Otherwise it returns Err and the activity send should be retried later. + /// + /// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217 + async fn handle_response(&self, response: Response) -> Result<(), Error> { + match response.status() { + status if status.is_success() => { debug!("Activity {self} delivered successfully"); Ok(()) } - o if o.status().is_client_error() => { - let text = o.text_limited().await?; + status + if status.is_client_error() + && status != StatusCode::REQUEST_TIMEOUT + && status != StatusCode::TOO_MANY_REQUESTS => + { + let text = response.text_limited().await?; debug!("Activity {self} was rejected, aborting: {text}"); Ok(()) } - o => { - let status = o.status(); - let text = o.text_limited().await?; + status => { + let text = response.text_limited().await?; Err(Error::Other(format!( "Activity {self} failure with status {status}: {text}", @@ -215,8 +228,6 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap { mod tests { use super::*; use crate::{config::FederationConfig, http_signatures::generate_actor_keypair}; - use bytes::Bytes; - use http::StatusCode; use std::{ sync::{atomic::AtomicUsize, Arc}, time::Instant, @@ -290,4 +301,48 @@ mod tests { info!("Queue Sent: {:?}", start.elapsed()); Ok(()) } + + #[tokio::test] + async fn test_handle_response() { + let keypair = generate_actor_keypair().unwrap(); + let message = SendActivityTask { + actor_id: "http://localhost:8001".parse().unwrap(), + activity_id: "http://localhost:8001/activity".parse().unwrap(), + activity: "{}".into(), + inbox: "http://localhost:8001".parse().unwrap(), + private_key: keypair.private_key().unwrap(), + http_signature_compat: true, + }; + + let res = |status| { + http::Response::builder() + .status(status) + .body(vec![]) + .unwrap() + .into() + }; + + assert!(message.handle_response(res(StatusCode::OK)).await.is_ok()); + assert!(message + .handle_response(res(StatusCode::BAD_REQUEST)) + .await + .is_ok()); + + assert!(message + .handle_response(res(StatusCode::MOVED_PERMANENTLY)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::REQUEST_TIMEOUT)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::TOO_MANY_REQUESTS)) + .await + .is_err()); + assert!(message + .handle_response(res(StatusCode::INTERNAL_SERVER_ERROR)) + .await + .is_err()); + } } diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index 7c6abb8..ce52c43 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -347,7 +347,7 @@ const _IMPL_DIESEL_NEW_TYPE_FOR_OBJECT_ID: () = { #[allow(clippy::unwrap_used)] pub mod tests { use super::*; - use crate::{fetch::object_id::should_refetch_object, traits::tests::DbUser}; + use crate::traits::tests::DbUser; #[test] fn test_deserialize() { diff --git a/src/traits.rs b/src/traits.rs index 9fdec27..720f731 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -343,7 +343,7 @@ pub mod tests { error::Error, fetch::object_id::ObjectId, http_signatures::{generate_actor_keypair, Keypair}, - protocol::{public_key::PublicKey, verification::verify_domains_match}, + protocol::verification::verify_domains_match, }; use activitystreams_kinds::{activity::FollowType, actor::PersonType}; use once_cell::sync::Lazy; From 175b22006b53ac493c565845269dac1e6b4b2c32 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Thu, 6 Jun 2024 00:02:41 +0200 Subject: [PATCH 16/25] Revert "Version 0.5.7" This reverts commit a25114095271d1e22657adbc63c4dd77bcf8cf46. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8d70ca4..eabcee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.7" +version = "0.5.6" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] From 6edbc06a78a79cc42c9addad799fbf6f5b99e79d Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 11 Jun 2024 11:16:04 +0200 Subject: [PATCH 17/25] Convert content-type to lowercase for comparison (#114) * Convert content-type to lowercase for comparison * rust 1.78 * clippy priority * upgrade dep --- .woodpecker.yml | 4 ++-- Cargo.toml | 6 +++--- src/fetch/mod.rs | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.woodpecker.yml b/.woodpecker.yml index d14bc3d..b82dd94 100644 --- a/.woodpecker.yml +++ b/.woodpecker.yml @@ -1,6 +1,6 @@ variables: - - &rust_image "rust:1.77-bullseye" - + - &rust_image "rust:1.78-bullseye" + steps: cargo_fmt: image: rustdocker/rust:nightly diff --git a/Cargo.toml b/Cargo.toml index eabcee6..522a84a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,8 +19,8 @@ warnings = "deny" deprecated = "deny" [lints.clippy] -perf = "deny" -complexity = "deny" +perf = { level = "deny", priority = -1 } +complexity = { level = "deny", priority = -1 } dbg_macro = "deny" inefficient_to_string = "deny" items-after-statements = "deny" @@ -61,7 +61,7 @@ bytes = "1.6.0" futures-core = { version = "0.3.30", default-features = false } pin-project-lite = "0.2.14" activitystreams-kinds = "0.3.0" -regex = { version = "1.10.4", default-features = false, features = ["std", "unicode-case"] } +regex = { version = "1.10.5", default-features = false, features = ["std", "unicode"] } tokio = { version = "1.37.0", features = [ "sync", "rt", diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 67165a9..f078cf6 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -65,9 +65,9 @@ pub async fn fetch_object_http( let content_type = res .content_type .as_ref() - .and_then(|c| c.to_str().ok()) + .and_then(|c| Some(c.to_str().ok()?.to_lowercase())) .ok_or(Error::FetchInvalidContentType(res.url.clone()))?; - if !VALID_RESPONSE_CONTENT_TYPES.contains(&content_type) { + if !VALID_RESPONSE_CONTENT_TYPES.contains(&content_type.as_str()) { return Err(Error::FetchInvalidContentType(res.url)); } From 930c9288782185dbe8e5e07547b92cede2983d9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=97=8D+85CD?= <50108258+kwaa@users.noreply.github.com> Date: Wed, 10 Jul 2024 00:07:08 +0800 Subject: [PATCH 18/25] chore: update .gitignore (#115) * chore: update .gitignore * chore(gitignore): add comments --- .gitignore | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7785759..b10a717 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,12 @@ /.idea /Cargo.lock perf.data* -flamegraph.svg \ No newline at end of file +flamegraph.svg + +# direnv +/.direnv +/.envrc + +# nix flake +/flake.nix +/flake.lock From 08af457453702b172231b00a601f831a1a0d327f Mon Sep 17 00:00:00 2001 From: Nutomic Date: Thu, 11 Jul 2024 17:42:31 +0200 Subject: [PATCH 19/25] Dont connect to broken Pleroma instance during webfinger test (#117) --- src/fetch/webfinger.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/fetch/webfinger.rs b/src/fetch/webfinger.rs index 7ca4903..8460245 100644 --- a/src/fetch/webfinger.rs +++ b/src/fetch/webfinger.rs @@ -264,8 +264,6 @@ mod tests { let data = config.to_request_data(); webfinger_resolve_actor::("LemmyDev@mastodon.social", &data).await?; - // poa.st is as of 2023-07-14 the largest Pleroma instance - webfinger_resolve_actor::("graf@poa.st", &data).await?; Ok(()) } From 8f47daa2e21b26c8cda6e1241f1b62fff7d60278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=97=8D+85CD?= <50108258+kwaa@users.noreply.github.com> Date: Mon, 15 Jul 2024 16:20:35 +0800 Subject: [PATCH 20/25] refactor!: use `rsa` instead of `openssl` (#116) * refactor!: use `rsa` instead of `openssl` * fix: format code * fix: format code * fix: lint code * fix: format code --- Cargo.toml | 21 +++++++---- src/activity_sending.rs | 10 ++--- src/config.rs | 10 ++--- src/error.rs | 21 +++++++++-- src/http_signatures.rs | 81 +++++++++++++++++++++-------------------- 5 files changed, 82 insertions(+), 61 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 522a84a..b16df17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,9 +21,9 @@ deprecated = "deny" [lints.clippy] perf = { level = "deny", priority = -1 } complexity = { level = "deny", priority = -1 } -dbg_macro = "deny" +dbg_macro = "deny" inefficient_to_string = "deny" -items-after-statements = "deny" +items-after-statements = "deny" implicit_clone = "deny" wildcard_imports = "deny" cast_lossless = "deny" @@ -41,10 +41,11 @@ reqwest = { version = "0.11.27", features = ["json", "stream"] } reqwest-middleware = "0.2.5" tracing = "0.1.40" base64 = "0.22.1" -openssl = "0.10.64" +rand = "0.8.5" +rsa = "0.9.6" once_cell = "1.19.0" http = "0.2.12" -sha2 = "0.10.8" +sha2 = { version = "0.10.8", features = ["oid"] } thiserror = "1.0.59" derive_builder = "0.20.0" itertools = "0.12.1" @@ -61,14 +62,19 @@ bytes = "1.6.0" futures-core = { version = "0.3.30", default-features = false } pin-project-lite = "0.2.14" activitystreams-kinds = "0.3.0" -regex = { version = "1.10.5", default-features = false, features = ["std", "unicode"] } +regex = { version = "1.10.5", default-features = false, features = [ + "std", + "unicode", +] } tokio = { version = "1.37.0", features = [ "sync", "rt", "rt-multi-thread", "time", ] } -diesel = { version = "2.1.6", features = ["postgres"], default-features = false, optional = true } +diesel = { version = "2.1.6", features = [ + "postgres", +], default-features = false, optional = true } futures = "0.3.30" moka = { version = "0.12.7", features = ["future"] } @@ -82,11 +88,10 @@ axum = { version = "0.6.20", features = [ ], default-features = false, optional = true } tower = { version = "0.4.13", optional = true } hyper = { version = "0.14", optional = true } -http-body-util = {version = "0.1.1", optional = true } +http-body-util = { version = "0.1.1", optional = true } [dev-dependencies] anyhow = "1.0.82" -rand = "0.8.5" env_logger = "0.11.3" tower-http = { version = "0.5.2", features = ["map-request-body", "util"] } axum = { version = "0.6.20", features = [ diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 4af8439..f9023ce 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -15,12 +15,12 @@ use futures::StreamExt; use http::StatusCode; use httpdate::fmt_http_date; use itertools::Itertools; -use openssl::pkey::{PKey, Private}; use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, Response, }; use reqwest_middleware::ClientWithMiddleware; +use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey}; use serde::Serialize; use std::{ fmt::{Debug, Display}, @@ -37,7 +37,7 @@ pub struct SendActivityTask { pub(crate) activity_id: Url, pub(crate) activity: Bytes, pub(crate) inbox: Url, - pub(crate) private_key: PKey, + pub(crate) private_key: RsaPrivateKey, pub(crate) http_signature_compat: bool, } @@ -172,7 +172,7 @@ where pub(crate) async fn get_pkey_cached( data: &Data, actor: &ActorType, -) -> Result, Error> +) -> Result where ActorType: Actor, { @@ -189,13 +189,13 @@ where // This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening let pkey = tokio::task::spawn_blocking(move || { - PKey::private_key_from_pem(private_key_pem.as_bytes()).map_err(|err| { + RsaPrivateKey::from_pkcs8_pem(&private_key_pem).map_err(|err| { Error::Other(format!("Could not create private key from PEM data:{err}")) }) }) .await .map_err(|err| Error::Other(format!("Error joining: {err}")))??; - std::result::Result::, Error>::Ok(pkey) + std::result::Result::::Ok(pkey) }) .await .map_err(|e| Error::Other(format!("cloned error: {e}"))) diff --git a/src/config.rs b/src/config.rs index f5ccfff..2015750 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,8 +24,8 @@ use async_trait::async_trait; use derive_builder::Builder; use dyn_clone::{clone_trait_object, DynClone}; use moka::future::Cache; -use openssl::pkey::{PKey, Private}; use reqwest_middleware::ClientWithMiddleware; +use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey}; use serde::de::DeserializeOwned; use std::{ ops::Deref, @@ -80,12 +80,12 @@ pub struct FederationConfig { /// This can be used to implement secure mode federation. /// #[builder(default = "None", setter(custom))] - pub(crate) signed_fetch_actor: Option)>>, + pub(crate) signed_fetch_actor: Option>, #[builder( default = "Cache::builder().max_capacity(10000).build()", setter(custom) )] - pub(crate) actor_pkey_cache: Cache>, + pub(crate) actor_pkey_cache: Cache, /// Queue for sending outgoing activities. Only optional to make builder work, its always /// present once constructed. #[builder(setter(skip))] @@ -200,8 +200,8 @@ impl FederationConfigBuilder { .private_key_pem() .expect("actor does not have a private key to sign with"); - let private_key = PKey::private_key_from_pem(private_key_pem.as_bytes()) - .expect("Could not decode PEM data"); + let private_key = + RsaPrivateKey::from_pkcs8_pem(&private_key_pem).expect("Could not decode PEM data"); self.signed_fetch_actor = Some(Some(Arc::new((actor.id(), private_key)))); self } diff --git a/src/error.rs b/src/error.rs index d2f7c87..1866e48 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,7 +2,10 @@ use crate::fetch::webfinger::WebFingerError; use http_signature_normalization_reqwest::SignError; -use openssl::error::ErrorStack; +use rsa::{ + errors::Error as RsaError, + pkcs8::{spki::Error as SpkiError, Error as Pkcs8Error}, +}; use std::string::FromUtf8Error; use tokio::task::JoinError; use url::Url; @@ -80,8 +83,20 @@ pub enum Error { Other(String), } -impl From for Error { - fn from(value: ErrorStack) -> Self { +impl From for Error { + fn from(value: RsaError) -> Self { + Error::Other(value.to_string()) + } +} + +impl From for Error { + fn from(value: Pkcs8Error) -> Self { + Error::Other(value.to_string()) + } +} + +impl From for Error { + fn from(value: SpkiError) -> Self { Error::Other(value.to_string()) } } diff --git a/src/http_signatures.rs b/src/http_signatures.rs index 1f4e15b..aa526f9 100644 --- a/src/http_signatures.rs +++ b/src/http_signatures.rs @@ -20,17 +20,17 @@ use http_signature_normalization_reqwest::{ DefaultSpawner, }; use once_cell::sync::Lazy; -use openssl::{ - hash::MessageDigest, - pkey::{PKey, Private}, - rsa::Rsa, - sign::{Signer, Verifier}, -}; use reqwest::Request; use reqwest_middleware::RequestBuilder; +use rsa::{ + pkcs8::{DecodePublicKey, EncodePrivateKey, EncodePublicKey, LineEnding}, + Pkcs1v15Sign, + RsaPrivateKey, + RsaPublicKey, +}; use serde::Deserialize; use sha2::{Digest, Sha256}; -use std::{collections::BTreeMap, fmt::Debug, io::ErrorKind, time::Duration}; +use std::{collections::BTreeMap, fmt::Debug, time::Duration}; use tracing::debug; use url::Url; @@ -46,27 +46,23 @@ pub struct Keypair { impl Keypair { /// Helper method to turn this into an openssl private key #[cfg(test)] - pub(crate) fn private_key(&self) -> Result, anyhow::Error> { - Ok(PKey::private_key_from_pem(self.private_key.as_bytes())?) + pub(crate) fn private_key(&self) -> Result { + use rsa::pkcs8::DecodePrivateKey; + + Ok(RsaPrivateKey::from_pkcs8_pem(&self.private_key)?) } } /// Generate a random asymmetric keypair for ActivityPub HTTP signatures. -pub fn generate_actor_keypair() -> Result { - let rsa = Rsa::generate(2048)?; - let pkey = PKey::from_rsa(rsa)?; - let public_key = pkey.public_key_to_pem()?; - let private_key = pkey.private_key_to_pem_pkcs8()?; - let key_to_string = |key| match String::from_utf8(key) { - Ok(s) => Ok(s), - Err(e) => Err(std::io::Error::new( - ErrorKind::Other, - format!("Failed converting key to string: {}", e), - )), - }; +pub fn generate_actor_keypair() -> Result { + let mut rng = rand::thread_rng(); + let rsa = RsaPrivateKey::new(&mut rng, 2048)?; + let pkey = RsaPublicKey::from(&rsa); + let public_key = pkey.to_public_key_pem(LineEnding::default())?; + let private_key = rsa.to_pkcs8_pem(LineEnding::default())?.to_string(); Ok(Keypair { - private_key: key_to_string(private_key)?, - public_key: key_to_string(public_key)?, + private_key, + public_key, }) } @@ -83,7 +79,7 @@ pub(crate) async fn sign_request( request_builder: RequestBuilder, actor_id: &Url, activity: Bytes, - private_key: PKey, + private_key: RsaPrivateKey, http_signature_compat: bool, ) -> Result { static CONFIG: Lazy> = @@ -106,10 +102,10 @@ pub(crate) async fn sign_request( Sha256::new(), activity, move |signing_string| { - let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?; - signer.update(signing_string.as_bytes())?; - - Ok(Base64.encode(signer.sign_to_vec()?)) as Result<_, Error> + Ok(Base64.encode(private_key.sign( + Pkcs1v15Sign::new::(), + &Sha256::digest(signing_string.as_bytes()), + )?)) as Result<_, Error> }, ) .await @@ -205,15 +201,19 @@ fn verify_signature_inner( "Verifying with key {}, message {}", &public_key, &signing_string ); - let public_key = PKey::public_key_from_pem(public_key.as_bytes())?; - let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?; - verifier.update(signing_string.as_bytes())?; + let public_key = RsaPublicKey::from_public_key_pem(public_key)?; let base64_decoded = Base64 .decode(signature) .map_err(|err| Error::Other(err.to_string()))?; - Ok(verifier.verify(&base64_decoded)?) + Ok(public_key + .verify( + Pkcs1v15Sign::new::(), + &Sha256::digest(signing_string.as_bytes()), + &base64_decoded, + ) + .is_ok()) })?; if verified { @@ -284,6 +284,7 @@ pub mod test { use crate::activity_sending::generate_request_headers; use reqwest::Client; use reqwest_middleware::ClientWithMiddleware; + use rsa::{pkcs1::DecodeRsaPrivateKey, pkcs8::DecodePrivateKey}; use std::str::FromStr; static ACTOR_ID: Lazy = Lazy::new(|| Url::parse("https://example.com/u/alice").unwrap()); @@ -306,7 +307,7 @@ pub mod test { request_builder, &ACTOR_ID, "my activity".into(), - PKey::private_key_from_pem(test_keypair().private_key.as_bytes()).unwrap(), + RsaPrivateKey::from_pkcs8_pem(&test_keypair().private_key).unwrap(), // set this to prevent created/expires headers to be generated and inserted // automatically from current time true, @@ -342,7 +343,7 @@ pub mod test { request_builder, &ACTOR_ID, "my activity".to_string().into(), - PKey::private_key_from_pem(test_keypair().private_key.as_bytes()).unwrap(), + RsaPrivateKey::from_pkcs8_pem(&test_keypair().private_key).unwrap(), false, ) .await @@ -378,13 +379,13 @@ pub mod test { } pub fn test_keypair() -> Keypair { - let rsa = Rsa::private_key_from_pem(PRIVATE_KEY.as_bytes()).unwrap(); - let pkey = PKey::from_rsa(rsa).unwrap(); - let private_key = pkey.private_key_to_pem_pkcs8().unwrap(); - let public_key = pkey.public_key_to_pem().unwrap(); + let rsa = RsaPrivateKey::from_pkcs1_pem(PRIVATE_KEY).unwrap(); + let pkey = RsaPublicKey::from(&rsa); + let public_key = pkey.to_public_key_pem(LineEnding::default()).unwrap(); + let private_key = rsa.to_pkcs8_pem(LineEnding::default()).unwrap().to_string(); Keypair { - private_key: String::from_utf8(private_key).unwrap(), - public_key: String::from_utf8(public_key).unwrap(), + private_key, + public_key, } } From 472f6ffac5cc37da2b3eadfaafb52160c61c72f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=97=8D+85CD?= <50108258+kwaa@users.noreply.github.com> Date: Tue, 16 Jul 2024 17:48:45 +0800 Subject: [PATCH 21/25] chore(deps/reqwest): enable `rustls-tls` feature (#118) --- Cargo.toml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b16df17..970262c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,11 @@ serde = { version = "1.0.200", features = ["derive"] } async-trait = "0.1.80" url = { version = "2.5.0", features = ["serde"] } serde_json = { version = "1.0.116", features = ["preserve_order"] } -reqwest = { version = "0.11.27", features = ["json", "stream"] } +reqwest = { version = "0.11.27", default-features = false, features = [ + "json", + "stream", + "rustls-tls", +] } reqwest-middleware = "0.2.5" tracing = "0.1.40" base64 = "0.22.1" From 4920d1a2de3c43c98753daec8cd758e0389c9809 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 16 Jul 2024 12:29:14 +0200 Subject: [PATCH 22/25] Upgrade dependencies (#120) --- Cargo.toml | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 970262c..3aec378 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,10 +33,10 @@ unwrap_used = "deny" [dependencies] chrono = { version = "0.4.38", features = ["clock"], default-features = false } -serde = { version = "1.0.200", features = ["derive"] } -async-trait = "0.1.80" -url = { version = "2.5.0", features = ["serde"] } -serde_json = { version = "1.0.116", features = ["preserve_order"] } +serde = { version = "1.0.204", features = ["derive"] } +async-trait = "0.1.81" +url = { version = "2.5.2", features = ["serde"] } +serde_json = { version = "1.0.120", features = ["preserve_order"] } reqwest = { version = "0.11.27", default-features = false, features = [ "json", "stream", @@ -50,9 +50,9 @@ rsa = "0.9.6" once_cell = "1.19.0" http = "0.2.12" sha2 = { version = "0.10.8", features = ["oid"] } -thiserror = "1.0.59" +thiserror = "1.0.62" derive_builder = "0.20.0" -itertools = "0.12.1" +itertools = "0.13.0" dyn-clone = "1.0.17" enum_delegate = "0.2.0" httpdate = "1.0.3" @@ -62,7 +62,7 @@ http-signature-normalization-reqwest = { version = "0.10.0", default-features = "default-spawner", ] } http-signature-normalization = "0.7.0" -bytes = "1.6.0" +bytes = "1.6.1" futures-core = { version = "0.3.30", default-features = false } pin-project-lite = "0.2.14" activitystreams-kinds = "0.3.0" @@ -70,20 +70,20 @@ regex = { version = "1.10.5", default-features = false, features = [ "std", "unicode", ] } -tokio = { version = "1.37.0", features = [ +tokio = { version = "1.38.0", features = [ "sync", "rt", "rt-multi-thread", "time", ] } -diesel = { version = "2.1.6", features = [ +diesel = { version = "2.2.1", features = [ "postgres", ], default-features = false, optional = true } futures = "0.3.30" -moka = { version = "0.12.7", features = ["future"] } +moka = { version = "0.12.8", features = ["future"] } # Actix-web -actix-web = { version = "4.5.1", default-features = false, optional = true } +actix-web = { version = "4.8.0", default-features = false, optional = true } # Axum axum = { version = "0.6.20", features = [ @@ -92,10 +92,10 @@ axum = { version = "0.6.20", features = [ ], default-features = false, optional = true } tower = { version = "0.4.13", optional = true } hyper = { version = "0.14", optional = true } -http-body-util = { version = "0.1.1", optional = true } +http-body-util = { version = "0.1.2", optional = true } [dev-dependencies] -anyhow = "1.0.82" +anyhow = "1.0.86" env_logger = "0.11.3" tower-http = { version = "0.5.2", features = ["map-request-body", "util"] } axum = { version = "0.6.20", features = [ @@ -104,7 +104,7 @@ axum = { version = "0.6.20", features = [ "query", ], default-features = false } axum-macros = "0.3.8" -tokio = { version = "1.37.0", features = ["full"] } +tokio = { version = "1.38.0", features = ["full"] } [profile.dev] strip = "symbols" From a0e0c54b571a8c9f11e5646c4117fb7ff2cf1d70 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Tue, 16 Jul 2024 12:29:28 +0200 Subject: [PATCH 23/25] Version 0.5.8 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3aec378..9bece25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.6" +version = "0.5.8" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] From d45ce32e883b1c172b870d909380873b5fc1236e Mon Sep 17 00:00:00 2001 From: Dessalines Date: Wed, 17 Jul 2024 04:00:04 -0400 Subject: [PATCH 24/25] Adding codeowners. (#119) --- .github/CODEOWNERS | 1 + 1 file changed, 1 insertion(+) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..822226e --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @Nutomic @dessalines From 83a156394eb841dfb11452755d4a423ae5a05f06 Mon Sep 17 00:00:00 2001 From: Dessalines Date: Wed, 17 Jul 2024 04:00:31 -0400 Subject: [PATCH 25/25] Fixing clippy. (#121) --- src/traits.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/traits.rs b/src/traits.rs index 720f731..9976bda 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -338,7 +338,7 @@ pub trait Collection: Sized { #[doc(hidden)] #[allow(clippy::unwrap_used)] pub mod tests { - use super::*; + use super::{async_trait, ActivityHandler, Actor, Data, Debug, Object, PublicKey, Url}; use crate::{ error::Error, fetch::object_id::ObjectId,