Merge branch 'main' of https://github.com/LemmyNet/activitypub-federation-rust into LemmyNet-main
This commit is contained in:
commit
8615763a0b
11 changed files with 283 additions and 171 deletions
16
Cargo.toml
16
Cargo.toml
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "activitypub_federation"
|
||||
version = "0.5.9"
|
||||
version = "0.6.1"
|
||||
edition = "2021"
|
||||
description = "High-level Activitypub framework"
|
||||
keywords = ["activitypub", "activitystreams", "federation", "fediverse"]
|
||||
|
|
@ -10,8 +10,8 @@ documentation = "https://docs.rs/activitypub_federation/"
|
|||
|
||||
[features]
|
||||
default = ["actix-web", "axum"]
|
||||
actix-web = ["dep:actix-web"]
|
||||
axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"]
|
||||
actix-web = ["dep:actix-web", "dep:http02"]
|
||||
axum = ["dep:axum", "dep:tower"]
|
||||
diesel = ["dep:diesel"]
|
||||
|
||||
[lints.rust]
|
||||
|
|
@ -57,7 +57,6 @@ dyn-clone = "1.0.17"
|
|||
enum_delegate = "0.2.0"
|
||||
httpdate = "1.0.3"
|
||||
http-signature-normalization-reqwest = { version = "0.12.0", default-features = false, features = [
|
||||
"default-spawner",
|
||||
"sha-2",
|
||||
"middleware",
|
||||
"default-spawner",
|
||||
|
|
@ -85,25 +84,24 @@ moka = { version = "0.12.8", features = ["future"] }
|
|||
|
||||
# Actix-web
|
||||
actix-web = { version = "4.8.0", default-features = false, optional = true }
|
||||
http02 = { package = "http", version = "0.2.12", optional = true }
|
||||
|
||||
# Axum
|
||||
axum = { git = "https://github.com/tokio-rs/axum.git", features = [
|
||||
"json",
|
||||
], default-features = false, optional = true }
|
||||
tower = { version = "0.5.1", optional = true }
|
||||
hyper = { version = "1.4.1", optional = true }
|
||||
http-body-util = { version = "0.1.2", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = "1.0.86"
|
||||
env_logger = "0.11.3"
|
||||
tower-http = { version = "0.6.1", features = ["map-request-body", "util"] }
|
||||
axum = { git = "https://github.com/tokio-rs/axum.git", features = [
|
||||
"http1",
|
||||
"macros",
|
||||
"tokio",
|
||||
"query",
|
||||
], default-features = false }
|
||||
axum-macros = { git = "https://github.com/tokio-rs/axum.git" }
|
||||
axum-extra = { version = "0.9.3", features = ["typed-header"] }
|
||||
env_logger = "0.11.3"
|
||||
tokio = { version = "1.40.0", features = ["full"] }
|
||||
|
||||
[profile.dev]
|
||||
|
|
|
|||
|
|
@ -15,9 +15,9 @@ The next step is to allow other servers to fetch our actors and objects. For thi
|
|||
# use activitypub_federation::config::FederationMiddleware;
|
||||
# use axum::routing::get;
|
||||
# use crate::activitypub_federation::traits::Object;
|
||||
# use axum::headers::ContentType;
|
||||
# use axum_extra::headers::ContentType;
|
||||
# use activitypub_federation::FEDERATION_CONTENT_TYPE;
|
||||
# use axum::TypedHeader;
|
||||
# use axum_extra::TypedHeader;
|
||||
# use axum::response::IntoResponse;
|
||||
# use http::HeaderMap;
|
||||
# async fn generate_user_html(_: String, _: Data<DbConnection>) -> axum::response::Response { todo!() }
|
||||
|
|
@ -34,10 +34,9 @@ async fn main() -> Result<(), Error> {
|
|||
.layer(FederationMiddleware::new(data));
|
||||
|
||||
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
|
||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
tracing::debug!("listening on {}", addr);
|
||||
axum::Server::bind(&addr)
|
||||
.serve(app.into_make_service())
|
||||
.await?;
|
||||
axum::serve(listener, app.into_make_service()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,11 +14,11 @@ use activitypub_federation::{
|
|||
traits::Object,
|
||||
};
|
||||
use axum::{
|
||||
debug_handler,
|
||||
extract::{Path, Query},
|
||||
response::{IntoResponse, Response},
|
||||
Json,
|
||||
};
|
||||
use axum_macros::debug_handler;
|
||||
use http::StatusCode;
|
||||
use serde::Deserialize;
|
||||
|
||||
|
|
|
|||
|
|
@ -14,13 +14,13 @@ use activitypub_federation::{
|
|||
traits::Object,
|
||||
};
|
||||
use axum::{
|
||||
debug_handler,
|
||||
extract::{Path, Query},
|
||||
response::IntoResponse,
|
||||
routing::{get, post},
|
||||
Json,
|
||||
Router,
|
||||
};
|
||||
use axum_macros::debug_handler;
|
||||
use serde::Deserialize;
|
||||
use std::net::TcpListener;
|
||||
use tracing::info;
|
||||
|
|
|
|||
|
|
@ -24,9 +24,9 @@ use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
|
|||
use serde::Serialize;
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
time::{Duration, SystemTime},
|
||||
time::{Duration, Instant, SystemTime},
|
||||
};
|
||||
use tracing::debug;
|
||||
use tracing::{debug, warn};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
|
@ -92,7 +92,17 @@ impl SendActivityTask {
|
|||
self.http_signature_compat,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Send the activity, and log a warning if its too slow.
|
||||
let now = Instant::now();
|
||||
let response = client.execute(request).await?;
|
||||
let elapsed = now.elapsed().as_secs();
|
||||
if elapsed > 10 {
|
||||
warn!(
|
||||
"Sending activity {} to {} took {}s",
|
||||
self.activity_id, self.inbox, elapsed
|
||||
);
|
||||
}
|
||||
self.handle_response(response).await
|
||||
}
|
||||
|
||||
|
|
@ -126,8 +136,8 @@ impl SendActivityTask {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn build_tasks<'a, Activity, Datatype, ActorType>(
|
||||
activity: &'a Activity,
|
||||
pub(crate) async fn build_tasks<Activity, Datatype, ActorType>(
|
||||
activity: &Activity,
|
||||
actor: &ActorType,
|
||||
inboxes: Vec<Url>,
|
||||
data: &Data<Datatype>,
|
||||
|
|
|
|||
30
src/actix_web/http_compat.rs
Normal file
30
src/actix_web/http_compat.rs
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
//! Remove these conversion helpers after actix-web upgrades to http 1.0
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
pub fn header_value(v: &http02::HeaderValue) -> http::HeaderValue {
|
||||
http::HeaderValue::from_bytes(v.as_bytes()).expect("can convert http types")
|
||||
}
|
||||
|
||||
pub fn header_map<'a, H>(m: H) -> http::HeaderMap
|
||||
where
|
||||
H: IntoIterator<Item = (&'a http02::HeaderName, &'a http02::HeaderValue)>,
|
||||
{
|
||||
let mut new_map = http::HeaderMap::new();
|
||||
for (n, v) in m {
|
||||
new_map.insert(
|
||||
http::HeaderName::from_lowercase(n.as_str().as_bytes())
|
||||
.expect("can convert http types"),
|
||||
header_value(v),
|
||||
);
|
||||
}
|
||||
new_map
|
||||
}
|
||||
|
||||
pub fn method(m: &http02::Method) -> http::Method {
|
||||
http::Method::from_bytes(m.as_str().as_bytes()).expect("can convert http types")
|
||||
}
|
||||
|
||||
pub fn uri(m: &http02::Uri) -> http::Uri {
|
||||
http::Uri::from_str(&m.to_string()).expect("can convert http types")
|
||||
}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
//! Handles incoming activities, verifying HTTP signatures and other checks
|
||||
|
||||
use super::http_compat;
|
||||
use crate::{
|
||||
config::Data,
|
||||
error::Error,
|
||||
|
|
@ -9,7 +10,6 @@ use crate::{
|
|||
};
|
||||
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::str::FromStr;
|
||||
use tracing::debug;
|
||||
|
||||
/// Handles incoming activities, verifying HTTP signatures and other checks
|
||||
|
|
@ -28,32 +28,18 @@ where
|
|||
<ActorT as Object>::Error: From<Error>,
|
||||
Datatype: Clone,
|
||||
{
|
||||
let header_value = request
|
||||
let digest_header = request
|
||||
.headers()
|
||||
.get("Digest")
|
||||
.map(|v| reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default()))
|
||||
.and_then(std::result::Result::ok);
|
||||
verify_body_hash(header_value.as_ref(), &body)?;
|
||||
.map(http_compat::header_value);
|
||||
verify_body_hash(digest_header.as_ref(), &body)?;
|
||||
|
||||
let (activity, actor) = parse_received_activity::<Activity, ActorT, _>(&body, data).await?;
|
||||
|
||||
let mut vec = Vec::<(_, _)>::with_capacity(request.headers().len());
|
||||
request.headers().iter().for_each(|(k, v)| {
|
||||
let k = reqwest::header::HeaderName::from_str(k.as_str()).expect("Failed to parse header");
|
||||
let v = reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default())
|
||||
.expect("Failed to parse header");
|
||||
vec.push((k, v));
|
||||
});
|
||||
let headers = vec.iter().map(|(k, v)| (k, v)).collect::<Vec<(_, _)>>();
|
||||
|
||||
verify_signature(
|
||||
headers,
|
||||
&reqwest::Method::from_str(request.method().as_str())
|
||||
.map_err(|err| Error::Other(err.to_string()))?,
|
||||
&http::Uri::from_str(&request.uri().to_string())
|
||||
.map_err(|err| Error::Other(err.to_string()))?,
|
||||
actor.public_key_pem(),
|
||||
)?;
|
||||
let headers = http_compat::header_map(request.headers());
|
||||
let method = http_compat::method(request.method());
|
||||
let uri = http_compat::uri(request.uri());
|
||||
verify_signature(&headers, &method, &uri, actor.public_key_pem())?;
|
||||
|
||||
debug!("Receiving activity {}", activity.id().to_string());
|
||||
activity.verify(data).await?;
|
||||
|
|
@ -61,139 +47,149 @@ where
|
|||
Ok(HttpResponse::Ok().finish())
|
||||
}
|
||||
|
||||
// #[cfg(test)]
|
||||
// #[allow(clippy::unwrap_used)]
|
||||
// mod test {
|
||||
// use super::*;
|
||||
// use crate::{
|
||||
// activity_sending::generate_request_headers,
|
||||
// config::FederationConfig,
|
||||
// fetch::object_id::ObjectId,
|
||||
// http_signatures::sign_request,
|
||||
// traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
|
||||
// };
|
||||
// use actix_web::test::TestRequest;
|
||||
// use reqwest::Client;
|
||||
// use reqwest_middleware::ClientWithMiddleware;
|
||||
// use serde_json::json;
|
||||
// use url::Url;
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::unwrap_used)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::{
|
||||
activity_sending::generate_request_headers,
|
||||
config::FederationConfig,
|
||||
fetch::object_id::ObjectId,
|
||||
http_signatures::sign_request,
|
||||
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
|
||||
};
|
||||
use actix_web::test::TestRequest;
|
||||
use reqwest::Client;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use serde_json::json;
|
||||
use url::Url;
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn test_receive_activity() {
|
||||
// let (body, incoming_request, config) = setup_receive_test().await;
|
||||
// receive_activity::<Follow, DbUser, DbConnection>(
|
||||
// incoming_request.to_http_request(),
|
||||
// body,
|
||||
// &config.to_request_data(),
|
||||
// )
|
||||
// .await
|
||||
// .unwrap();
|
||||
// }
|
||||
/// Remove this conversion helper after actix-web upgrades to http 1.0
|
||||
fn header_pair(
|
||||
p: (&http::HeaderName, &http::HeaderValue),
|
||||
) -> (http02::HeaderName, http02::HeaderValue) {
|
||||
(
|
||||
http02::HeaderName::from_lowercase(p.0.as_str().as_bytes()).unwrap(),
|
||||
http02::HeaderValue::from_bytes(p.1.as_bytes()).unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn test_receive_activity_invalid_body_signature() {
|
||||
// let (_, incoming_request, config) = setup_receive_test().await;
|
||||
// let err = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
// incoming_request.to_http_request(),
|
||||
// "invalid".into(),
|
||||
// &config.to_request_data(),
|
||||
// )
|
||||
// .await
|
||||
// .err()
|
||||
// .unwrap();
|
||||
#[tokio::test]
|
||||
async fn test_receive_activity() {
|
||||
let (body, incoming_request, config) = setup_receive_test().await;
|
||||
receive_activity::<Follow, DbUser, DbConnection>(
|
||||
incoming_request.to_http_request(),
|
||||
body,
|
||||
&config.to_request_data(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// assert_eq!(&err, &Error::ActivityBodyDigestInvalid)
|
||||
// }
|
||||
#[tokio::test]
|
||||
async fn test_receive_activity_invalid_body_signature() {
|
||||
let (_, incoming_request, config) = setup_receive_test().await;
|
||||
let err = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
incoming_request.to_http_request(),
|
||||
"invalid".into(),
|
||||
&config.to_request_data(),
|
||||
)
|
||||
.await
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn test_receive_activity_invalid_path() {
|
||||
// let (body, incoming_request, config) = setup_receive_test().await;
|
||||
// let incoming_request = incoming_request.uri("/wrong");
|
||||
// let err = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
// incoming_request.to_http_request(),
|
||||
// body,
|
||||
// &config.to_request_data(),
|
||||
// )
|
||||
// .await
|
||||
// .err()
|
||||
// .unwrap();
|
||||
assert_eq!(&err, &Error::ActivityBodyDigestInvalid)
|
||||
}
|
||||
|
||||
// assert_eq!(&err, &Error::ActivitySignatureInvalid)
|
||||
// }
|
||||
#[tokio::test]
|
||||
async fn test_receive_activity_invalid_path() {
|
||||
let (body, incoming_request, config) = setup_receive_test().await;
|
||||
let incoming_request = incoming_request.uri("/wrong");
|
||||
let err = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
incoming_request.to_http_request(),
|
||||
body,
|
||||
&config.to_request_data(),
|
||||
)
|
||||
.await
|
||||
.err()
|
||||
.unwrap();
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn test_receive_unparseable_activity() {
|
||||
// let (_, _, config) = setup_receive_test().await;
|
||||
assert_eq!(&err, &Error::ActivitySignatureInvalid)
|
||||
}
|
||||
|
||||
// let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
|
||||
// let id = "http://localhost:123/1";
|
||||
// let activity = json!({
|
||||
// "actor": actor.as_str(),
|
||||
// "to": ["https://www.w3.org/ns/activitystreams#Public"],
|
||||
// "object": "http://ds9.lemmy.ml/post/1",
|
||||
// "cc": ["http://enterprise.lemmy.ml/c/main"],
|
||||
// "type": "Delete",
|
||||
// "id": id
|
||||
// }
|
||||
// );
|
||||
// let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
|
||||
// let incoming_request = construct_request(&body, &actor).await;
|
||||
#[tokio::test]
|
||||
async fn test_receive_unparseable_activity() {
|
||||
let (_, _, config) = setup_receive_test().await;
|
||||
|
||||
// // intentionally cause a parse error by using wrong type for deser
|
||||
// let res = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
// incoming_request.to_http_request(),
|
||||
// body,
|
||||
// &config.to_request_data(),
|
||||
// )
|
||||
// .await;
|
||||
let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
|
||||
let id = "http://localhost:123/1";
|
||||
let activity = json!({
|
||||
"actor": actor.as_str(),
|
||||
"to": ["https://www.w3.org/ns/activitystreams#Public"],
|
||||
"object": "http://ds9.lemmy.ml/post/1",
|
||||
"cc": ["http://enterprise.lemmy.ml/c/main"],
|
||||
"type": "Delete",
|
||||
"id": id
|
||||
}
|
||||
);
|
||||
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
|
||||
let incoming_request = construct_request(&body, &actor).await;
|
||||
|
||||
// match res {
|
||||
// Err(Error::ParseReceivedActivity(_, url)) => {
|
||||
// assert_eq!(id, url.expect("has url").as_str());
|
||||
// }
|
||||
// _ => unreachable!(),
|
||||
// }
|
||||
// }
|
||||
// intentionally cause a parse error by using wrong type for deser
|
||||
let res = receive_activity::<Follow, DbUser, DbConnection>(
|
||||
incoming_request.to_http_request(),
|
||||
body,
|
||||
&config.to_request_data(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
|
||||
// let inbox = "https://example.com/inbox";
|
||||
// let headers = generate_request_headers(&Url::parse(inbox).unwrap());
|
||||
// let request_builder = ClientWithMiddleware::from(Client::default())
|
||||
// .post(inbox)
|
||||
// .headers(headers);
|
||||
// let outgoing_request = sign_request(
|
||||
// request_builder,
|
||||
// actor,
|
||||
// body.clone(),
|
||||
// DB_USER_KEYPAIR.private_key().unwrap(),
|
||||
// false,
|
||||
// )
|
||||
// .await
|
||||
// .unwrap();
|
||||
// let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
|
||||
// for h in outgoing_request.headers() {
|
||||
// incoming_request = incoming_request.append_header(h);
|
||||
// }
|
||||
// incoming_request
|
||||
// }
|
||||
match res {
|
||||
Err(Error::ParseReceivedActivity(_, url)) => {
|
||||
assert_eq!(id, url.expect("has url").as_str());
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
// async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
|
||||
// let activity = Follow {
|
||||
// actor: ObjectId::parse("http://localhost:123").unwrap(),
|
||||
// object: ObjectId::parse("http://localhost:124").unwrap(),
|
||||
// kind: Default::default(),
|
||||
// id: "http://localhost:123/1".try_into().unwrap(),
|
||||
// };
|
||||
// let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
|
||||
// let incoming_request = construct_request(&body, activity.actor.inner()).await;
|
||||
async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
|
||||
let inbox = "https://example.com/inbox";
|
||||
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
|
||||
let request_builder = ClientWithMiddleware::from(Client::default())
|
||||
.post(inbox)
|
||||
.headers(headers);
|
||||
let outgoing_request = sign_request(
|
||||
request_builder,
|
||||
actor,
|
||||
body.clone(),
|
||||
DB_USER_KEYPAIR.private_key().unwrap(),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
|
||||
for h in outgoing_request.headers() {
|
||||
incoming_request = incoming_request.append_header(header_pair(h));
|
||||
}
|
||||
incoming_request
|
||||
}
|
||||
|
||||
// let config = FederationConfig::builder()
|
||||
// .domain("localhost:8002")
|
||||
// .app_data(DbConnection)
|
||||
// .debug(true)
|
||||
// .build()
|
||||
// .await
|
||||
// .unwrap();
|
||||
// (body, incoming_request, config)
|
||||
// }
|
||||
// }
|
||||
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
|
||||
let activity = Follow {
|
||||
actor: ObjectId::parse("http://localhost:123").unwrap(),
|
||||
object: ObjectId::parse("http://localhost:124").unwrap(),
|
||||
kind: Default::default(),
|
||||
id: "http://localhost:123/1".try_into().unwrap(),
|
||||
};
|
||||
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
|
||||
let incoming_request = construct_request(&body, activity.actor.inner()).await;
|
||||
|
||||
let config = FederationConfig::builder()
|
||||
.domain("localhost:8002")
|
||||
.app_data(DbConnection)
|
||||
.debug(true)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
(body, incoming_request, config)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
//! Utilities for using this library with actix-web framework
|
||||
|
||||
mod http_compat;
|
||||
pub mod inbox;
|
||||
#[doc(hidden)]
|
||||
pub mod middleware;
|
||||
|
|
|
|||
|
|
@ -17,14 +17,17 @@
|
|||
use crate::{
|
||||
activity_queue::{create_activity_queue, ActivityQueue},
|
||||
error::Error,
|
||||
http_signatures::sign_request,
|
||||
protocol::verification::verify_domains_match,
|
||||
traits::{ActivityHandler, Actor},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use derive_builder::Builder;
|
||||
use dyn_clone::{clone_trait_object, DynClone};
|
||||
use moka::future::Cache;
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use reqwest::Request;
|
||||
use reqwest_middleware::{ClientWithMiddleware, RequestBuilder};
|
||||
use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::{
|
||||
|
|
@ -327,6 +330,25 @@ impl<T: Clone> Data<T> {
|
|||
pub fn request_count(&self) -> u32 {
|
||||
self.request_counter.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Add HTTP signature to arbitrary request
|
||||
pub async fn sign_request(&self, req: RequestBuilder, body: Bytes) -> Result<Request, Error> {
|
||||
let (actor_id, private_key_pem) =
|
||||
self.config
|
||||
.signed_fetch_actor
|
||||
.as_deref()
|
||||
.ok_or(Error::Other(
|
||||
"config value signed_fetch_actor is none".to_string(),
|
||||
))?;
|
||||
sign_request(
|
||||
req,
|
||||
actor_id,
|
||||
body,
|
||||
private_key_pem.clone(),
|
||||
self.config.http_signature_compat,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> Deref for Data<T> {
|
||||
|
|
|
|||
|
|
@ -73,6 +73,14 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
|
|||
|
||||
// Ensure id field matches final url after redirect
|
||||
if res.object_id.as_ref() != Some(&res.url) {
|
||||
if let Some(res_object_id) = res.object_id {
|
||||
// If id is different but still on the same domain, attempt to request object
|
||||
// again from url in id field.
|
||||
if res_object_id.domain() == res.url.domain() {
|
||||
return Box::pin(fetch_object_http(&res_object_id, data)).await;
|
||||
}
|
||||
}
|
||||
// Failed to fetch the object from its specified id
|
||||
return Err(Error::FetchWrongId(res.url));
|
||||
}
|
||||
|
||||
|
|
@ -150,3 +158,34 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
|
|||
)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
config::FederationConfig,
|
||||
traits::tests::{DbConnection, Person},
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_request_limit() -> Result<(), Error> {
|
||||
let config = FederationConfig::builder()
|
||||
.domain("example.com")
|
||||
.app_data(DbConnection)
|
||||
.http_fetch_limit(0)
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
let data = config.to_request_data();
|
||||
|
||||
let fetch_url = "https://example.net/".to_string();
|
||||
|
||||
let res: Result<FetchObjectResponse<Person>, Error> =
|
||||
fetch_object_http(&Url::parse(&fetch_url).map_err(Error::UrlParse)?, &data).await;
|
||||
|
||||
assert_eq!(res.err(), Some(Error::RequestLimit));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ where
|
|||
// object found in database
|
||||
if let Some(object) = db_object {
|
||||
if let Some(last_refreshed_at) = object.last_refreshed_at() {
|
||||
let is_local = data.config.is_local_url(&self.0);
|
||||
let is_local = self.is_local(data);
|
||||
if !is_local && should_refetch_object(last_refreshed_at) {
|
||||
// object is outdated and should be refetched
|
||||
return self.dereference_from_http(data, Some(object)).await;
|
||||
|
|
@ -120,6 +120,7 @@ where
|
|||
.await
|
||||
.map(|o| o.ok_or(Error::NotFound.into()))?
|
||||
} else {
|
||||
// Don't pass in any db object, otherwise it would be returned in case http fetch fails
|
||||
self.dereference_from_http(data, None).await
|
||||
}
|
||||
}
|
||||
|
|
@ -146,6 +147,10 @@ where
|
|||
Object::read_from_id(*id, data).await
|
||||
}
|
||||
|
||||
/// Fetch object from origin instance over HTTP, then verify and parse it.
|
||||
///
|
||||
/// Uses Box::pin to wrap futures to reduce stack size and avoid stack overflow when
|
||||
/// when fetching objects recursively.
|
||||
async fn dereference_from_http(
|
||||
&self,
|
||||
data: &Data<<Kind as Object>::DataType>,
|
||||
|
|
@ -154,7 +159,7 @@ where
|
|||
where
|
||||
<Kind as Object>::Error: From<Error>,
|
||||
{
|
||||
let res = fetch_object_http(&self.0, data).await;
|
||||
let res = Box::pin(fetch_object_http(&self.0, data)).await;
|
||||
|
||||
if let Err(Error::ObjectDeleted(url)) = res {
|
||||
if let Some(db_object) = db_object {
|
||||
|
|
@ -163,11 +168,23 @@ where
|
|||
return Err(Error::ObjectDeleted(url).into());
|
||||
}
|
||||
|
||||
// If fetch failed, return the existing object from local database
|
||||
if let (Err(_), Some(db_object)) = (&res, db_object) {
|
||||
return Ok(db_object);
|
||||
}
|
||||
let res = res?;
|
||||
let redirect_url = &res.url;
|
||||
|
||||
Kind::verify(&res.object, redirect_url, data).await?;
|
||||
Kind::from_json(res.object, data).await
|
||||
// Prevent overwriting local object
|
||||
if data.config.is_local_url(redirect_url) {
|
||||
return self
|
||||
.dereference_from_db(data)
|
||||
.await?
|
||||
.ok_or(Error::NotFound.into());
|
||||
}
|
||||
|
||||
Box::pin(Kind::verify(&res.object, redirect_url, data)).await?;
|
||||
Box::pin(Kind::from_json(res.object, data)).await
|
||||
}
|
||||
|
||||
/// Returns true if the object's domain matches the one defined in [[FederationConfig.domain]].
|
||||
|
|
|
|||
Loading…
Reference in a new issue