diff --git a/Cargo.toml b/Cargo.toml index 6538b94..166cb43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "activitypub_federation" -version = "0.5.9" +version = "0.6.1" edition = "2021" description = "High-level Activitypub framework" keywords = ["activitypub", "activitystreams", "federation", "fediverse"] @@ -10,8 +10,8 @@ documentation = "https://docs.rs/activitypub_federation/" [features] default = ["actix-web", "axum"] -actix-web = ["dep:actix-web"] -axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"] +actix-web = ["dep:actix-web", "dep:http02"] +axum = ["dep:axum", "dep:tower"] diesel = ["dep:diesel"] [lints.rust] @@ -57,7 +57,6 @@ dyn-clone = "1.0.17" enum_delegate = "0.2.0" httpdate = "1.0.3" http-signature-normalization-reqwest = { version = "0.12.0", default-features = false, features = [ - "default-spawner", "sha-2", "middleware", "default-spawner", @@ -85,25 +84,24 @@ moka = { version = "0.12.8", features = ["future"] } # Actix-web actix-web = { version = "4.8.0", default-features = false, optional = true } +http02 = { package = "http", version = "0.2.12", optional = true } # Axum axum = { git = "https://github.com/tokio-rs/axum.git", features = [ "json", ], default-features = false, optional = true } tower = { version = "0.5.1", optional = true } -hyper = { version = "1.4.1", optional = true } -http-body-util = { version = "0.1.2", optional = true } [dev-dependencies] anyhow = "1.0.86" -env_logger = "0.11.3" -tower-http = { version = "0.6.1", features = ["map-request-body", "util"] } axum = { git = "https://github.com/tokio-rs/axum.git", features = [ "http1", + "macros", "tokio", "query", ], default-features = false } -axum-macros = { git = "https://github.com/tokio-rs/axum.git" } +axum-extra = { version = "0.9.3", features = ["typed-header"] } +env_logger = "0.11.3" tokio = { version = "1.40.0", features = ["full"] } [profile.dev] diff --git a/docs/06_http_endpoints_axum.md b/docs/06_http_endpoints_axum.md index 3a33410..fc8089a 100644 --- a/docs/06_http_endpoints_axum.md +++ b/docs/06_http_endpoints_axum.md @@ -15,9 +15,9 @@ The next step is to allow other servers to fetch our actors and objects. For thi # use activitypub_federation::config::FederationMiddleware; # use axum::routing::get; # use crate::activitypub_federation::traits::Object; -# use axum::headers::ContentType; +# use axum_extra::headers::ContentType; # use activitypub_federation::FEDERATION_CONTENT_TYPE; -# use axum::TypedHeader; +# use axum_extra::TypedHeader; # use axum::response::IntoResponse; # use http::HeaderMap; # async fn generate_user_html(_: String, _: Data) -> axum::response::Response { todo!() } @@ -34,10 +34,9 @@ async fn main() -> Result<(), Error> { .layer(FederationMiddleware::new(data)); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + let listener = tokio::net::TcpListener::bind(addr).await?; tracing::debug!("listening on {}", addr); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await?; + axum::serve(listener, app.into_make_service()).await?; Ok(()) } diff --git a/examples/live_federation/http.rs b/examples/live_federation/http.rs index e0d2869..5cf8545 100644 --- a/examples/live_federation/http.rs +++ b/examples/live_federation/http.rs @@ -14,11 +14,11 @@ use activitypub_federation::{ traits::Object, }; use axum::{ + debug_handler, extract::{Path, Query}, response::{IntoResponse, Response}, Json, }; -use axum_macros::debug_handler; use http::StatusCode; use serde::Deserialize; diff --git a/examples/local_federation/axum/http.rs b/examples/local_federation/axum/http.rs index 205a5a1..3ac72f6 100644 --- a/examples/local_federation/axum/http.rs +++ b/examples/local_federation/axum/http.rs @@ -14,13 +14,13 @@ use activitypub_federation::{ traits::Object, }; use axum::{ + debug_handler, extract::{Path, Query}, response::IntoResponse, routing::{get, post}, Json, Router, }; -use axum_macros::debug_handler; use serde::Deserialize; use std::net::TcpListener; use tracing::info; diff --git a/src/activity_sending.rs b/src/activity_sending.rs index 52acbc9..c801467 100644 --- a/src/activity_sending.rs +++ b/src/activity_sending.rs @@ -24,9 +24,9 @@ use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey}; use serde::Serialize; use std::{ fmt::{Debug, Display}, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; -use tracing::debug; +use tracing::{debug, warn}; use url::Url; #[derive(Clone, Debug)] @@ -92,7 +92,17 @@ impl SendActivityTask { self.http_signature_compat, ) .await?; + + // Send the activity, and log a warning if its too slow. + let now = Instant::now(); let response = client.execute(request).await?; + let elapsed = now.elapsed().as_secs(); + if elapsed > 10 { + warn!( + "Sending activity {} to {} took {}s", + self.activity_id, self.inbox, elapsed + ); + } self.handle_response(response).await } @@ -126,8 +136,8 @@ impl SendActivityTask { } } -pub(crate) async fn build_tasks<'a, Activity, Datatype, ActorType>( - activity: &'a Activity, +pub(crate) async fn build_tasks( + activity: &Activity, actor: &ActorType, inboxes: Vec, data: &Data, diff --git a/src/actix_web/http_compat.rs b/src/actix_web/http_compat.rs new file mode 100644 index 0000000..b605444 --- /dev/null +++ b/src/actix_web/http_compat.rs @@ -0,0 +1,30 @@ +//! Remove these conversion helpers after actix-web upgrades to http 1.0 + +use std::str::FromStr; + +pub fn header_value(v: &http02::HeaderValue) -> http::HeaderValue { + http::HeaderValue::from_bytes(v.as_bytes()).expect("can convert http types") +} + +pub fn header_map<'a, H>(m: H) -> http::HeaderMap +where + H: IntoIterator, +{ + let mut new_map = http::HeaderMap::new(); + for (n, v) in m { + new_map.insert( + http::HeaderName::from_lowercase(n.as_str().as_bytes()) + .expect("can convert http types"), + header_value(v), + ); + } + new_map +} + +pub fn method(m: &http02::Method) -> http::Method { + http::Method::from_bytes(m.as_str().as_bytes()).expect("can convert http types") +} + +pub fn uri(m: &http02::Uri) -> http::Uri { + http::Uri::from_str(&m.to_string()).expect("can convert http types") +} diff --git a/src/actix_web/inbox.rs b/src/actix_web/inbox.rs index cf7fab5..49de07e 100644 --- a/src/actix_web/inbox.rs +++ b/src/actix_web/inbox.rs @@ -1,5 +1,6 @@ //! Handles incoming activities, verifying HTTP signatures and other checks +use super::http_compat; use crate::{ config::Data, error::Error, @@ -9,7 +10,6 @@ use crate::{ }; use actix_web::{web::Bytes, HttpRequest, HttpResponse}; use serde::de::DeserializeOwned; -use std::str::FromStr; use tracing::debug; /// Handles incoming activities, verifying HTTP signatures and other checks @@ -28,32 +28,18 @@ where ::Error: From, Datatype: Clone, { - let header_value = request + let digest_header = request .headers() .get("Digest") - .map(|v| reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default())) - .and_then(std::result::Result::ok); - verify_body_hash(header_value.as_ref(), &body)?; + .map(http_compat::header_value); + verify_body_hash(digest_header.as_ref(), &body)?; let (activity, actor) = parse_received_activity::(&body, data).await?; - let mut vec = Vec::<(_, _)>::with_capacity(request.headers().len()); - request.headers().iter().for_each(|(k, v)| { - let k = reqwest::header::HeaderName::from_str(k.as_str()).expect("Failed to parse header"); - let v = reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default()) - .expect("Failed to parse header"); - vec.push((k, v)); - }); - let headers = vec.iter().map(|(k, v)| (k, v)).collect::>(); - - verify_signature( - headers, - &reqwest::Method::from_str(request.method().as_str()) - .map_err(|err| Error::Other(err.to_string()))?, - &http::Uri::from_str(&request.uri().to_string()) - .map_err(|err| Error::Other(err.to_string()))?, - actor.public_key_pem(), - )?; + let headers = http_compat::header_map(request.headers()); + let method = http_compat::method(request.method()); + let uri = http_compat::uri(request.uri()); + verify_signature(&headers, &method, &uri, actor.public_key_pem())?; debug!("Receiving activity {}", activity.id().to_string()); activity.verify(data).await?; @@ -61,139 +47,149 @@ where Ok(HttpResponse::Ok().finish()) } -// #[cfg(test)] -// #[allow(clippy::unwrap_used)] -// mod test { -// use super::*; -// use crate::{ -// activity_sending::generate_request_headers, -// config::FederationConfig, -// fetch::object_id::ObjectId, -// http_signatures::sign_request, -// traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR}, -// }; -// use actix_web::test::TestRequest; -// use reqwest::Client; -// use reqwest_middleware::ClientWithMiddleware; -// use serde_json::json; -// use url::Url; +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod test { + use super::*; + use crate::{ + activity_sending::generate_request_headers, + config::FederationConfig, + fetch::object_id::ObjectId, + http_signatures::sign_request, + traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR}, + }; + use actix_web::test::TestRequest; + use reqwest::Client; + use reqwest_middleware::ClientWithMiddleware; + use serde_json::json; + use url::Url; -// #[tokio::test] -// async fn test_receive_activity() { -// let (body, incoming_request, config) = setup_receive_test().await; -// receive_activity::( -// incoming_request.to_http_request(), -// body, -// &config.to_request_data(), -// ) -// .await -// .unwrap(); -// } + /// Remove this conversion helper after actix-web upgrades to http 1.0 + fn header_pair( + p: (&http::HeaderName, &http::HeaderValue), + ) -> (http02::HeaderName, http02::HeaderValue) { + ( + http02::HeaderName::from_lowercase(p.0.as_str().as_bytes()).unwrap(), + http02::HeaderValue::from_bytes(p.1.as_bytes()).unwrap(), + ) + } -// #[tokio::test] -// async fn test_receive_activity_invalid_body_signature() { -// let (_, incoming_request, config) = setup_receive_test().await; -// let err = receive_activity::( -// incoming_request.to_http_request(), -// "invalid".into(), -// &config.to_request_data(), -// ) -// .await -// .err() -// .unwrap(); + #[tokio::test] + async fn test_receive_activity() { + let (body, incoming_request, config) = setup_receive_test().await; + receive_activity::( + incoming_request.to_http_request(), + body, + &config.to_request_data(), + ) + .await + .unwrap(); + } -// assert_eq!(&err, &Error::ActivityBodyDigestInvalid) -// } + #[tokio::test] + async fn test_receive_activity_invalid_body_signature() { + let (_, incoming_request, config) = setup_receive_test().await; + let err = receive_activity::( + incoming_request.to_http_request(), + "invalid".into(), + &config.to_request_data(), + ) + .await + .err() + .unwrap(); -// #[tokio::test] -// async fn test_receive_activity_invalid_path() { -// let (body, incoming_request, config) = setup_receive_test().await; -// let incoming_request = incoming_request.uri("/wrong"); -// let err = receive_activity::( -// incoming_request.to_http_request(), -// body, -// &config.to_request_data(), -// ) -// .await -// .err() -// .unwrap(); + assert_eq!(&err, &Error::ActivityBodyDigestInvalid) + } -// assert_eq!(&err, &Error::ActivitySignatureInvalid) -// } + #[tokio::test] + async fn test_receive_activity_invalid_path() { + let (body, incoming_request, config) = setup_receive_test().await; + let incoming_request = incoming_request.uri("/wrong"); + let err = receive_activity::( + incoming_request.to_http_request(), + body, + &config.to_request_data(), + ) + .await + .err() + .unwrap(); -// #[tokio::test] -// async fn test_receive_unparseable_activity() { -// let (_, _, config) = setup_receive_test().await; + assert_eq!(&err, &Error::ActivitySignatureInvalid) + } -// let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap(); -// let id = "http://localhost:123/1"; -// let activity = json!({ -// "actor": actor.as_str(), -// "to": ["https://www.w3.org/ns/activitystreams#Public"], -// "object": "http://ds9.lemmy.ml/post/1", -// "cc": ["http://enterprise.lemmy.ml/c/main"], -// "type": "Delete", -// "id": id -// } -// ); -// let body: Bytes = serde_json::to_vec(&activity).unwrap().into(); -// let incoming_request = construct_request(&body, &actor).await; + #[tokio::test] + async fn test_receive_unparseable_activity() { + let (_, _, config) = setup_receive_test().await; -// // intentionally cause a parse error by using wrong type for deser -// let res = receive_activity::( -// incoming_request.to_http_request(), -// body, -// &config.to_request_data(), -// ) -// .await; + let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap(); + let id = "http://localhost:123/1"; + let activity = json!({ + "actor": actor.as_str(), + "to": ["https://www.w3.org/ns/activitystreams#Public"], + "object": "http://ds9.lemmy.ml/post/1", + "cc": ["http://enterprise.lemmy.ml/c/main"], + "type": "Delete", + "id": id + } + ); + let body: Bytes = serde_json::to_vec(&activity).unwrap().into(); + let incoming_request = construct_request(&body, &actor).await; -// match res { -// Err(Error::ParseReceivedActivity(_, url)) => { -// assert_eq!(id, url.expect("has url").as_str()); -// } -// _ => unreachable!(), -// } -// } + // intentionally cause a parse error by using wrong type for deser + let res = receive_activity::( + incoming_request.to_http_request(), + body, + &config.to_request_data(), + ) + .await; -// async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest { -// let inbox = "https://example.com/inbox"; -// let headers = generate_request_headers(&Url::parse(inbox).unwrap()); -// let request_builder = ClientWithMiddleware::from(Client::default()) -// .post(inbox) -// .headers(headers); -// let outgoing_request = sign_request( -// request_builder, -// actor, -// body.clone(), -// DB_USER_KEYPAIR.private_key().unwrap(), -// false, -// ) -// .await -// .unwrap(); -// let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path()); -// for h in outgoing_request.headers() { -// incoming_request = incoming_request.append_header(h); -// } -// incoming_request -// } + match res { + Err(Error::ParseReceivedActivity(_, url)) => { + assert_eq!(id, url.expect("has url").as_str()); + } + _ => unreachable!(), + } + } -// async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig) { -// let activity = Follow { -// actor: ObjectId::parse("http://localhost:123").unwrap(), -// object: ObjectId::parse("http://localhost:124").unwrap(), -// kind: Default::default(), -// id: "http://localhost:123/1".try_into().unwrap(), -// }; -// let body: Bytes = serde_json::to_vec(&activity).unwrap().into(); -// let incoming_request = construct_request(&body, activity.actor.inner()).await; + async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest { + let inbox = "https://example.com/inbox"; + let headers = generate_request_headers(&Url::parse(inbox).unwrap()); + let request_builder = ClientWithMiddleware::from(Client::default()) + .post(inbox) + .headers(headers); + let outgoing_request = sign_request( + request_builder, + actor, + body.clone(), + DB_USER_KEYPAIR.private_key().unwrap(), + false, + ) + .await + .unwrap(); + let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path()); + for h in outgoing_request.headers() { + incoming_request = incoming_request.append_header(header_pair(h)); + } + incoming_request + } -// let config = FederationConfig::builder() -// .domain("localhost:8002") -// .app_data(DbConnection) -// .debug(true) -// .build() -// .await -// .unwrap(); -// (body, incoming_request, config) -// } -// } + async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig) { + let activity = Follow { + actor: ObjectId::parse("http://localhost:123").unwrap(), + object: ObjectId::parse("http://localhost:124").unwrap(), + kind: Default::default(), + id: "http://localhost:123/1".try_into().unwrap(), + }; + let body: Bytes = serde_json::to_vec(&activity).unwrap().into(); + let incoming_request = construct_request(&body, activity.actor.inner()).await; + + let config = FederationConfig::builder() + .domain("localhost:8002") + .app_data(DbConnection) + .debug(true) + .build() + .await + .unwrap(); + (body, incoming_request, config) + } +} \ No newline at end of file diff --git a/src/actix_web/mod.rs b/src/actix_web/mod.rs index 918fa0b..3145be3 100644 --- a/src/actix_web/mod.rs +++ b/src/actix_web/mod.rs @@ -1,5 +1,6 @@ //! Utilities for using this library with actix-web framework +mod http_compat; pub mod inbox; #[doc(hidden)] pub mod middleware; diff --git a/src/config.rs b/src/config.rs index 2015750..63af704 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,14 +17,17 @@ use crate::{ activity_queue::{create_activity_queue, ActivityQueue}, error::Error, + http_signatures::sign_request, protocol::verification::verify_domains_match, traits::{ActivityHandler, Actor}, }; use async_trait::async_trait; +use bytes::Bytes; use derive_builder::Builder; use dyn_clone::{clone_trait_object, DynClone}; use moka::future::Cache; -use reqwest_middleware::ClientWithMiddleware; +use reqwest::Request; +use reqwest_middleware::{ClientWithMiddleware, RequestBuilder}; use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey}; use serde::de::DeserializeOwned; use std::{ @@ -327,6 +330,25 @@ impl Data { pub fn request_count(&self) -> u32 { self.request_counter.load(Ordering::Relaxed) } + + /// Add HTTP signature to arbitrary request + pub async fn sign_request(&self, req: RequestBuilder, body: Bytes) -> Result { + let (actor_id, private_key_pem) = + self.config + .signed_fetch_actor + .as_deref() + .ok_or(Error::Other( + "config value signed_fetch_actor is none".to_string(), + ))?; + sign_request( + req, + actor_id, + body, + private_key_pem.clone(), + self.config.http_signature_compat, + ) + .await + } } impl Deref for Data { diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index 4f6be06..a310a95 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -73,6 +73,14 @@ pub async fn fetch_object_http( // Ensure id field matches final url after redirect if res.object_id.as_ref() != Some(&res.url) { + if let Some(res_object_id) = res.object_id { + // If id is different but still on the same domain, attempt to request object + // again from url in id field. + if res_object_id.domain() == res.url.domain() { + return Box::pin(fetch_object_http(&res_object_id, data)).await; + } + } + // Failed to fetch the object from its specified id return Err(Error::FetchWrongId(res.url)); } @@ -150,3 +158,34 @@ async fn fetch_object_http_with_accept( )), } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use crate::{ + config::FederationConfig, + traits::tests::{DbConnection, Person}, + }; + + #[tokio::test] + async fn test_request_limit() -> Result<(), Error> { + let config = FederationConfig::builder() + .domain("example.com") + .app_data(DbConnection) + .http_fetch_limit(0) + .build() + .await + .unwrap(); + let data = config.to_request_data(); + + let fetch_url = "https://example.net/".to_string(); + + let res: Result, Error> = + fetch_object_http(&Url::parse(&fetch_url).map_err(Error::UrlParse)?, &data).await; + + assert_eq!(res.err(), Some(Error::RequestLimit)); + + Ok(()) + } +} diff --git a/src/fetch/object_id.rs b/src/fetch/object_id.rs index 7d9452c..dc2a4c9 100644 --- a/src/fetch/object_id.rs +++ b/src/fetch/object_id.rs @@ -92,7 +92,7 @@ where // object found in database if let Some(object) = db_object { if let Some(last_refreshed_at) = object.last_refreshed_at() { - let is_local = data.config.is_local_url(&self.0); + let is_local = self.is_local(data); if !is_local && should_refetch_object(last_refreshed_at) { // object is outdated and should be refetched return self.dereference_from_http(data, Some(object)).await; @@ -120,6 +120,7 @@ where .await .map(|o| o.ok_or(Error::NotFound.into()))? } else { + // Don't pass in any db object, otherwise it would be returned in case http fetch fails self.dereference_from_http(data, None).await } } @@ -146,6 +147,10 @@ where Object::read_from_id(*id, data).await } + /// Fetch object from origin instance over HTTP, then verify and parse it. + /// + /// Uses Box::pin to wrap futures to reduce stack size and avoid stack overflow when + /// when fetching objects recursively. async fn dereference_from_http( &self, data: &Data<::DataType>, @@ -154,7 +159,7 @@ where where ::Error: From, { - let res = fetch_object_http(&self.0, data).await; + let res = Box::pin(fetch_object_http(&self.0, data)).await; if let Err(Error::ObjectDeleted(url)) = res { if let Some(db_object) = db_object { @@ -163,11 +168,23 @@ where return Err(Error::ObjectDeleted(url).into()); } + // If fetch failed, return the existing object from local database + if let (Err(_), Some(db_object)) = (&res, db_object) { + return Ok(db_object); + } let res = res?; let redirect_url = &res.url; - Kind::verify(&res.object, redirect_url, data).await?; - Kind::from_json(res.object, data).await + // Prevent overwriting local object + if data.config.is_local_url(redirect_url) { + return self + .dereference_from_db(data) + .await? + .ok_or(Error::NotFound.into()); + } + + Box::pin(Kind::verify(&res.object, redirect_url, data)).await?; + Box::pin(Kind::from_json(res.object, data)).await } /// Returns true if the object's domain matches the one defined in [[FederationConfig.domain]].