Compare commits

..

5 commits
main ... hatsu

Author SHA1 Message Date
藍+85CD
9b2df05977
fix: lint code 2024-07-26 20:51:48 +08:00
藍+85CD
1d877348ed
fix(clippy): warn all 2024-07-26 20:41:37 +08:00
藍+85CD
35f90287f7
fix: cargo fmt check 2024-07-26 17:10:05 +08:00
藍+85CD
5fc5546aec
refactor: update dependencies (#2)
* chore(deps): bump reqwest

* chore(deps): bump http

* chore(deps): bump http-signature-normalization-reqwest

* chore(deps): bump hyper

* chore(deps): bump axum

* chore(deps): use axum-extra

* refactor(tests): use axum::serve

* fix: axum inbox

Co-Authored-By: j0 <me@j0.lol>

* fix(examples): use axum::serve

* chore(ci): add doc test

* fix: docs test

* fix(tests): fix port

* fix(examples): use tokio::spawn

* chore(examples): remove actix-web code

---------

Co-authored-by: j0 <me@j0.lol>
2024-07-25 19:40:57 +08:00
藍+85CD
03569d5a56
refactor!: remove actix-web feature (#1)
* refactor!: remove `actix-web` feature

* chore: remove codeowners

* chore(ci): add check action

* fix(ci/check): remove nightly toolchain

* fix(ci): allow dead code

* refactor(examples): remove actix-web
2024-07-25 17:02:10 +08:00
30 changed files with 129 additions and 490 deletions

1
.github/CODEOWNERS vendored
View file

@ -1 +0,0 @@
* @Nutomic @dessalines

24
.github/workflows/check.yml vendored Normal file
View file

@ -0,0 +1,24 @@
name: check
on:
pull_request:
workflow_dispatch:
env:
CARGO_TERM_COLOR: always
# https://github.com/Mozilla-Actions/sccache-action#rust-code
RUSTC_WRAPPER: "sccache"
SCCACHE_GHA_ENABLED: "true"
jobs:
check:
runs-on: ubuntu-latest
name: check
steps:
- uses: actions/checkout@v4
- uses: mozilla-actions/sccache-action@v0.0.3
# - run: cargo fmt -- --check
- run: cargo clippy --all-targets --all-features
- run: cargo test --all-features --no-fail-fast
- run: cargo doc --all-features
- run: cargo run --example local_federation axum

View file

@ -9,9 +9,14 @@ repository = "https://github.com/LemmyNet/activitypub-federation-rust"
documentation = "https://docs.rs/activitypub_federation/" documentation = "https://docs.rs/activitypub_federation/"
[features] [features]
default = ["actix-web", "axum"] default = ["axum"]
actix-web = ["dep:actix-web"] axum = [
axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"] "dep:axum",
"dep:axum-extra",
"dep:tower",
"dep:hyper",
"dep:http-body-util",
]
diesel = ["dep:diesel"] diesel = ["dep:diesel"]
[lints.rust] [lints.rust]
@ -37,18 +42,18 @@ serde = { version = "1.0.204", features = ["derive"] }
async-trait = "0.1.81" async-trait = "0.1.81"
url = { version = "2.5.2", features = ["serde"] } url = { version = "2.5.2", features = ["serde"] }
serde_json = { version = "1.0.120", features = ["preserve_order"] } serde_json = { version = "1.0.120", features = ["preserve_order"] }
reqwest = { version = "0.11.27", default-features = false, features = [ reqwest = { version = "0.12.5", default-features = false, features = [
"json", "json",
"stream", "stream",
"rustls-tls", "rustls-tls",
] } ] }
reqwest-middleware = "0.2.5" reqwest-middleware = "0.3.2"
tracing = "0.1.40" tracing = "0.1.40"
base64 = "0.22.1" base64 = "0.22.1"
rand = "0.8.5" rand = "0.8.5"
rsa = "0.9.6" rsa = "0.9.6"
once_cell = "1.19.0" once_cell = "1.19.0"
http = "0.2.12" http = "1.1.0"
sha2 = { version = "0.10.8", features = ["oid"] } sha2 = { version = "0.10.8", features = ["oid"] }
thiserror = "1.0.62" thiserror = "1.0.62"
derive_builder = "0.20.0" derive_builder = "0.20.0"
@ -56,7 +61,7 @@ itertools = "0.13.0"
dyn-clone = "1.0.17" dyn-clone = "1.0.17"
enum_delegate = "0.2.0" enum_delegate = "0.2.0"
httpdate = "1.0.3" httpdate = "1.0.3"
http-signature-normalization-reqwest = { version = "0.10.0", default-features = false, features = [ http-signature-normalization-reqwest = { version = "0.12.0", default-features = false, features = [
"sha-2", "sha-2",
"middleware", "middleware",
"default-spawner", "default-spawner",
@ -82,28 +87,25 @@ diesel = { version = "2.2.1", features = [
futures = "0.3.30" futures = "0.3.30"
moka = { version = "0.12.8", features = ["future"] } moka = { version = "0.12.8", features = ["future"] }
# Actix-web
actix-web = { version = "4.8.0", default-features = false, optional = true }
# Axum # Axum
axum = { version = "0.6.20", features = [ axum = { version = "0.7.5", features = [
"json", "json",
"headers",
], default-features = false, optional = true } ], default-features = false, optional = true }
axum-extra = { version = "0.9.3", features = ["typed-header"], optional = true }
tower = { version = "0.4.13", optional = true } tower = { version = "0.4.13", optional = true }
hyper = { version = "0.14", optional = true } hyper = { version = "1.4.1", optional = true }
http-body-util = { version = "0.1.2", optional = true } http-body-util = { version = "0.1.2", optional = true }
[dev-dependencies] [dev-dependencies]
anyhow = "1.0.86" anyhow = "1.0.86"
env_logger = "0.11.3" env_logger = "0.11.3"
tower-http = { version = "0.5.2", features = ["map-request-body", "util"] } tower-http = { version = "0.5.2", features = ["map-request-body", "util"] }
axum = { version = "0.6.20", features = [ axum = { version = "0.7.5", features = [
"http1", "http1",
"tokio", "tokio",
"query", "query",
], default-features = false } ], default-features = false }
axum-macros = "0.3.8" axum-macros = "0.4.1"
tokio = { version = "1.38.0", features = ["full"] } tokio = { version = "1.38.0", features = ["full"] }
[profile.dev] [profile.dev]

View file

@ -15,9 +15,9 @@ The next step is to allow other servers to fetch our actors and objects. For thi
# use activitypub_federation::config::FederationMiddleware; # use activitypub_federation::config::FederationMiddleware;
# use axum::routing::get; # use axum::routing::get;
# use crate::activitypub_federation::traits::Object; # use crate::activitypub_federation::traits::Object;
# use axum::headers::ContentType; # use axum_extra::headers::ContentType;
# use activitypub_federation::FEDERATION_CONTENT_TYPE; # use activitypub_federation::FEDERATION_CONTENT_TYPE;
# use axum::TypedHeader; # use axum_extra::TypedHeader;
# use axum::response::IntoResponse; # use axum::response::IntoResponse;
# use http::HeaderMap; # use http::HeaderMap;
# async fn generate_user_html(_: String, _: Data<DbConnection>) -> axum::response::Response { todo!() } # async fn generate_user_html(_: String, _: Data<DbConnection>) -> axum::response::Response { todo!() }
@ -33,11 +33,11 @@ async fn main() -> Result<(), Error> {
.route("/user/:name", get(http_get_user)) .route("/user/:name", get(http_get_user))
.layer(FederationMiddleware::new(data)); .layer(FederationMiddleware::new(data));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
tracing::debug!("listening on {}", addr); .await
axum::Server::bind(&addr) .unwrap();
.serve(app.into_make_service()) tracing::debug!("listening on {}", listener.local_addr().unwrap());
.await?; axum::serve(listener, app).await.unwrap();
Ok(()) Ok(())
} }

View file

@ -31,7 +31,7 @@ pub struct CreatePost {
impl CreatePost { impl CreatePost {
pub async fn send(note: Note, inbox: Url, data: &Data<DatabaseHandle>) -> Result<(), Error> { pub async fn send(note: Note, inbox: Url, data: &Data<DatabaseHandle>) -> Result<(), Error> {
print!("Sending reply to {}", &note.attributed_to); print!("Sending reply to {}", &note.attributed_to);
let create = CreatePost { let create = Self {
actor: note.attributed_to.clone(), actor: note.attributed_to.clone(),
to: note.to.clone(), to: note.to.clone(),
object: note, object: note,

View file

@ -15,6 +15,6 @@ where
T: Into<anyhow::Error>, T: Into<anyhow::Error>,
{ {
fn from(t: T) -> Self { fn from(t: T) -> Self {
Error(t.into()) Self(t.into())
} }
} }

View file

@ -12,10 +12,8 @@ use axum::{
Router, Router,
}; };
use error::Error; use error::Error;
use std::{ use std::sync::{Arc, Mutex};
net::ToSocketAddrs, use tokio::net::TcpListener;
sync::{Arc, Mutex},
};
use tracing::log::{info, LevelFilter}; use tracing::log::{info, LevelFilter};
mod activities; mod activities;
@ -60,13 +58,13 @@ async fn main() -> Result<(), Error> {
.route("/.well-known/webfinger", get(webfinger)) .route("/.well-known/webfinger", get(webfinger))
.layer(FederationMiddleware::new(config)); .layer(FederationMiddleware::new(config));
let addr = BIND_ADDRESS axum::serve(
.to_socket_addrs()? TcpListener::bind(BIND_ADDRESS)
.next() .await
.expect("Failed to lookup domain name"); .expect("Failed to lookup domain name"),
axum::Server::bind(&addr) app.into_make_service(),
.serve(app.into_make_service()) )
.await?; .await?;
Ok(()) Ok(())
} }

View file

@ -35,11 +35,11 @@ pub enum PersonAcceptedActivities {
} }
impl DbUser { impl DbUser {
pub fn new(hostname: &str, name: &str) -> Result<DbUser, Error> { pub fn new(hostname: &str, name: &str) -> Result<Self, Error> {
let ap_id = Url::parse(&format!("https://{}/{}", hostname, &name))?.into(); let ap_id = Url::parse(&format!("https://{}/{}", hostname, &name))?.into();
let inbox = Url::parse(&format!("https://{}/{}/inbox", hostname, &name))?; let inbox = Url::parse(&format!("https://{}/{}/inbox", hostname, &name))?;
let keypair = generate_actor_keypair()?; let keypair = generate_actor_keypair()?;
Ok(DbUser { Ok(Self {
name: name.to_string(), name: name.to_string(),
ap_id, ap_id,
inbox, inbox,
@ -108,7 +108,7 @@ impl Object for DbUser {
json: Self::Kind, json: Self::Kind,
_data: &Data<Self::DataType>, _data: &Data<Self::DataType>,
) -> Result<Self, Self::Error> { ) -> Result<Self, Self::Error> {
Ok(DbUser { Ok(Self {
name: json.preferred_username, name: json.preferred_username,
ap_id: json.id, ap_id: json.id,
inbox: json.inbox, inbox: json.inbox,

View file

@ -84,7 +84,7 @@ impl Object for DbPost {
&json.content, &json.id &json.content, &json.id
); );
let creator = json.attributed_to.dereference(data).await?; let creator = json.attributed_to.dereference(data).await?;
let post = DbPost { let post = Self {
text: json.content, text: json.content,
ap_id: json.id.clone(), ap_id: json.id.clone(),
creator: json.attributed_to.clone(), creator: json.attributed_to.clone(),

View file

@ -9,5 +9,5 @@ pub fn generate_object_id(domain: &str) -> Result<Url, ParseError> {
.take(7) .take(7)
.map(char::from) .map(char::from)
.collect(); .collect();
Url::parse(&format!("https://{}/objects/{}", domain, id)) Url::parse(&format!("https://{domain}/objects/{id}"))
} }

View file

@ -19,8 +19,8 @@ pub struct Accept {
} }
impl Accept { impl Accept {
pub fn new(actor: ObjectId<DbUser>, object: Follow, id: Url) -> Accept { pub fn new(actor: ObjectId<DbUser>, object: Follow, id: Url) -> Self {
Accept { Self {
actor, actor,
object, object,
kind: Default::default(), kind: Default::default(),

View file

@ -26,8 +26,8 @@ pub struct CreatePost {
} }
impl CreatePost { impl CreatePost {
pub fn new(note: Note, id: Url) -> CreatePost { pub fn new(note: Note, id: Url) -> Self {
CreatePost { Self {
actor: note.attributed_to.clone(), actor: note.attributed_to.clone(),
to: note.to.clone(), to: note.to.clone(),
object: note, object: note,

View file

@ -24,8 +24,8 @@ pub struct Follow {
} }
impl Follow { impl Follow {
pub fn new(actor: ObjectId<DbUser>, object: ObjectId<DbUser>, id: Url) -> Follow { pub fn new(actor: ObjectId<DbUser>, object: ObjectId<DbUser>, id: Url) -> Self {
Follow { Self {
actor, actor,
object, object,
kind: Default::default(), kind: Default::default(),

View file

@ -1,97 +0,0 @@
use crate::{
error::Error,
instance::DatabaseHandle,
objects::person::{DbUser, PersonAcceptedActivities},
};
use activitypub_federation::{
actix_web::{inbox::receive_activity, signing_actor},
config::{Data, FederationConfig, FederationMiddleware},
fetch::webfinger::{build_webfinger_response, extract_webfinger_name},
protocol::context::WithContext,
traits::{Actor, Object},
FEDERATION_CONTENT_TYPE,
};
use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer};
use anyhow::anyhow;
use serde::Deserialize;
use tracing::info;
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = config.domain();
info!("Listening with actix-web on {hostname}");
let config = config.clone();
let server = HttpServer::new(move || {
App::new()
.wrap(FederationMiddleware::new(config.clone()))
.route("/", web::get().to(http_get_system_user))
.route("/{user}", web::get().to(http_get_user))
.route("/{user}/inbox", web::post().to(http_post_user_inbox))
.route("/.well-known/webfinger", web::get().to(webfinger))
})
.bind(hostname)?
.run();
tokio::spawn(server);
Ok(())
}
/// Handles requests to fetch system user json over HTTP
pub async fn http_get_system_user(data: Data<DatabaseHandle>) -> Result<HttpResponse, Error> {
let json_user = data.system_user.clone().into_json(&data).await?;
Ok(HttpResponse::Ok()
.content_type(FEDERATION_CONTENT_TYPE)
.json(WithContext::new_default(json_user)))
}
/// Handles requests to fetch user json over HTTP
pub async fn http_get_user(
request: HttpRequest,
user_name: web::Path<String>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let signed_by = signing_actor::<DbUser>(&request, None, &data).await?;
// here, checks can be made on the actor or the domain to which
// it belongs, to verify whether it is allowed to access this resource
info!(
"Fetch user request is signed by system account {}",
signed_by.id()
);
let db_user = data.local_user();
if user_name.into_inner() == db_user.name {
let json_user = db_user.into_json(&data).await?;
Ok(HttpResponse::Ok()
.content_type(FEDERATION_CONTENT_TYPE)
.json(WithContext::new_default(json_user)))
} else {
Err(anyhow!("Invalid user").into())
}
}
/// Handles messages received in user inbox
pub async fn http_post_user_inbox(
request: HttpRequest,
body: Bytes,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
request, body, &data,
)
.await
}
#[derive(Deserialize)]
pub struct WebfingerQuery {
resource: String,
}
pub async fn webfinger(
query: web::Query<WebfingerQuery>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let name = extract_webfinger_name(&query.resource, &data)?;
let db_user = data.read_user(name)?;
Ok(HttpResponse::Ok().json(build_webfinger_response(
query.resource.clone(),
db_user.ap_id.into_inner(),
)))
}

View file

@ -1,6 +0,0 @@
use crate::error::Error;
use actix_web::ResponseError;
pub(crate) mod http;
impl ResponseError for Error {}

View file

@ -22,10 +22,10 @@ use axum::{
}; };
use axum_macros::debug_handler; use axum_macros::debug_handler;
use serde::Deserialize; use serde::Deserialize;
use std::net::ToSocketAddrs; use tokio::net::TcpListener;
use tracing::info; use tracing::info;
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> { pub async fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = config.domain(); let hostname = config.domain();
info!("Listening with axum on {hostname}"); info!("Listening with axum on {hostname}");
let config = config.clone(); let config = config.clone();
@ -35,13 +35,17 @@ pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
.route("/.well-known/webfinger", get(webfinger)) .route("/.well-known/webfinger", get(webfinger))
.layer(FederationMiddleware::new(config)); .layer(FederationMiddleware::new(config));
let addr = hostname let server = axum::serve(
.to_socket_addrs()? TcpListener::bind(hostname)
.next() .await
.expect("Failed to lookup domain name"); .expect("Failed to lookup domain name"),
let server = axum::Server::bind(&addr).serve(app.into_make_service()); app.into_make_service(),
);
tokio::spawn(async move {
server.await.expect("Failed to start server");
});
tokio::spawn(server);
Ok(()) Ok(())
} }

View file

@ -15,6 +15,6 @@ where
T: Into<anyhow::Error>, T: Into<anyhow::Error>,
{ {
fn from(t: T) -> Self { fn from(t: T) -> Self {
Error(t.into()) Self(t.into())
} }
} }

View file

@ -16,7 +16,7 @@ pub async fn new_instance(
name: String, name: String,
) -> Result<FederationConfig<DatabaseHandle>, Error> { ) -> Result<FederationConfig<DatabaseHandle>, Error> {
let mut system_user = DbUser::new(hostname, "system".into())?; let mut system_user = DbUser::new(hostname, "system".into())?;
system_user.ap_id = Url::parse(&format!("http://{}/", hostname))?.into(); system_user.ap_id = Url::parse(&format!("http://{hostname}/"))?.into();
let local_user = DbUser::new(hostname, name)?; let local_user = DbUser::new(hostname, name)?;
let database = Arc::new(Database { let database = Arc::new(Database {
@ -63,7 +63,6 @@ impl UrlVerifier for MyUrlVerifier {
pub enum Webserver { pub enum Webserver {
Axum, Axum,
ActixWeb,
} }
impl FromStr for Webserver { impl FromStr for Webserver {
@ -71,20 +70,18 @@ impl FromStr for Webserver {
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s { Ok(match s {
"axum" => Webserver::Axum, "axum" => Self::Axum,
"actix-web" => Webserver::ActixWeb,
_ => panic!("Invalid webserver parameter, must be either `axum` or `actix-web`"), _ => panic!("Invalid webserver parameter, must be either `axum` or `actix-web`"),
}) })
} }
} }
pub fn listen( pub async fn listen(
config: &FederationConfig<DatabaseHandle>, config: &FederationConfig<DatabaseHandle>,
webserver: &Webserver, webserver: &Webserver,
) -> Result<(), Error> { ) -> Result<(), Error> {
match webserver { match webserver {
Webserver::Axum => crate::axum::http::listen(config)?, Webserver::Axum => crate::axum::http::listen(config).await?,
Webserver::ActixWeb => crate::actix_web::http::listen(config)?,
} }
Ok(()) Ok(())
} }

View file

@ -10,8 +10,6 @@ use std::{env::args, str::FromStr};
use tracing::log::{info, LevelFilter}; use tracing::log::{info, LevelFilter};
mod activities; mod activities;
#[cfg(feature = "actix-web")]
mod actix_web;
#[cfg(feature = "axum")] #[cfg(feature = "axum")]
mod axum; mod axum;
mod error; mod error;
@ -31,13 +29,12 @@ async fn main() -> Result<(), Error> {
info!("Start with parameter `axum` or `actix-web` to select the webserver"); info!("Start with parameter `axum` or `actix-web` to select the webserver");
let webserver = args() let webserver = args()
.nth(1) .nth(1)
.map(|arg| Webserver::from_str(&arg).unwrap()) .map_or(Webserver::Axum, |arg| Webserver::from_str(&arg).unwrap());
.unwrap_or(Webserver::Axum);
let alpha = new_instance("localhost:8001", "alpha".to_string()).await?; let alpha = new_instance("localhost:8001", "alpha".to_string()).await?;
let beta = new_instance("localhost:8002", "beta".to_string()).await?; let beta = new_instance("localhost:8002", "beta".to_string()).await?;
listen(&alpha, &webserver)?; listen(&alpha, &webserver).await?;
listen(&beta, &webserver)?; listen(&beta, &webserver).await?;
info!("Local instances started"); info!("Local instances started");
info!("Alpha user follows beta user via webfinger"); info!("Alpha user follows beta user via webfinger");

View file

@ -45,11 +45,11 @@ pub enum PersonAcceptedActivities {
} }
impl DbUser { impl DbUser {
pub fn new(hostname: &str, name: String) -> Result<DbUser, Error> { pub fn new(hostname: &str, name: String) -> Result<Self, Error> {
let ap_id = Url::parse(&format!("http://{}/{}", hostname, &name))?.into(); let ap_id = Url::parse(&format!("http://{}/{}", hostname, &name))?.into();
let inbox = Url::parse(&format!("http://{}/{}/inbox", hostname, &name))?; let inbox = Url::parse(&format!("http://{}/{}/inbox", hostname, &name))?;
let keypair = generate_actor_keypair()?; let keypair = generate_actor_keypair()?;
Ok(DbUser { Ok(Self {
name, name,
ap_id, ap_id,
inbox, inbox,
@ -83,7 +83,7 @@ impl DbUser {
} }
pub async fn follow(&self, other: &str, data: &Data<DatabaseHandle>) -> Result<(), Error> { pub async fn follow(&self, other: &str, data: &Data<DatabaseHandle>) -> Result<(), Error> {
let other: DbUser = webfinger_resolve_actor(other, data).await?; let other: Self = webfinger_resolve_actor(other, data).await?;
let id = generate_object_id(data.domain())?; let id = generate_object_id(data.domain())?;
let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone()); let follow = Follow::new(self.ap_id.clone(), other.ap_id.clone(), id.clone());
self.send(follow, vec![other.shared_inbox_or_inbox()], false, data) self.send(follow, vec![other.shared_inbox_or_inbox()], false, data)
@ -96,7 +96,7 @@ impl DbUser {
let create = CreatePost::new(post.into_json(data).await?, id.clone()); let create = CreatePost::new(post.into_json(data).await?, id.clone());
let mut inboxes = vec![]; let mut inboxes = vec![];
for f in self.followers.clone() { for f in self.followers.clone() {
let user: DbUser = ObjectId::from(f).dereference(data).await?; let user: Self = ObjectId::from(f).dereference(data).await?;
inboxes.push(user.shared_inbox_or_inbox()); inboxes.push(user.shared_inbox_or_inbox());
} }
self.send(create, inboxes, true, data).await?; self.send(create, inboxes, true, data).await?;
@ -170,7 +170,7 @@ impl Object for DbUser {
} }
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> { async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let user = DbUser { let user = Self {
name: json.preferred_username, name: json.preferred_username,
ap_id: json.id, ap_id: json.id,
inbox: json.inbox, inbox: json.inbox,

View file

@ -18,9 +18,9 @@ pub struct DbPost {
} }
impl DbPost { impl DbPost {
pub fn new(text: String, creator: ObjectId<DbUser>) -> Result<DbPost, Error> { pub fn new(text: String, creator: ObjectId<DbUser>) -> Result<Self, Error> {
let ap_id = generate_object_id(creator.inner().domain().unwrap())?.into(); let ap_id = generate_object_id(creator.inner().domain().unwrap())?.into();
Ok(DbPost { Ok(Self {
text, text,
ap_id, ap_id,
creator, creator,
@ -80,7 +80,7 @@ impl Object for DbPost {
} }
async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> { async fn from_json(json: Self::Kind, data: &Data<Self::DataType>) -> Result<Self, Self::Error> {
let post = DbPost { let post = Self {
text: json.content, text: json.content,
ap_id: json.id, ap_id: json.id,
creator: json.attributed_to, creator: json.attributed_to,

View file

@ -9,5 +9,5 @@ pub fn generate_object_id(domain: &str) -> Result<Url, ParseError> {
.take(7) .take(7)
.map(char::from) .map(char::from)
.collect(); .collect();
Url::parse(&format!("http://{}/objects/{}", domain, id)) Url::parse(&format!("http://{domain}/objects/{id}"))
} }

View file

@ -424,6 +424,7 @@ mod tests {
use bytes::Bytes; use bytes::Bytes;
use http::{HeaderMap, StatusCode}; use http::{HeaderMap, StatusCode};
use std::time::Instant; use std::time::Instant;
use tokio::net::TcpListener;
use tracing::debug; use tracing::debug;
// This will periodically send back internal errors to test the retry // This will periodically send back internal errors to test the retry
@ -451,10 +452,12 @@ mod tests {
.route("/", post(dodgy_handler)) .route("/", post(dodgy_handler))
.with_state(state); .with_state(state);
axum::Server::bind(&"0.0.0.0:8002".parse().unwrap()) axum::serve(
.serve(app.into_make_service()) TcpListener::bind("0.0.0.0:8002").await.unwrap(),
.await app.into_make_service(),
.unwrap(); )
.await
.unwrap();
} }
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]

View file

@ -232,6 +232,7 @@ mod tests {
sync::{atomic::AtomicUsize, Arc}, sync::{atomic::AtomicUsize, Arc},
time::Instant, time::Instant,
}; };
use tokio::net::TcpListener;
use tracing::info; use tracing::info;
// This will periodically send back internal errors to test the retry // This will periodically send back internal errors to test the retry
@ -251,10 +252,12 @@ mod tests {
.route("/", post(dodgy_handler)) .route("/", post(dodgy_handler))
.with_state(state); .with_state(state);
axum::Server::bind(&"0.0.0.0:8001".parse().unwrap()) axum::serve(
.serve(app.into_make_service()) TcpListener::bind("0.0.0.0:8001").await.unwrap(),
.await app.into_make_service(),
.unwrap(); )
.await
.unwrap();
} }
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]

View file

@ -1,182 +0,0 @@
//! Handles incoming activities, verifying HTTP signatures and other checks
use crate::{
config::Data,
error::Error,
http_signatures::{verify_body_hash, verify_signature},
parse_received_activity,
traits::{ActivityHandler, Actor, Object},
};
use actix_web::{web::Bytes, HttpRequest, HttpResponse};
use serde::de::DeserializeOwned;
use tracing::debug;
/// Handles incoming activities, verifying HTTP signatures and other checks
///
/// After successful validation, activities are passed to respective [trait@ActivityHandler].
pub async fn receive_activity<Activity, ActorT, Datatype>(
request: HttpRequest,
body: Bytes,
data: &Data<Datatype>,
) -> Result<HttpResponse, <Activity as ActivityHandler>::Error>
where
Activity: ActivityHandler<DataType = Datatype> + DeserializeOwned + Send + 'static,
ActorT: Object<DataType = Datatype> + Actor + Send + 'static,
for<'de2> <ActorT as Object>::Kind: serde::Deserialize<'de2>,
<Activity as ActivityHandler>::Error: From<Error> + From<<ActorT as Object>::Error>,
<ActorT as Object>::Error: From<Error>,
Datatype: Clone,
{
verify_body_hash(request.headers().get("Digest"), &body)?;
let (activity, actor) = parse_received_activity::<Activity, ActorT, _>(&body, data).await?;
verify_signature(
request.headers(),
request.method(),
request.uri(),
actor.public_key_pem(),
)?;
debug!("Receiving activity {}", activity.id().to_string());
activity.verify(data).await?;
activity.receive(data).await?;
Ok(HttpResponse::Ok().finish())
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod test {
use super::*;
use crate::{
activity_sending::generate_request_headers,
config::FederationConfig,
fetch::object_id::ObjectId,
http_signatures::sign_request,
traits::tests::{DbConnection, DbUser, Follow, DB_USER_KEYPAIR},
};
use actix_web::test::TestRequest;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde_json::json;
use url::Url;
#[tokio::test]
async fn test_receive_activity() {
let (body, incoming_request, config) = setup_receive_test().await;
receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await
.unwrap();
}
#[tokio::test]
async fn test_receive_activity_invalid_body_signature() {
let (_, incoming_request, config) = setup_receive_test().await;
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
"invalid".into(),
&config.to_request_data(),
)
.await
.err()
.unwrap();
assert_eq!(&err, &Error::ActivityBodyDigestInvalid)
}
#[tokio::test]
async fn test_receive_activity_invalid_path() {
let (body, incoming_request, config) = setup_receive_test().await;
let incoming_request = incoming_request.uri("/wrong");
let err = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await
.err()
.unwrap();
assert_eq!(&err, &Error::ActivitySignatureInvalid)
}
#[tokio::test]
async fn test_receive_unparseable_activity() {
let (_, _, config) = setup_receive_test().await;
let actor = Url::parse("http://ds9.lemmy.ml/u/lemmy_alpha").unwrap();
let id = "http://localhost:123/1";
let activity = json!({
"actor": actor.as_str(),
"to": ["https://www.w3.org/ns/activitystreams#Public"],
"object": "http://ds9.lemmy.ml/post/1",
"cc": ["http://enterprise.lemmy.ml/c/main"],
"type": "Delete",
"id": id
}
);
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
let incoming_request = construct_request(&body, &actor).await;
// intentionally cause a parse error by using wrong type for deser
let res = receive_activity::<Follow, DbUser, DbConnection>(
incoming_request.to_http_request(),
body,
&config.to_request_data(),
)
.await;
match res {
Err(Error::ParseReceivedActivity(_, url)) => {
assert_eq!(id, url.expect("has url").as_str());
}
_ => unreachable!(),
}
}
async fn construct_request(body: &Bytes, actor: &Url) -> TestRequest {
let inbox = "https://example.com/inbox";
let headers = generate_request_headers(&Url::parse(inbox).unwrap());
let request_builder = ClientWithMiddleware::from(Client::default())
.post(inbox)
.headers(headers);
let outgoing_request = sign_request(
request_builder,
actor,
body.clone(),
DB_USER_KEYPAIR.private_key().unwrap(),
false,
)
.await
.unwrap();
let mut incoming_request = TestRequest::post().uri(outgoing_request.url().path());
for h in outgoing_request.headers() {
incoming_request = incoming_request.append_header(h);
}
incoming_request
}
async fn setup_receive_test() -> (Bytes, TestRequest, FederationConfig<DbConnection>) {
let activity = Follow {
actor: ObjectId::parse("http://localhost:123").unwrap(),
object: ObjectId::parse("http://localhost:124").unwrap(),
kind: Default::default(),
id: "http://localhost:123/1".try_into().unwrap(),
};
let body: Bytes = serde_json::to_vec(&activity).unwrap().into();
let incoming_request = construct_request(&body, activity.actor.inner()).await;
let config = FederationConfig::builder()
.domain("localhost:8002")
.app_data(DbConnection)
.debug(true)
.build()
.await
.unwrap();
(body, incoming_request, config)
}
}

View file

@ -1,76 +0,0 @@
use crate::config::{Data, FederationConfig, FederationMiddleware};
use actix_web::{
dev::{forward_ready, Payload, Service, ServiceRequest, ServiceResponse, Transform},
Error,
FromRequest,
HttpMessage,
HttpRequest,
};
use std::future::{ready, Ready};
impl<S, B, T> Transform<S, ServiceRequest> for FederationMiddleware<T>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
T: Clone + Sync + 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Transform = FederationService<S, T>;
type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(FederationService {
service,
config: self.0.clone(),
}))
}
}
/// Passes [FederationConfig] to HTTP handlers, converting it to [Data] in the process
#[doc(hidden)]
pub struct FederationService<S, T: Clone>
where
S: Service<ServiceRequest, Error = Error>,
S::Future: 'static,
T: Sync,
{
service: S,
config: FederationConfig<T>,
}
impl<S, B, T> Service<ServiceRequest> for FederationService<S, T>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
T: Clone + Sync + 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = S::Future;
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
req.extensions_mut().insert(self.config.clone());
self.service.call(req)
}
}
impl<T: Clone + 'static> FromRequest for Data<T> {
type Error = Error;
type Future = Ready<Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _payload: &mut Payload) -> Self::Future {
ready(match req.extensions().get::<FederationConfig<T>>() {
Some(c) => Ok(c.to_request_data()),
None => Err(actix_web::error::ErrorBadRequest(
"Missing extension, did you register FederationMiddleware?",
)),
})
}
}

View file

@ -1,31 +0,0 @@
//! Utilities for using this library with actix-web framework
pub mod inbox;
#[doc(hidden)]
pub mod middleware;
use crate::{
config::Data,
error::Error,
http_signatures::{self, verify_body_hash},
traits::{Actor, Object},
};
use actix_web::{web::Bytes, HttpRequest};
use serde::Deserialize;
/// Checks whether the request is signed by an actor of type A, and returns
/// the actor in question if a valid signature is found.
pub async fn signing_actor<A>(
request: &HttpRequest,
body: Option<Bytes>,
data: &Data<<A as Object>::DataType>,
) -> Result<A, <A as Object>::Error>
where
A: Object + Actor,
<A as Object>::Error: From<Error>,
for<'de2> <A as Object>::Kind: Deserialize<'de2>,
{
verify_body_hash(request.headers().get("Digest"), &body.unwrap_or_default())?;
http_signatures::signing_actor(request.headers(), request.method(), request.uri(), data).await
}

View file

@ -11,9 +11,9 @@ use crate::{
}; };
use axum::{ use axum::{
async_trait, async_trait,
body::{Bytes, HttpBody}, body::{Body, HttpBody},
extract::FromRequest, extract::FromRequest,
http::{Request, StatusCode}, http::StatusCode,
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use http::{HeaderMap, Method, Uri}; use http::{HeaderMap, Method, Uri};
@ -59,21 +59,23 @@ pub struct ActivityData {
} }
#[async_trait] #[async_trait]
impl<S, B> FromRequest<S, B> for ActivityData impl<S> FromRequest<S> for ActivityData
where where
Bytes: FromRequest<S, B>, Body: HttpBody + Send + 'static,
B: HttpBody + Send + 'static,
S: Send + Sync, S: Send + Sync,
<B as HttpBody>::Error: std::fmt::Display, <axum::body::Body as HttpBody>::Error: std::fmt::Display,
<B as HttpBody>::Data: Send, <axum::body::Body as HttpBody>::Data: Send,
{ {
type Rejection = Response; type Rejection = Response;
async fn from_request(req: Request<B>, _state: &S) -> Result<Self, Self::Rejection> { async fn from_request(
req: axum::extract::Request,
_state: &S,
) -> Result<Self, Self::Rejection> {
let (parts, body) = req.into_parts(); let (parts, body) = req.into_parts();
// this wont work if the body is an long running stream // this wont work if the body is an long running stream
let bytes = hyper::body::to_bytes(body) let bytes = axum::body::to_bytes(body, usize::MAX)
.await .await
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?; .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;

View file

@ -2,7 +2,6 @@
//! //!
//! Signature creation and verification is handled internally in the library. See //! Signature creation and verification is handled internally in the library. See
//! [send_activity](crate::activity_sending::SendActivityTask::sign_and_send) and //! [send_activity](crate::activity_sending::SendActivityTask::sign_and_send) and
//! [receive_activity (actix-web)](crate::actix_web::inbox::receive_activity) /
//! [receive_activity (axum)](crate::axum::inbox::receive_activity). //! [receive_activity (axum)](crate::axum::inbox::receive_activity).
use crate::{ use crate::{
@ -139,6 +138,7 @@ where
/// from any actor of type A, and returns that actor if a valid signature is found. /// from any actor of type A, and returns that actor if a valid signature is found.
/// This function will return an `Err` variant when no signature is found /// This function will return an `Err` variant when no signature is found
/// or if the signature could not be verified. /// or if the signature could not be verified.
#[allow(dead_code)]
pub(crate) async fn signing_actor<'a, A, H>( pub(crate) async fn signing_actor<'a, A, H>(
headers: H, headers: H,
method: &Method, method: &Method,
@ -230,10 +230,12 @@ struct DigestPart {
#[allow(dead_code)] #[allow(dead_code)]
pub algorithm: String, pub algorithm: String,
/// The hashsum /// The hashsum
#[allow(dead_code)]
pub digest: String, pub digest: String,
} }
impl DigestPart { impl DigestPart {
#[allow(dead_code)]
fn try_from_header(h: &HeaderValue) -> Option<Vec<DigestPart>> { fn try_from_header(h: &HeaderValue) -> Option<Vec<DigestPart>> {
let h = h.to_str().ok()?.split(';').next()?; let h = h.to_str().ok()?.split(';').next()?;
let v: Vec<_> = h let v: Vec<_> = h
@ -258,6 +260,7 @@ impl DigestPart {
} }
/// Verify body of an inbox request against the hash provided in `Digest` header. /// Verify body of an inbox request against the hash provided in `Digest` header.
#[allow(dead_code)]
pub(crate) fn verify_body_hash( pub(crate) fn verify_body_hash(
digest_header: Option<&HeaderValue>, digest_header: Option<&HeaderValue>,
body: &[u8], body: &[u8],

View file

@ -9,11 +9,10 @@
#![doc = include_str!("../docs/09_sending_activities.md")] #![doc = include_str!("../docs/09_sending_activities.md")]
#![doc = include_str!("../docs/10_fetching_objects_with_unknown_type.md")] #![doc = include_str!("../docs/10_fetching_objects_with_unknown_type.md")]
#![deny(missing_docs)] #![deny(missing_docs)]
#![warn(clippy::all)]
pub mod activity_queue; pub mod activity_queue;
pub mod activity_sending; pub mod activity_sending;
#[cfg(feature = "actix-web")]
pub mod actix_web;
#[cfg(feature = "axum")] #[cfg(feature = "axum")]
pub mod axum; pub mod axum;
pub mod config; pub mod config;