Merge branch 'LemmyNet-main' into dev

This commit is contained in:
Tangel 2024-12-16 09:09:59 +00:00 committed by GitHub
commit 36670aff51
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 283 additions and 171 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "activitypub_federation"
version = "0.5.9"
version = "0.6.1"
edition = "2021"
description = "High-level Activitypub framework"
keywords = ["activitypub", "activitystreams", "federation", "fediverse"]
@ -10,8 +10,8 @@ documentation = "https://docs.rs/activitypub_federation/"
[features]
default = ["actix-web", "axum"]
actix-web = ["dep:actix-web"]
axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"]
actix-web = ["dep:actix-web", "dep:http02"]
axum = ["dep:axum", "dep:tower"]
diesel = ["dep:diesel"]
[lints.rust]
@ -57,7 +57,6 @@ dyn-clone = "1.0.17"
enum_delegate = "0.2.0"
httpdate = "1.0.3"
http-signature-normalization-reqwest = { version = "0.12.0", default-features = false, features = [
"default-spawner",
"sha-2",
"middleware",
"default-spawner",
@ -85,25 +84,24 @@ moka = { version = "0.12.8", features = ["future"] }
# Actix-web
actix-web = { version = "4.8.0", default-features = false, optional = true }
http02 = { package = "http", version = "0.2.12", optional = true }
# Axum
axum = { git = "https://github.com/tokio-rs/axum.git", features = [
"json",
], default-features = false, optional = true }
tower = { version = "0.5.1", optional = true }
hyper = { version = "1.4.1", optional = true }
http-body-util = { version = "0.1.2", optional = true }
[dev-dependencies]
anyhow = "1.0.86"
env_logger = "0.11.3"
tower-http = { version = "0.6.1", features = ["map-request-body", "util"] }
axum = { git = "https://github.com/tokio-rs/axum.git", features = [
"http1",
"macros",
"tokio",
"query",
], default-features = false }
axum-macros = { git = "https://github.com/tokio-rs/axum.git" }
axum-extra = { version = "0.9.3", features = ["typed-header"] }
env_logger = "0.11.3"
tokio = { version = "1.40.0", features = ["full"] }
[profile.dev]

View file

@ -15,9 +15,9 @@ The next step is to allow other servers to fetch our actors and objects. For thi
# use activitypub_federation::config::FederationMiddleware;
# use axum::routing::get;
# use crate::activitypub_federation::traits::Object;
# use axum::headers::ContentType;
# use axum_extra::headers::ContentType;
# use activitypub_federation::FEDERATION_CONTENT_TYPE;
# use axum::TypedHeader;
# use axum_extra::TypedHeader;
# use axum::response::IntoResponse;
# use http::HeaderMap;
# async fn generate_user_html(_: String, _: Data<DbConnection>) -> axum::response::Response { todo!() }
@ -34,10 +34,9 @@ async fn main() -> Result<(), Error> {
.layer(FederationMiddleware::new(data));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(addr).await?;
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;
axum::serve(listener, app.into_make_service()).await?;
Ok(())
}

View file

@ -14,11 +14,11 @@ use activitypub_federation::{
traits::Object,
};
use axum::{
debug_handler,
extract::{Path, Query},
response::{IntoResponse, Response},
Json,
};
use axum_macros::debug_handler;
use http::StatusCode;
use serde::Deserialize;

View file

@ -14,13 +14,13 @@ use activitypub_federation::{
traits::Object,
};
use axum::{
debug_handler,
extract::{Path, Query},
response::IntoResponse,
routing::{get, post},
Json,
Router,
};
use axum_macros::debug_handler;
use serde::Deserialize;
use std::net::TcpListener;
use tracing::info;

View file

@ -24,9 +24,9 @@ use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
use serde::Serialize;
use std::{
fmt::{Debug, Display},
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};
use tracing::debug;
use tracing::{debug, warn};
use url::Url;
#[derive(Clone, Debug)]
@ -92,7 +92,17 @@ impl SendActivityTask {
self.http_signature_compat,
)
.await?;
// Send the activity, and log a warning if its too slow.
let now = Instant::now();
let response = client.execute(request).await?;
let elapsed = now.elapsed().as_secs();
if elapsed > 10 {
warn!(
"Sending activity {} to {} took {}s",
self.activity_id, self.inbox, elapsed
);
}
self.handle_response(response).await
}
@ -126,8 +136,8 @@ impl SendActivityTask {
}
}
pub(crate) async fn build_tasks<'a, Activity, Datatype, ActorType>(
activity: &'a Activity,
pub(crate) async fn build_tasks<Activity, Datatype, ActorType>(
activity: &Activity,
actor: &ActorType,
inboxes: Vec<Url>,
data: &Data<Datatype>,

View file

@ -0,0 +1,30 @@
//! Remove these conversion helpers after actix-web upgrades to http 1.0
use std::str::FromStr;
pub fn header_value(v: &http02::HeaderValue) -> http::HeaderValue {
http::HeaderValue::from_bytes(v.as_bytes()).expect("can convert http types")
}
pub fn header_map<'a, H>(m: H) -> http::HeaderMap
where
H: IntoIterator<Item = (&'a http02::HeaderName, &'a http02::HeaderValue)>,
{
let mut new_map = http::HeaderMap::new();
for (n, v) in m {
new_map.insert(
http::HeaderName::from_lowercase(n.as_str().as_bytes())
.expect("can convert http types"),
header_value(v),
);
}
new_map
}
pub fn method(m: &http02::Method) -> http::Method {
http::Method::from_bytes(m.as_str().as_bytes()).expect("can convert http types")
}
pub fn uri(m: &http02::Uri) -> http::Uri {
http::Uri::from_str(&m.to_string()).expect("can convert http types")
}

View file

@ -1,5 +1,6 @@
//! Handles incoming activities, verifying HTTP signatures and other checks
use super::http_compat;
use crate::{
config::Data,
error::Error,
@ -9,7 +10,6 @@ use crate::{
};
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
use serde::de::DeserializeOwned;
use std::str::FromStr;
use tracing::debug;
/// Handles incoming activities, verifying HTTP signatures and other checks
@ -28,32 +28,18 @@ where
<ActorT as Object>::Error: From<Error>,
Datatype: Clone,
{
let header_value = request
let digest_header = request
.headers()
.get("Digest")
.map(|v| reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default()))
.and_then(std::result::Result::ok);
verify_body_hash(header_value.as_ref(), &body)?;
.map(http_compat::header_value);
verify_body_hash(digest_header.as_ref(), &body)?;
let (activity, actor) = parse_received_activity::<Activity, ActorT, _>(&body, data).await?;
let mut vec = Vec::<(_, _)>::with_capacity(request.headers().len());
request.headers().iter().for_each(|(k, v)| {
let k = reqwest::header::HeaderName::from_str(k.as_str()).expect("Failed to parse header");
let v = reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default())
.expect("Failed to parse header");
vec.push((k, v));
});
let headers = vec.iter().map(|(k, v)| (k, v)).collect::<Vec<(_, _)>>();
verify_signature(
headers,
&reqwest::Method::from_str(request.method().as_str())
.map_err(|err| Error::Other(err.to_string()))?,
&http::Uri::from_str(&request.uri().to_string())
.map_err(|err| Error::Other(err.to_string()))?,
actor.public_key_pem(),
)?;
let headers = http_compat::header_map(request.headers());
let method = http_compat::method(request.method());
let uri = http_compat::uri(request.uri());
verify_signature(&headers, &method, &uri, actor.public_key_pem())?;
debug!("Receiving activity {}", activity.id().to_string());
activity.verify(data).await?;
@ -61,139 +47,149 @@ where
Ok(HttpResponse::Ok().finish())
}
// #[cfg(test)]
// #[allow(clippy::unwrap_used)]
// mod test {
// use super::*;
// use crate::{
// activity_sending::generate_request_headers,
// config::FederationConfig,
// fetch::object_id::ObjectId,
// http_signatures::sign_request,
// traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
// };
// use actix_web::test::TestRequest;
// use reqwest::Client;
// use reqwest_middleware::ClientWithMiddleware;
// use serde_json::json;
// use url::Url;
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
use super::*;
use crate::{
activity_sending::generate_request_headers,
config::FederationConfig,
fetch::object_id::ObjectId,
http_signatures::sign_request,
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
};
use actix_web::test::TestRequest;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde_json::json;
use url::Url;
// #[tokio::test]
// async fn test_receive_activity() {
// let (body, incoming_request, config) = setup_receive_test().await;
// receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// body,
// &config.to_request_data(),
// )
// .await
// .unwrap();
// }
/// Remove this conversion helper after actix-web upgrades to http 1.0
fn header_pair(
p: (&http::HeaderName, &http::HeaderValue),
) -> (http02::HeaderName, http02::HeaderValue) {
(
http02::HeaderName::from_lowercase(p.0.as_str().as_bytes()).unwrap(),
http02::HeaderValue::from_bytes(p.1.as_bytes()).unwrap(),
)
}
// #[tokio::test]
// async fn test_receive_activity_invalid_body_signature() {
// let (_, incoming_request, config) = setup_receive_test().await;
// let err = receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// "invalid".into(),
// &config.to_request_data(),
// )
// .await
// .err()
// .unwrap();
#[tokio::test]
async fn test_receive_activity() {
let (body, incoming_request, config) = setup_receive_test().await;
receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await
.unwrap();
}
// assert_eq!(&err, &Error::ActivityBodyDigestInvalid)
// }
#[tokio::test]
async fn test_receive_activity_invalid_body_signature() {
let (_, incoming_request, config) = setup_receive_test().await;
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
"invalid".into(),
&config.to_request_data(),
)
.await
.err()
.unwrap();
// #[tokio::test]
// async fn test_receive_activity_invalid_path() {
// let (body, incoming_request, config) = setup_receive_test().await;
// let incoming_request = incoming_request.uri("/wrong");
// let err = receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// body,
// &config.to_request_data(),
// )
// .await
// .err()
// .unwrap();
assert_eq!(&err, &Error::ActivityBodyDigestInvalid)
}
// assert_eq!(&err, &Error::ActivitySignatureInvalid)
// }
#[tokio::test]
async fn test_receive_activity_invalid_path() {
let (body, incoming_request, config) = setup_receive_test().await;
let incoming_request = incoming_request.uri("/wrong");
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await
.err()
.unwrap();
// #[tokio::test]
// async fn test_receive_unparseable_activity() {
// let (_, _, config) = setup_receive_test().await;
assert_eq!(&err, &Error::ActivitySignatureInvalid)
}
// let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
// let id = "http://localhost:123/1";
// let activity = json!({
// "actor": actor.as_str(),
// "to": ["https://www.w3.org/ns/activitystreams#Public"],
// "object": "http://ds9.lemmy.ml/post/1",
// "cc": ["http://enterprise.lemmy.ml/c/main"],
// "type": "Delete",
// "id": id
// }
// );
// let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
// let incoming_request = construct_request(&body, &actor).await;
#[tokio::test]
async fn test_receive_unparseable_activity() {
let (_, _, config) = setup_receive_test().await;
// // intentionally cause a parse error by using wrong type for deser
// let res = receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// body,
// &config.to_request_data(),
// )
// .await;
let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
let id = "http://localhost:123/1";
let activity = json!({
"actor": actor.as_str(),
"to": ["https://www.w3.org/ns/activitystreams#Public"],
"object": "http://ds9.lemmy.ml/post/1",
"cc": ["http://enterprise.lemmy.ml/c/main"],
"type": "Delete",
"id": id
}
);
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
let incoming_request = construct_request(&body, &actor).await;
// match res {
// Err(Error::ParseReceivedActivity(_, url)) => {
// assert_eq!(id, url.expect("has url").as_str());
// }
// _ => unreachable!(),
// }
// }
// intentionally cause a parse error by using wrong type for deser
let res = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await;
// async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
// let inbox = "https://example.com/inbox";
// let headers = generate_request_headers(&Url::parse(inbox).unwrap());
// let request_builder = ClientWithMiddleware::from(Client::default())
// .post(inbox)
// .headers(headers);
// let outgoing_request = sign_request(
// request_builder,
// actor,
// body.clone(),
// DB_USER_KEYPAIR.private_key().unwrap(),
// false,
// )
// .await
// .unwrap();
// let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
// for h in outgoing_request.headers() {
// incoming_request = incoming_request.append_header(h);
// }
// incoming_request
// }
match res {
Err(Error::ParseReceivedActivity(_, url)) => {
assert_eq!(id, url.expect("has url").as_str());
}
_ => unreachable!(),
}
}
// async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
// let activity = Follow {
// actor: ObjectId::parse("http://localhost:123").unwrap(),
// object: ObjectId::parse("http://localhost:124").unwrap(),
// kind: Default::default(),
// id: "http://localhost:123/1".try_into().unwrap(),
// };
// let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
// let incoming_request = construct_request(&body, activity.actor.inner()).await;
async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
let inbox = "https://example.com/inbox";
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
let request_builder = ClientWithMiddleware::from(Client::default())
.post(inbox)
.headers(headers);
let outgoing_request = sign_request(
request_builder,
actor,
body.clone(),
DB_USER_KEYPAIR.private_key().unwrap(),
false,
)
.await
.unwrap();
let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
for h in outgoing_request.headers() {
incoming_request = incoming_request.append_header(header_pair(h));
}
incoming_request
}
// let config = FederationConfig::builder()
// .domain("localhost:8002")
// .app_data(DbConnection)
// .debug(true)
// .build()
// .await
// .unwrap();
// (body, incoming_request, config)
// }
// }
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
let activity = Follow {
actor: ObjectId::parse("http://localhost:123").unwrap(),
object: ObjectId::parse("http://localhost:124").unwrap(),
kind: Default::default(),
id: "http://localhost:123/1".try_into().unwrap(),
};
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
let incoming_request = construct_request(&body, activity.actor.inner()).await;
let config = FederationConfig::builder()
.domain("localhost:8002")
.app_data(DbConnection)
.debug(true)
.build()
.await
.unwrap();
(body, incoming_request, config)
}
}

View file

@ -1,5 +1,6 @@
//! Utilities for using this library with actix-web framework
mod http_compat;
pub mod inbox;
#[doc(hidden)]
pub mod middleware;

View file

@ -17,14 +17,17 @@
use crate::{
activity_queue::{create_activity_queue, ActivityQueue},
error::Error,
http_signatures::sign_request,
protocol::verification::verify_domains_match,
traits::{ActivityHandler, Actor},
};
use async_trait::async_trait;
use bytes::Bytes;
use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone};
use moka::future::Cache;
use reqwest_middleware::ClientWithMiddleware;
use reqwest::Request;
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
use serde::de::DeserializeOwned;
use std::{
@ -327,6 +330,25 @@ impl<T: Clone> Data<T> {
pub fn request_count(&self) -> u32 {
self.request_counter.load(Ordering::Relaxed)
}
/// Add HTTP signature to arbitrary request
pub async fn sign_request(&self, req: RequestBuilder, body: Bytes) -> Result<Request, Error> {
let (actor_id, private_key_pem) =
self.config
.signed_fetch_actor
.as_deref()
.ok_or(Error::Other(
"config value signed_fetch_actor is none".to_string(),
))?;
sign_request(
req,
actor_id,
body,
private_key_pem.clone(),
self.config.http_signature_compat,
)
.await
}
}
impl<T: Clone> Deref for Data<T> {

View file

@ -73,6 +73,14 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
// Ensure id field matches final url after redirect
if res.object_id.as_ref() != Some(&res.url) {
if let Some(res_object_id) = res.object_id {
// If id is different but still on the same domain, attempt to request object
// again from url in id field.
if res_object_id.domain() == res.url.domain() {
return Box::pin(fetch_object_http(&res_object_id, data)).await;
}
}
// Failed to fetch the object from its specified id
return Err(Error::FetchWrongId(res.url));
}
@ -150,3 +158,34 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
)),
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::{
config::FederationConfig,
traits::tests::{DbConnection, Person},
};
#[tokio::test]
async fn test_request_limit() -> Result<(), Error> {
let config = FederationConfig::builder()
.domain("example.com")
.app_data(DbConnection)
.http_fetch_limit(0)
.build()
.await
.unwrap();
let data = config.to_request_data();
let fetch_url = "https://example.net/".to_string();
let res: Result<FetchObjectResponse<Person>, Error> =
fetch_object_http(&Url::parse(&fetch_url).map_err(Error::UrlParse)?, &data).await;
assert_eq!(res.err(), Some(Error::RequestLimit));
Ok(())
}
}

View file

@ -92,7 +92,7 @@ where
// object found in database
if let Some(object) = db_object {
if let Some(last_refreshed_at) = object.last_refreshed_at() {
let is_local = data.config.is_local_url(&self.0);
let is_local = self.is_local(data);
if !is_local && should_refetch_object(last_refreshed_at) {
// object is outdated and should be refetched
return self.dereference_from_http(data, Some(object)).await;
@ -120,6 +120,7 @@ where
.await
.map(|o| o.ok_or(Error::NotFound.into()))?
} else {
// Don't pass in any db object, otherwise it would be returned in case http fetch fails
self.dereference_from_http(data, None).await
}
}
@ -146,6 +147,10 @@ where
Object::read_from_id(*id, data).await
}
/// Fetch object from origin instance over HTTP, then verify and parse it.
///
/// Uses Box::pin to wrap futures to reduce stack size and avoid stack overflow when
/// when fetching objects recursively.
async fn dereference_from_http(
&self,
data: &Data<<Kind as Object>::DataType>,
@ -154,7 +159,7 @@ where
where
<Kind as Object>::Error: From<Error>,
{
let res = fetch_object_http(&self.0, data).await;
let res = Box::pin(fetch_object_http(&self.0, data)).await;
if let Err(Error::ObjectDeleted(url)) = res {
if let Some(db_object) = db_object {
@ -163,11 +168,23 @@ where
return Err(Error::ObjectDeleted(url).into());
}
// If fetch failed, return the existing object from local database
if let (Err(_), Some(db_object)) = (&res, db_object) {
return Ok(db_object);
}
let res = res?;
let redirect_url = &res.url;
Kind::verify(&res.object, redirect_url, data).await?;
Kind::from_json(res.object, data).await
// Prevent overwriting local object
if data.config.is_local_url(redirect_url) {
return self
.dereference_from_db(data)
.await?
.ok_or(Error::NotFound.into());
}
Box::pin(Kind::verify(&res.object, redirect_url, data)).await?;
Box::pin(Kind::from_json(res.object, data)).await
}
/// Returns true if the object's domain matches the one defined in [[FederationConfig.domain]].