Merge branch 'LemmyNet-main' into dev

This commit is contained in:
Tangel 2024-07-22 14:21:40 +08:00
commit 1b2dd8115a
No known key found for this signature in database
GPG key ID: 3EE818DD23597C80
20 changed files with 487 additions and 287 deletions

1
.github/CODEOWNERS vendored Normal file
View file

@ -0,0 +1 @@
* @Nutomic @dessalines

8
.gitignore vendored
View file

@ -3,3 +3,11 @@
/Cargo.lock
perf.data*
flamegraph.svg
# direnv
/.direnv
/.envrc
# nix flake
/flake.nix
/flake.lock

View file

@ -1,54 +1,56 @@
pipeline:
variables:
- &rust_image "rust:1.78-bullseye"
steps:
cargo_fmt:
image: rustdocker/rust:nightly
commands:
- /root/.cargo/bin/cargo fmt -- --check
cargo_check:
image: rust:1.70-bullseye
environment:
CARGO_HOME: .cargo
commands:
- cargo check --all-features --all-targets
when:
- event: pull_request
cargo_clippy:
image: rust:1.70-bullseye
image: *rust_image
environment:
CARGO_HOME: .cargo
commands:
- rustup component add clippy
- cargo clippy --all-targets --all-features --
-D warnings -D deprecated -D clippy::perf -D clippy::complexity
-D clippy::dbg_macro -D clippy::inefficient_to_string
-D clippy::items-after-statements -D clippy::implicit_clone
-D clippy::wildcard_imports -D clippy::cast_lossless
-D clippy::manual_string_new -D clippy::redundant_closure_for_method_calls
- cargo clippy --all-features -- -D clippy::unwrap_used
- cargo clippy --all-targets --all-features
when:
- event: pull_request
cargo_test:
image: rust:1.70-bullseye
image: *rust_image
environment:
CARGO_HOME: .cargo
commands:
- cargo test --all-features --no-fail-fast
when:
- event: pull_request
cargo_doc:
image: rust:1.70-bullseye
image: *rust_image
environment:
CARGO_HOME: .cargo
commands:
- cargo doc --all-features
when:
- event: pull_request
cargo_run_actix_example:
image: rust:1.70-bullseye
image: *rust_image
environment:
CARGO_HOME: .cargo
commands:
- cargo run --example local_federation actix-web
when:
- event: pull_request
cargo_run_axum_example:
image: rust:1.70-bullseye
image: *rust_image
environment:
CARGO_HOME: .cargo
commands:
- cargo run --example local_federation axum
when:
- event: pull_request

View file

@ -1,6 +1,6 @@
[package]
name = "activitypub_federation"
version = "0.5.2"
version = "0.5.8"
edition = "2021"
description = "High-level Activitypub framework"
keywords = ["activitypub", "activitystreams", "federation", "fediverse"]
@ -14,63 +14,89 @@ actix-web = ["dep:actix-web"]
axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"]
diesel = ["dep:diesel"]
[lints.rust]
warnings = "deny"
deprecated = "deny"
[lints.clippy]
perf = { level = "deny", priority = -1 }
complexity = { level = "deny", priority = -1 }
dbg_macro = "deny"
inefficient_to_string = "deny"
items-after-statements = "deny"
implicit_clone = "deny"
wildcard_imports = "deny"
cast_lossless = "deny"
manual_string_new = "deny"
redundant_closure_for_method_calls = "deny"
unwrap_used = "deny"
[dependencies]
chrono = { version = "0.4.34", features = ["clock"], default-features = false }
serde = { version = "1.0.197", features = ["derive"] }
async-trait = "0.1.77"
url = { version = "2.5.0", features = ["serde"] }
serde_json = { version = "1.0.114", features = ["preserve_order"] }
reqwest = { version = "0.11.24", features = ["json", "stream"] }
reqwest-middleware = "0.2.4"
chrono = { version = "0.4.38", features = ["clock"], default-features = false }
serde = { version = "1.0.204", features = ["derive"] }
async-trait = "0.1.81"
url = { version = "2.5.2", features = ["serde"] }
serde_json = { version = "1.0.120", features = ["preserve_order"] }
reqwest = { version = "0.12.5", default-features = false, features = [
"json",
"stream",
"rustls-tls",
] }
reqwest-middleware = "0.3.2"
tracing = "0.1.40"
base64 = "0.21.7"
openssl = "0.10.64"
base64 = "0.22.1"
rand = "0.8.5"
rsa = "0.9.6"
once_cell = "1.19.0"
http = "1.0.0"
sha2 = "0.10.8"
thiserror = "1.0.57"
derive_builder = "0.12.0"
itertools = "0.12.1"
http = "1.1.0"
sha2 = { version = "0.10.8", features = ["oid"] }
thiserror = "1.0.63"
derive_builder = "0.20.0"
itertools = "0.13.0"
dyn-clone = "1.0.17"
enum_delegate = "0.2.0"
httpdate = "1.0.3"
http-signature-normalization-reqwest = { version = "0.10.0", default-features = false, features = [
http-signature-normalization-reqwest = { version = "0.12.0", default-features = false, features = [
"default-spawner",
"sha-2",
"middleware",
"default-spawner",
] }
http-signature-normalization = "0.7.0"
bytes = "1.5.0"
bytes = "1.6.1"
futures-core = { version = "0.3.30", default-features = false }
pin-project-lite = "0.2.13"
pin-project-lite = "0.2.14"
activitystreams-kinds = "0.3.0"
regex = { version = "1.10.3", default-features = false, features = ["std", "unicode-case"] }
tokio = { version = "1.36.0", features = [
regex = { version = "1.10.5", default-features = false, features = [
"std",
"unicode",
] }
tokio = { version = "1.38.0", features = [
"sync",
"rt",
"rt-multi-thread",
"time",
] }
diesel = { version = "2.1.4", features = ["postgres"], default-features = false, optional = true }
diesel = { version = "2.2.2", features = [
"postgres",
], default-features = false, optional = true }
futures = "0.3.30"
moka = { version = "0.12.5", features = ["future"] }
moka = { version = "0.12.8", features = ["future"] }
# Actix-web
actix-web = { version = "4.5.1", default-features = false, optional = true }
actix-web = { version = "4.8.0", default-features = false, optional = true }
# Axum
axum = { git = "https://github.com/tokio-rs/axum.git", features = [
"json",
], default-features = false, optional = true }
tower = { version = "0.4.13", optional = true }
hyper = { version = "1.1.0", optional = true }
http-body-util = {version = "0.1.0", optional = true }
hyper = { version = "1.4.1", optional = true }
http-body-util = { version = "0.1.2", optional = true }
[dev-dependencies]
anyhow = "1.0.80"
rand = "0.8.5"
env_logger = "0.10.2"
anyhow = "1.0.86"
env_logger = "0.11.3"
tower-http = { version = "0.5.2", features = ["map-request-body", "util"] }
axum = { git = "https://github.com/tokio-rs/axum.git", features = [
"http1",
@ -78,7 +104,7 @@ axum = { git = "https://github.com/tokio-rs/axum.git", features = [
"query",
], default-features = false }
axum-macros = { git = "https://github.com/tokio-rs/axum.git" }
tokio = { version = "1.36.0", features = ["full"] }
tokio = { version = "1.38.1", features = ["full"] }
[profile.dev]
strip = "symbols"

View file

@ -1,3 +1,5 @@
#![allow(clippy::unwrap_used)]
use crate::{
database::Database,
http::{http_get_user, http_post_user_inbox, webfinger},

View file

@ -21,7 +21,6 @@ pub struct DbPost {
pub text: String,
pub ap_id: ObjectId<DbPost>,
pub creator: ObjectId<DbUser>,
pub local: bool,
}
#[derive(Deserialize, Serialize, Debug)]
@ -59,7 +58,15 @@ impl Object for DbPost {
}
async fn into_json(self, _data: &Data<Self::DataType>) -> Result<Self::Kind, Self::Error> {
unimplemented!()
Ok(Note {
kind: NoteType::Note,
id: self.ap_id,
content: self.text,
attributed_to: self.creator,
to: vec![public()],
tag: vec![],
in_reply_to: None,
})
}
async fn verify(
@ -81,7 +88,6 @@ impl Object for DbPost {
text: json.content,
ap_id: json.id.clone(),
creator: json.attributed_to.clone(),
local: false,
};
let mention = Mention {

View file

@ -28,6 +28,7 @@ pub async fn new_instance(
.domain(hostname)
.signed_fetch_actor(&system_user)
.app_data(database)
.url_verifier(Box::new(MyUrlVerifier()))
.debug(true)
.build()
.await?;

View file

@ -1,3 +1,5 @@
#![allow(clippy::unwrap_used)]
use crate::{
instance::{listen, new_instance, Webserver},
objects::post::DbPost,

View file

@ -416,6 +416,7 @@ async fn retry<T, E: Display + Debug, F: Future<Output = Result<T, E>>, A: FnMut
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::http_signatures::generate_actor_keypair;

View file

@ -12,14 +12,17 @@ use crate::{
};
use bytes::Bytes;
use futures::StreamExt;
use http::StatusCode;
use httpdate::fmt_http_date;
use itertools::Itertools;
use openssl::pkey::{PKey, Private};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Response,
};
use reqwest_middleware::ClientWithMiddleware;
use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
use serde::Serialize;
use std::{
self,
fmt::{Debug, Display},
time::{Duration, SystemTime},
};
@ -34,7 +37,7 @@ pub struct SendActivityTask {
pub(crate) activity_id: Url,
pub(crate) activity: Bytes,
pub(crate) inbox: Url,
pub(crate) private_key: PKey<Private>,
pub(crate) private_key: RsaPrivateKey,
pub(crate) http_signature_compat: bool,
}
@ -90,20 +93,30 @@ impl SendActivityTask {
)
.await?;
let response = client.execute(request).await?;
self.handle_response(response).await
}
match response {
o if o.status().is_success() => {
/// Based on the HTTP status code determines if an activity was delivered successfully. In that case
/// Ok is returned. Otherwise it returns Err and the activity send should be retried later.
///
/// Equivalent code in mastodon: https://github.com/mastodon/mastodon/blob/v4.2.8/app/helpers/jsonld_helper.rb#L215-L217
async fn handle_response(&self, response: Response) -> Result<(), Error> {
match response.status() {
status if status.is_success() => {
debug!("Activity {self} delivered successfully");
Ok(())
}
o if o.status().is_client_error() => {
let text = o.text_limited().await?;
status
if status.is_client_error()
&& status != StatusCode::REQUEST_TIMEOUT
&& status != StatusCode::TOO_MANY_REQUESTS =>
{
let text = response.text_limited().await?;
debug!("Activity {self} was rejected, aborting: {text}");
Ok(())
}
o => {
let status = o.status();
let text = o.text_limited().await?;
status => {
let text = response.text_limited().await?;
Err(Error::Other(format!(
"Activity {self} failure with status {status}: {text}",
@ -159,7 +172,7 @@ where
pub(crate) async fn get_pkey_cached<ActorType>(
data: &Data<impl Clone>,
actor: &ActorType,
) -> Result<PKey<Private>, Error>
) -> Result<RsaPrivateKey, Error>
where
ActorType: Actor,
{
@ -176,13 +189,13 @@ where
// This is a mostly expensive blocking call, we don't want to tie up other tasks while this is happening
let pkey = tokio::task::spawn_blocking(move || {
PKey::private_key_from_pem(private_key_pem.as_bytes()).map_err(|err| {
RsaPrivateKey::from_pkcs8_pem(&private_key_pem).map_err(|err| {
Error::Other(format!("Could not create private key from PEM data:{err}"))
})
})
.await
.map_err(|err| Error::Other(format!("Error joining: {err}")))??;
std::result::Result::<PKey<Private>, Error>::Ok(pkey)
std::result::Result::<RsaPrivateKey, Error>::Ok(pkey)
})
.await
.map_err(|e| Error::Other(format!("cloned error: {e}")))
@ -211,6 +224,7 @@ pub(crate) fn generate_request_headers(inbox_url: &Url) -> HeaderMap {
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::{config::FederationConfig, http_signatures::generate_actor_keypair};
@ -226,7 +240,7 @@ mod tests {
// This will periodically send back internal errors to test the retry
async fn dodgy_handler(
State(state): State<Arc<AtomicUsize>>,
State(_state): State<Arc<AtomicUsize>>,
headers: http::HeaderMap,
body: Bytes,
) -> Result<(), StatusCode> {
@ -297,4 +311,48 @@ mod tests {
info!("Queue Sent: {:?}", start.elapsed());
Ok(())
}
#[tokio::test]
async fn test_handle_response() {
let keypair = generate_actor_keypair().unwrap();
let message = SendActivityTask {
actor_id: "http://localhost:8001".parse().unwrap(),
activity_id: "http://localhost:8001/activity".parse().unwrap(),
activity: "{}".into(),
inbox: "http://localhost:8001".parse().unwrap(),
private_key: keypair.private_key().unwrap(),
http_signature_compat: true,
};
let res = |status| {
http::Response::builder()
.status(status)
.body(vec![])
.unwrap()
.into()
};
assert!(message.handle_response(res(StatusCode::OK)).await.is_ok());
assert!(message
.handle_response(res(StatusCode::BAD_REQUEST))
.await
.is_ok());
assert!(message
.handle_response(res(StatusCode::MOVED_PERMANENTLY))
.await
.is_err());
assert!(message
.handle_response(res(StatusCode::REQUEST_TIMEOUT))
.await
.is_err());
assert!(message
.handle_response(res(StatusCode::TOO_MANY_REQUESTS))
.await
.is_err());
assert!(message
.handle_response(res(StatusCode::INTERNAL_SERVER_ERROR))
.await
.is_err());
}
}

View file

@ -28,13 +28,27 @@ where
<ActorT as Object>::Error: From<Error>,
Datatype: Clone,
{
verify_body_hash(request.headers().get("Digest"), &body)?;
let header_value = request
.headers()
.get("Digest")
.map(|v| reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default()))
.and_then(|v| v.ok());
verify_body_hash(header_value.as_ref(), &body)?;
let (activity, actor) = parse_received_activity::<Activity, ActorT, _>(&body, data).await?;
let mut vec = Vec::<(_, _)>::with_capacity(request.headers().len());
request.headers().iter().for_each(|(k, v)| {
let k = reqwest::header::HeaderName::from_str(k.as_str()).unwrap();
let v = reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default()).unwrap();
vec.push((k, v));
});
let headers = vec.iter().map(|(k, v)| (k, v)).collect::<Vec<(_, _)>>();
verify_signature(
request.headers(),
request.method(),
headers,
&reqwest::Method::from_str(request.method().as_str())
.map_err(|err| Error::Other(err.to_string()))?,
&http::Uri::from_str(&request.uri().to_string()).unwrap(),
actor.public_key_pem(),
)?;
@ -45,138 +59,139 @@ where
Ok(HttpResponse::Ok().finish())
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
activity_sending::generate_request_headers,
config::FederationConfig,
fetch::object_id::ObjectId,
http_signatures::sign_request,
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
};
use actix_web::test::TestRequest;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde_json::json;
use url::Url;
// #[cfg(test)]
// #[allow(clippy::unwrap_used)]
// mod test {
// use super::*;
// use crate::{
// activity_sending::generate_request_headers,
// config::FederationConfig,
// fetch::object_id::ObjectId,
// http_signatures::sign_request,
// traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
// };
// use actix_web::test::TestRequest;
// use reqwest::Client;
// use reqwest_middleware::ClientWithMiddleware;
// use serde_json::json;
// use url::Url;
#[tokio::test]
async fn test_receive_activity() {
let (body, incoming_request, config) = setup_receive_test().await;
receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await
.unwrap();
}
// #[tokio::test]
// async fn test_receive_activity() {
// let (body, incoming_request, config) = setup_receive_test().await;
// receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// body,
// &config.to_request_data(),
// )
// .await
// .unwrap();
// }
#[tokio::test]
async fn test_receive_activity_invalid_body_signature() {
let (_, incoming_request, config) = setup_receive_test().await;
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
"invalid".into(),
&config.to_request_data(),
)
.await
.err()
.unwrap();
// #[tokio::test]
// async fn test_receive_activity_invalid_body_signature() {
// let (_, incoming_request, config) = setup_receive_test().await;
// let err = receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// "invalid".into(),
// &config.to_request_data(),
// )
// .await
// .err()
// .unwrap();
assert_eq!(&err, &Error::ActivityBodyDigestInvalid)
}
// assert_eq!(&err, &Error::ActivityBodyDigestInvalid)
// }
#[tokio::test]
async fn test_receive_activity_invalid_path() {
let (body, incoming_request, config) = setup_receive_test().await;
let incoming_request = incoming_request.uri("/wrong");
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await
.err()
.unwrap();
// #[tokio::test]
// async fn test_receive_activity_invalid_path() {
// let (body, incoming_request, config) = setup_receive_test().await;
// let incoming_request = incoming_request.uri("/wrong");
// let err = receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// body,
// &config.to_request_data(),
// )
// .await
// .err()
// .unwrap();
assert_eq!(&err, &Error::ActivitySignatureInvalid)
}
// assert_eq!(&err, &Error::ActivitySignatureInvalid)
// }
#[tokio::test]
async fn test_receive_unparseable_activity() {
let (_, _, config) = setup_receive_test().await;
// #[tokio::test]
// async fn test_receive_unparseable_activity() {
// let (_, _, config) = setup_receive_test().await;
let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
let id = "http://localhost:123/1";
let activity = json!({
"actor": actor.as_str(),
"to": ["https://www.w3.org/ns/activitystreams#Public"],
"object": "http://ds9.lemmy.ml/post/1",
"cc": ["http://enterprise.lemmy.ml/c/main"],
"type": "Delete",
"id": id
}
);
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
let incoming_request = construct_request(&body, &actor).await;
// let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
// let id = "http://localhost:123/1";
// let activity = json!({
// "actor": actor.as_str(),
// "to": ["https://www.w3.org/ns/activitystreams#Public"],
// "object": "http://ds9.lemmy.ml/post/1",
// "cc": ["http://enterprise.lemmy.ml/c/main"],
// "type": "Delete",
// "id": id
// }
// );
// let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
// let incoming_request = construct_request(&body, &actor).await;
// intentionally cause a parse error by using wrong type for deser
let res = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await;
// // intentionally cause a parse error by using wrong type for deser
// let res = receive_activity::<Follow, DbUser, DbConnection>(
// incoming_request.to_http_request(),
// body,
// &config.to_request_data(),
// )
// .await;
match res {
Err(Error::ParseReceivedActivity(_, url)) => {
assert_eq!(id, url.expect("has url").as_str());
}
_ => unreachable!(),
}
}
// match res {
// Err(Error::ParseReceivedActivity(_, url)) => {
// assert_eq!(id, url.expect("has url").as_str());
// }
// _ => unreachable!(),
// }
// }
async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
let inbox = "https://example.com/inbox";
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
let request_builder = ClientWithMiddleware::from(Client::default())
.post(inbox)
.headers(headers);
let outgoing_request = sign_request(
request_builder,
actor,
body.clone(),
DB_USER_KEYPAIR.private_key().unwrap(),
false,
)
.await
.unwrap();
let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
for h in outgoing_request.headers() {
incoming_request = incoming_request.append_header(h);
}
incoming_request
}
// async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
// let inbox = "https://example.com/inbox";
// let headers = generate_request_headers(&Url::parse(inbox).unwrap());
// let request_builder = ClientWithMiddleware::from(Client::default())
// .post(inbox)
// .headers(headers);
// let outgoing_request = sign_request(
// request_builder,
// actor,
// body.clone(),
// DB_USER_KEYPAIR.private_key().unwrap(),
// false,
// )
// .await
// .unwrap();
// let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
// for h in outgoing_request.headers() {
// incoming_request = incoming_request.append_header(h);
// }
// incoming_request
// }
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
let activity = Follow {
actor: ObjectId::parse("http://localhost:123").unwrap(),
object: ObjectId::parse("http://localhost:124").unwrap(),
kind: Default::default(),
id: "http://localhost:123/1".try_into().unwrap(),
};
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
let incoming_request = construct_request(&body, activity.actor.inner()).await;
// async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
// let activity = Follow {
// actor: ObjectId::parse("http://localhost:123").unwrap(),
// object: ObjectId::parse("http://localhost:124").unwrap(),
// kind: Default::default(),
// id: "http://localhost:123/1".try_into().unwrap(),
// };
// let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
// let incoming_request = construct_request(&body, activity.actor.inner()).await;
let config = FederationConfig::builder()
.domain("localhost:8002")
.app_data(DbConnection)
.debug(true)
.build()
.await
.unwrap();
(body, incoming_request, config)
}
}
// let config = FederationConfig::builder()
// .domain("localhost:8002")
// .app_data(DbConnection)
// .debug(true)
// .build()
// .await
// .unwrap();
// (body, incoming_request, config)
// }
// }

View file

@ -26,11 +26,25 @@ where
<A as Object>::Error: From<Error>,
for<'de2> <A as Object>::Kind: Deserialize<'de2>,
{
verify_body_hash(request.headers().get("Digest"), &body.unwrap_or_default())?;
let header_value = request
.headers()
.get("Digest")
.map(|v| reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default()))
.and_then(|v| v.ok());
verify_body_hash(header_value.as_ref(), &body.unwrap_or_default())?;
let mut vec = Vec::<(_, _)>::with_capacity(request.headers().len());
request.headers().iter().for_each(|(k, v)| {
let k = reqwest::header::HeaderName::from_str(k.as_str()).unwrap();
let v = reqwest::header::HeaderValue::from_str(v.to_str().unwrap_or_default()).unwrap();
vec.push((k, v));
});
let headers = vec.iter().map(|(k, v)| (k, v)).collect::<Vec<(_, _)>>();
http_signatures::signing_actor(
request.headers(),
request.method(),
headers,
&reqwest::Method::from_str(request.method().as_str())
.map_err(|err| Error::Other(err.to_string()))?,
&http::Uri::from_str(&request.uri().to_string()).unwrap(),
data,
)

View file

@ -5,7 +5,7 @@
use crate::{
config::Data,
error::Error,
http_signatures::verify_signature,
// http_signatures::verify_signature,
parse_received_activity,
traits::{ActivityHandler, Actor, Object},
};
@ -32,7 +32,7 @@ where
<ActorT as Object>::Error: From<Error>,
Datatype: Clone,
{
let (activity, actor) =
let (activity, _actor) =
parse_received_activity::<Activity, ActorT, _>(&activity_data.body, data).await?;
// verify_signature(
@ -49,6 +49,7 @@ where
}
/// Contains all data that is necessary to receive an activity from an HTTP request
#[allow(dead_code)]
#[derive(Debug)]
pub struct ActivityData {
headers: HeaderMap,

View file

@ -24,8 +24,8 @@ use async_trait::async_trait;
use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone};
use moka::future::Cache;
use openssl::pkey::{PKey, Private};
use reqwest_middleware::ClientWithMiddleware;
use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
use serde::de::DeserializeOwned;
use std::{
ops::Deref,
@ -80,12 +80,12 @@ pub struct FederationConfig<T: Clone> {
/// This can be used to implement secure mode federation.
/// <https://docs.joinmastodon.org/spec/activitypub/#secure-mode>
#[builder(default = "None", setter(custom))]
pub(crate) signed_fetch_actor: Option<Arc<(Url, PKey<Private>)>>,
pub(crate) signed_fetch_actor: Option<Arc<(Url, RsaPrivateKey)>>,
#[builder(
default = "Cache::builder().max_capacity(10000).build()",
setter(custom)
)]
pub(crate) actor_pkey_cache: Cache<Url, PKey<Private>>,
pub(crate) actor_pkey_cache: Cache<Url, RsaPrivateKey>,
/// Queue for sending outgoing activities. Only optional to make builder work, its always
/// present once constructed.
#[builder(setter(skip))]
@ -174,11 +174,17 @@ impl<T: Clone> FederationConfig<T> {
/// Returns true if the url refers to this instance. Handles hostnames like `localhost:8540` for
/// local debugging.
pub(crate) fn is_local_url(&self, url: &Url) -> bool {
let mut domain = url.host_str().expect("id has domain").to_string();
if let Some(port) = url.port() {
domain = format!("{}:{}", domain, port);
match url.host_str() {
Some(domain) => {
let domain = if let Some(port) = url.port() {
format!("{}:{}", domain, port)
} else {
domain.to_string()
};
domain == self.domain
}
None => false,
}
domain == self.domain
}
/// Returns the local domain
@ -194,8 +200,8 @@ impl<T: Clone> FederationConfigBuilder<T> {
.private_key_pem()
.expect("actor does not have a private key to sign with");
let private_key = PKey::private_key_from_pem(private_key_pem.as_bytes())
.expect("Could not decode PEM data");
let private_key =
RsaPrivateKey::from_pkcs8_pem(&private_key_pem).expect("Could not decode PEM data");
self.signed_fetch_actor = Some(Some(Arc::new((actor.id(), private_key))));
self
}
@ -341,3 +347,34 @@ impl<T: Clone> FederationMiddleware<T> {
FederationMiddleware(config)
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
use super::*;
async fn config() -> FederationConfig<i32> {
FederationConfig::builder()
.domain("example.com")
.app_data(1)
.build()
.await
.unwrap()
}
#[tokio::test]
async fn test_url_is_local() -> Result<(), Error> {
let config = config().await;
assert!(config.is_local_url(&Url::parse("http://example.com")?));
assert!(!config.is_local_url(&Url::parse("http://other.com")?));
// ensure that missing domain doesnt cause crash
assert!(!config.is_local_url(&Url::parse("http://127.0.0.1")?));
Ok(())
}
#[tokio::test]
async fn test_get_domain() {
let config = config().await;
assert_eq!("example.com", config.domain());
}
}

View file

@ -2,7 +2,10 @@
use crate::fetch::webfinger::WebFingerError;
use http_signature_normalization_reqwest::SignError;
use openssl::error::ErrorStack;
use rsa::{
errors::Error as RsaError,
pkcs8::{spki::Error as SpkiError, Error as Pkcs8Error},
};
use std::string::FromUtf8Error;
use tokio::task::JoinError;
use url::Url;
@ -80,8 +83,20 @@ pub enum Error {
Other(String),
}
impl From<ErrorStack> for Error {
fn from(value: ErrorStack) -> Self {
impl From<RsaError> for Error {
fn from(value: RsaError) -> Self {
Error::Other(value.to_string())
}
}
impl From<Pkcs8Error> for Error {
fn from(value: Pkcs8Error) -> Self {
Error::Other(value.to_string())
}
}
impl From<SpkiError> for Error {
fn from(value: SpkiError) -> Self {
Error::Other(value.to_string())
}
}

View file

@ -53,26 +53,35 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
url: &Url,
data: &Data<T>,
) -> Result<FetchObjectResponse<Kind>, Error> {
static CONTENT_TYPE: HeaderValue = HeaderValue::from_static(FEDERATION_CONTENT_TYPE);
static ALT_CONTENT_TYPE: HeaderValue = HeaderValue::from_static(
r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#,
);
static ALT_CONTENT_TYPE_MASTODON: HeaderValue =
HeaderValue::from_static(r#"application/activity+json; charset=utf-8"#);
let res = fetch_object_http_with_accept(url, data, &CONTENT_TYPE).await?;
static FETCH_CONTENT_TYPE: HeaderValue = HeaderValue::from_static(FEDERATION_CONTENT_TYPE);
const VALID_RESPONSE_CONTENT_TYPES: [&str; 3] = [
FEDERATION_CONTENT_TYPE, // lemmy
r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#, // activitypub standard
r#"application/activity+json; charset=utf-8"#, // mastodon
];
let res = fetch_object_http_with_accept(url, data, &FETCH_CONTENT_TYPE).await?;
// Ensure correct content-type to prevent vulnerabilities.
if res.content_type.as_ref() != Some(&CONTENT_TYPE)
&& res.content_type.as_ref() != Some(&ALT_CONTENT_TYPE)
&& res.content_type.as_ref() != Some(&ALT_CONTENT_TYPE_MASTODON)
{
// Ensure correct content-type to prevent vulnerabilities, with case insensitive comparison.
let content_type = res
.content_type
.as_ref()
.and_then(|c| Some(c.to_str().ok()?.to_lowercase()))
.ok_or(Error::FetchInvalidContentType(res.url.clone()))?;
if !VALID_RESPONSE_CONTENT_TYPES.contains(&content_type.as_str()) {
return Err(Error::FetchInvalidContentType(res.url));
}
// Ensure id field matches final url
// Ensure id field matches final url after redirect
if res.object_id.as_ref() != Some(&res.url) {
return Err(Error::FetchWrongId(res.url));
}
// Dont allow fetching local object. Only check this after the request as a local url
// may redirect to a remote object.
if data.config.is_local_url(&res.url) {
return Err(Error::NotFound);
}
Ok(res)
}
@ -84,17 +93,15 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
content_type: &HeaderValue,
) -> Result<FetchObjectResponse<Kind>, Error> {
let config = &data.config;
// dont fetch local objects this way
debug_assert!(url.domain() != Some(&config.domain));
config.verify_url_valid(url).await?;
info!("Fetching remote object {}", url.to_string());
// let mut counter = data.request_counter.fetch_add(1, Ordering::SeqCst);
let mut counter = data.request_counter.fetch_add(1, Ordering::SeqCst);
// fetch_add returns old value so we need to increment manually here
// counter += 1;
// if counter > config.http_fetch_limit {
// return Err(Error::RequestLimit);
// }
counter += 1;
if counter > config.http_fetch_limit {
return Err(Error::RequestLimit);
}
let req = config
.client

View file

@ -88,19 +88,13 @@ where
<Kind as Object>::Error: From<Error>,
{
let db_object = self.dereference_from_db(data).await?;
// if its a local object, only fetch it from the database and not over http
if data.config.is_local_url(&self.0) {
return match db_object {
None => Err(Error::NotFound.into()),
Some(o) => Ok(o),
};
}
// object found in database
if let Some(object) = db_object {
// object is old and should be refetched
if let Some(last_refreshed_at) = object.last_refreshed_at() {
if should_refetch_object(last_refreshed_at) {
let is_local = data.config.is_local_url(&self.0);
if !is_local && should_refetch_object(last_refreshed_at) {
// object is outdated and should be refetched
return self.dereference_from_http(data, Some(object)).await;
}
}
@ -175,6 +169,11 @@ where
Kind::verify(&res.object, redirect_url, data).await?;
Kind::from_json(res.object, data).await
}
/// Returns true if the object's domain matches the one defined in [[FederationConfig.domain]].
pub fn is_local(&self, data: &Data<<Kind as Object>::DataType>) -> bool {
data.config.is_local_url(&self.0)
}
}
/// Need to implement clone manually, to avoid requiring Kind to be Clone
@ -345,9 +344,10 @@ const _IMPL_DIESEL_NEW_TYPE_FOR_OBJECT_ID: () = {
};
#[cfg(test)]
#[allow(clippy::unwrap_used)]
pub mod tests {
use super::*;
use crate::{fetch::object_id::should_refetch_object, traits::tests::DbUser};
use crate::traits::tests::DbUser;
#[test]
fn test_deserialize() {

View file

@ -245,6 +245,7 @@ pub struct WebfingerLink {
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::{
@ -263,8 +264,6 @@ mod tests {
let data = config.to_request_data();
webfinger_resolve_actor::<DbConnection, DbUser>("LemmyDev@mastodon.social", &data).await?;
// poa.st is as of 2023-07-14 the largest Pleroma instance
webfinger_resolve_actor::<DbConnection, DbUser>("graf@poa.st", &data).await?;
Ok(())
}

View file

@ -20,21 +20,21 @@ use http_signature_normalization_reqwest::{
DefaultSpawner,
};
use once_cell::sync::Lazy;
use openssl::{
hash::MessageDigest,
pkey::{PKey, Private},
rsa::Rsa,
sign::{Signer, Verifier},
};
use reqwest::{
header::{HeaderName, HeaderValue},
Method,
Request,
};
use reqwest_middleware::RequestBuilder;
use rsa::{
pkcs8::{DecodePublicKey, EncodePrivateKey, EncodePublicKey, LineEnding},
Pkcs1v15Sign,
RsaPrivateKey,
RsaPublicKey,
};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use std::{collections::BTreeMap, fmt::Debug, io::ErrorKind, time::Duration};
use std::{collections::BTreeMap, fmt::Debug, time::Duration};
use tracing::debug;
use url::Url;
@ -50,27 +50,23 @@ pub struct Keypair {
impl Keypair {
/// Helper method to turn this into an openssl private key
#[cfg(test)]
pub(crate) fn private_key(&self) -> Result<PKey<Private>, anyhow::Error> {
Ok(PKey::private_key_from_pem(self.private_key.as_bytes())?)
pub(crate) fn private_key(&self) -> Result<RsaPrivateKey, anyhow::Error> {
use rsa::pkcs8::DecodePrivateKey;
Ok(RsaPrivateKey::from_pkcs8_pem(&self.private_key)?)
}
}
/// Generate a random asymmetric keypair for ActivityPub HTTP signatures.
pub fn generate_actor_keypair() -> Result<Keypair, std::io::Error> {
let rsa = Rsa::generate(2048)?;
let pkey = PKey::from_rsa(rsa)?;
let public_key = pkey.public_key_to_pem()?;
let private_key = pkey.private_key_to_pem_pkcs8()?;
let key_to_string = |key| match String::from_utf8(key) {
Ok(s) => Ok(s),
Err(e) => Err(std::io::Error::new(
ErrorKind::Other,
format!("Failed converting key to string: {}", e),
)),
};
pub fn generate_actor_keypair() -> Result<Keypair, Error> {
let mut rng = rand::thread_rng();
let rsa = RsaPrivateKey::new(&mut rng, 2048)?;
let pkey = RsaPublicKey::from(&rsa);
let public_key = pkey.to_public_key_pem(LineEnding::default())?;
let private_key = rsa.to_pkcs8_pem(LineEnding::default())?.to_string();
Ok(Keypair {
private_key: key_to_string(private_key)?,
public_key: key_to_string(public_key)?,
private_key,
public_key,
})
}
@ -87,7 +83,7 @@ pub(crate) async fn sign_request(
request_builder: RequestBuilder,
actor_id: &Url,
activity: Bytes,
private_key: PKey<Private>,
private_key: RsaPrivateKey,
http_signature_compat: bool,
) -> Result<Request, Error> {
static CONFIG: Lazy<Config<DefaultSpawner>> =
@ -110,10 +106,10 @@ pub(crate) async fn sign_request(
Sha256::new(),
activity,
move |signing_string| {
let mut signer = Signer::new(MessageDigest::sha256(), &private_key)?;
signer.update(signing_string.as_bytes())?;
Ok(Base64.encode(signer.sign_to_vec()?)) as Result<_, Error>
Ok(Base64.encode(private_key.sign(
Pkcs1v15Sign::new::<Sha256>(),
&Sha256::digest(signing_string.as_bytes()),
)?)) as Result<_, Error>
},
)
.await
@ -193,8 +189,11 @@ fn verify_signature_inner(
uri: &Uri,
public_key: &str,
) -> Result<(), Error> {
static CONFIG: Lazy<http_signature_normalization::Config> =
Lazy::new(|| http_signature_normalization::Config::new().set_expiration(EXPIRES_AFTER));
static CONFIG: Lazy<http_signature_normalization::Config> = Lazy::new(|| {
http_signature_normalization::Config::new()
.set_expiration(EXPIRES_AFTER)
.require_digest()
});
let path_and_query = uri.path_and_query().map(PathAndQuery::as_str).unwrap_or("");
@ -206,15 +205,19 @@ fn verify_signature_inner(
"Verifying with key {}, message {}",
&public_key, &signing_string
);
let public_key = PKey::public_key_from_pem(public_key.as_bytes())?;
let mut verifier = Verifier::new(MessageDigest::sha256(), &public_key)?;
verifier.update(signing_string.as_bytes())?;
let public_key = RsaPublicKey::from_public_key_pem(public_key)?;
let base64_decoded = Base64
.decode(signature)
.map_err(|err| Error::Other(err.to_string()))?;
Ok(verifier.verify(&base64_decoded)?)
Ok(public_key
.verify(
Pkcs1v15Sign::new::<Sha256>(),
&Sha256::digest(signing_string.as_bytes()),
&base64_decoded,
)
.is_ok())
})?;
if verified {
@ -279,11 +282,13 @@ pub(crate) fn verify_body_hash(
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
pub mod test {
use super::*;
use crate::activity_sending::generate_request_headers;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use rsa::{pkcs1::DecodeRsaPrivateKey, pkcs8::DecodePrivateKey};
use std::str::FromStr;
static ACTOR_ID: Lazy<Url> = Lazy::new(|| Url::parse("https://example.com/u/alice").unwrap());
@ -306,7 +311,7 @@ pub mod test {
request_builder,
&ACTOR_ID,
"my activity".into(),
PKey::private_key_from_pem(test_keypair().private_key.as_bytes()).unwrap(),
RsaPrivateKey::from_pkcs8_pem(&test_keypair().private_key).unwrap(),
// set this to prevent created/expires headers to be generated and inserted
// automatically from current time
true,
@ -342,7 +347,7 @@ pub mod test {
request_builder,
&ACTOR_ID,
"my activity".to_string().into(),
PKey::private_key_from_pem(test_keypair().private_key.as_bytes()).unwrap(),
RsaPrivateKey::from_pkcs8_pem(&test_keypair().private_key).unwrap(),
false,
)
.await
@ -378,13 +383,13 @@ pub mod test {
}
pub fn test_keypair() -> Keypair {
let rsa = Rsa::private_key_from_pem(PRIVATE_KEY.as_bytes()).unwrap();
let pkey = PKey::from_rsa(rsa).unwrap();
let private_key = pkey.private_key_to_pem_pkcs8().unwrap();
let public_key = pkey.public_key_to_pem().unwrap();
let rsa = RsaPrivateKey::from_pkcs1_pem(PRIVATE_KEY).unwrap();
let pkey = RsaPublicKey::from(&rsa);
let public_key = pkey.to_public_key_pem(LineEnding::default()).unwrap();
let private_key = rsa.to_pkcs8_pem(LineEnding::default()).unwrap().to_string();
Keypair {
private_key: String::from_utf8(private_key).unwrap(),
public_key: String::from_utf8(public_key).unwrap(),
private_key,
public_key,
}
}

View file

@ -338,12 +338,12 @@ pub trait Collection: Sized {
#[doc(hidden)]
#[allow(clippy::unwrap_used)]
pub mod tests {
use super::*;
use super::{async_trait, ActivityHandler, Actor, Data, Debug, Object, PublicKey, Url};
use crate::{
error::Error,
fetch::object_id::ObjectId,
http_signatures::{generate_actor_keypair, Keypair},
protocol::{public_key::PublicKey, verification::verify_domains_match},
protocol::verification::verify_domains_match,
};
use activitystreams_kinds::{activity::FollowType, actor::PersonType};
use once_cell::sync::Lazy;