refactor: update dependencies (#2)

* chore(deps): bump reqwest

* chore(deps): bump http

* chore(deps): bump http-signature-normalization-reqwest

* chore(deps): bump hyper

* chore(deps): bump axum

* chore(deps): use axum-extra

* refactor(tests): use axum::serve

* fix: axum inbox

Co-Authored-By: j0 <me@j0.lol>

* fix(examples): use axum::serve

* chore(ci): add doc test

* fix: docs test

* fix(tests): fix port

* fix(examples): use tokio::spawn

* chore(examples): remove actix-web code

---------

Co-authored-by: j0 <me@j0.lol>
This commit is contained in:
藍+85CD 2024-07-25 19:40:57 +08:00 committed by GitHub
parent 03569d5a56
commit 5fc5546aec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 75 additions and 163 deletions

View file

@ -20,4 +20,5 @@ jobs:
# - run: cargo fmt -- --check
- run: cargo clippy --all-targets --all-features
- run: cargo test --all-features --no-fail-fast
- run: cargo doc --all-features
- run: cargo run --example local_federation axum

View file

@ -10,7 +10,13 @@ documentation = "https://docs.rs/activitypub_federation/"
[features]
default = ["axum"]
axum = ["dep:axum", "dep:tower", "dep:hyper", "dep:http-body-util"]
axum = [
"dep:axum",
"dep:axum-extra",
"dep:tower",
"dep:hyper",
"dep:http-body-util",
]
diesel = ["dep:diesel"]
[lints.rust]
@ -36,18 +42,18 @@ serde = { version = "1.0.204", features = ["derive"] }
async-trait = "0.1.81"
url = { version = "2.5.2", features = ["serde"] }
serde_json = { version = "1.0.120", features = ["preserve_order"] }
reqwest = { version = "0.11.27", default-features = false, features = [
reqwest = { version = "0.12.5", default-features = false, features = [
"json",
"stream",
"rustls-tls",
] }
reqwest-middleware = "0.2.5"
reqwest-middleware = "0.3.2"
tracing = "0.1.40"
base64 = "0.22.1"
rand = "0.8.5"
rsa = "0.9.6"
once_cell = "1.19.0"
http = "0.2.12"
http = "1.1.0"
sha2 = { version = "0.10.8", features = ["oid"] }
thiserror = "1.0.62"
derive_builder = "0.20.0"
@ -55,7 +61,7 @@ itertools = "0.13.0"
dyn-clone = "1.0.17"
enum_delegate = "0.2.0"
httpdate = "1.0.3"
http-signature-normalization-reqwest = { version = "0.10.0", default-features = false, features = [
http-signature-normalization-reqwest = { version = "0.12.0", default-features = false, features = [
"sha-2",
"middleware",
"default-spawner",
@ -82,24 +88,24 @@ futures = "0.3.30"
moka = { version = "0.12.8", features = ["future"] }
# Axum
axum = { version = "0.6.20", features = [
axum = { version = "0.7.5", features = [
"json",
"headers",
], default-features = false, optional = true }
axum-extra = { version = "0.9.3", features = ["typed-header"], optional = true }
tower = { version = "0.4.13", optional = true }
hyper = { version = "0.14", optional = true }
hyper = { version = "1.4.1", optional = true }
http-body-util = { version = "0.1.2", optional = true }
[dev-dependencies]
anyhow = "1.0.86"
env_logger = "0.11.3"
tower-http = { version = "0.5.2", features = ["map-request-body", "util"] }
axum = { version = "0.6.20", features = [
axum = { version = "0.7.5", features = [
"http1",
"tokio",
"query",
], default-features = false }
axum-macros = "0.3.8"
axum-macros = "0.4.1"
tokio = { version = "1.38.0", features = ["full"] }
[profile.dev]

View file

@ -15,9 +15,9 @@ The next step is to allow other servers to fetch our actors and objects. For thi
# use activitypub_federation::config::FederationMiddleware;
# use axum::routing::get;
# use crate::activitypub_federation::traits::Object;
# use axum::headers::ContentType;
# use axum_extra::headers::ContentType;
# use activitypub_federation::FEDERATION_CONTENT_TYPE;
# use axum::TypedHeader;
# use axum_extra::TypedHeader;
# use axum::response::IntoResponse;
# use http::HeaderMap;
# async fn generate_user_html(_: String, _: Data<DbConnection>) -> axum::response::Response { todo!() }
@ -33,11 +33,11 @@ async fn main() -> Result<(), Error> {
.route("/user/:name", get(http_get_user))
.layer(FederationMiddleware::new(data));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
tracing::debug!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app).await.unwrap();
Ok(())
}

View file

@ -12,10 +12,8 @@ use axum::{
Router,
};
use error::Error;
use std::{
net::ToSocketAddrs,
sync::{Arc, Mutex},
};
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
use tracing::log::{info, LevelFilter};
mod activities;
@ -60,13 +58,13 @@ async fn main() -> Result<(), Error> {
.route("/.well-known/webfinger", get(webfinger))
.layer(FederationMiddleware::new(config));
let addr = BIND_ADDRESS
.to_socket_addrs()?
.next()
.expect("Failed to lookup domain name");
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await?;
axum::serve(
TcpListener::bind(BIND_ADDRESS)
.await
.expect("Failed to lookup domain name"),
app.into_make_service(),
)
.await?;
Ok(())
}

View file

@ -1,97 +0,0 @@
use crate::{
error::Error,
instance::DatabaseHandle,
objects::person::{DbUser, PersonAcceptedActivities},
};
use activitypub_federation::{
actix_web::{inbox::receive_activity, signing_actor},
config::{Data, FederationConfig, FederationMiddleware},
fetch::webfinger::{build_webfinger_response, extract_webfinger_name},
protocol::context::WithContext,
traits::{Actor, Object},
FEDERATION_CONTENT_TYPE,
};
use actix_web::{web, web::Bytes, App, HttpRequest, HttpResponse, HttpServer};
use anyhow::anyhow;
use serde::Deserialize;
use tracing::info;
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = config.domain();
info!("Listening with actix-web on {hostname}");
let config = config.clone();
let server = HttpServer::new(move || {
App::new()
.wrap(FederationMiddleware::new(config.clone()))
.route("/", web::get().to(http_get_system_user))
.route("/{user}", web::get().to(http_get_user))
.route("/{user}/inbox", web::post().to(http_post_user_inbox))
.route("/.well-known/webfinger", web::get().to(webfinger))
})
.bind(hostname)?
.run();
tokio::spawn(server);
Ok(())
}
/// Handles requests to fetch system user json over HTTP
pub async fn http_get_system_user(data: Data<DatabaseHandle>) -> Result<HttpResponse, Error> {
let json_user = data.system_user.clone().into_json(&data).await?;
Ok(HttpResponse::Ok()
.content_type(FEDERATION_CONTENT_TYPE)
.json(WithContext::new_default(json_user)))
}
/// Handles requests to fetch user json over HTTP
pub async fn http_get_user(
request: HttpRequest,
user_name: web::Path<String>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let signed_by = signing_actor::<DbUser>(&request, None, &data).await?;
// here, checks can be made on the actor or the domain to which
// it belongs, to verify whether it is allowed to access this resource
info!(
"Fetch user request is signed by system account {}",
signed_by.id()
);
let db_user = data.local_user();
if user_name.into_inner() == db_user.name {
let json_user = db_user.into_json(&data).await?;
Ok(HttpResponse::Ok()
.content_type(FEDERATION_CONTENT_TYPE)
.json(WithContext::new_default(json_user)))
} else {
Err(anyhow!("Invalid user").into())
}
}
/// Handles messages received in user inbox
pub async fn http_post_user_inbox(
request: HttpRequest,
body: Bytes,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
receive_activity::<WithContext<PersonAcceptedActivities>, DbUser, DatabaseHandle>(
request, body, &data,
)
.await
}
#[derive(Deserialize)]
pub struct WebfingerQuery {
resource: String,
}
pub async fn webfinger(
query: web::Query<WebfingerQuery>,
data: Data<DatabaseHandle>,
) -> Result<HttpResponse, Error> {
let name = extract_webfinger_name(&query.resource, &data)?;
let db_user = data.read_user(name)?;
Ok(HttpResponse::Ok().json(build_webfinger_response(
query.resource.clone(),
db_user.ap_id.into_inner(),
)))
}

View file

@ -1,6 +0,0 @@
use crate::error::Error;
use actix_web::ResponseError;
pub(crate) mod http;
impl ResponseError for Error {}

View file

@ -17,15 +17,14 @@ use axum::{
extract::{Path, Query},
response::IntoResponse,
routing::{get, post},
Json,
Router,
Json, Router,
};
use axum_macros::debug_handler;
use serde::Deserialize;
use std::net::ToSocketAddrs;
use tokio::net::TcpListener;
use tracing::info;
pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
pub async fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
let hostname = config.domain();
info!("Listening with axum on {hostname}");
let config = config.clone();
@ -35,13 +34,17 @@ pub fn listen(config: &FederationConfig<DatabaseHandle>) -> Result<(), Error> {
.route("/.well-known/webfinger", get(webfinger))
.layer(FederationMiddleware::new(config));
let addr = hostname
.to_socket_addrs()?
.next()
.expect("Failed to lookup domain name");
let server = axum::Server::bind(&addr).serve(app.into_make_service());
let server = axum::serve(
TcpListener::bind(hostname)
.await
.expect("Failed to lookup domain name"),
app.into_make_service(),
);
tokio::spawn(async move {
server.await.expect("Failed to start server");
});
tokio::spawn(server);
Ok(())
}

View file

@ -76,13 +76,12 @@ impl FromStr for Webserver {
}
}
pub fn listen(
pub async fn listen(
config: &FederationConfig<DatabaseHandle>,
webserver: &Webserver,
) -> Result<(), Error> {
match webserver {
Webserver::Axum => crate::axum::http::listen(config)?,
// Webserver::ActixWeb => crate::actix_web::http::listen(config)?,
Webserver::Axum => crate::axum::http::listen(config).await?,
}
Ok(())
}

View file

@ -34,8 +34,8 @@ async fn main() -> Result<(), Error> {
let alpha = new_instance("localhost:8001", "alpha".to_string()).await?;
let beta = new_instance("localhost:8002", "beta".to_string()).await?;
listen(&alpha, &webserver)?;
listen(&beta, &webserver)?;
listen(&alpha, &webserver).await?;
listen(&beta, &webserver).await?;
info!("Local instances started");
info!("Alpha user follows beta user via webfinger");

View file

@ -424,6 +424,7 @@ mod tests {
use bytes::Bytes;
use http::{HeaderMap, StatusCode};
use std::time::Instant;
use tokio::net::TcpListener;
use tracing::debug;
// This will periodically send back internal errors to test the retry
@ -451,10 +452,12 @@ mod tests {
.route("/", post(dodgy_handler))
.with_state(state);
axum::Server::bind(&"0.0.0.0:8002".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
axum::serve(
TcpListener::bind("0.0.0.0:8002").await.unwrap(),
app.into_make_service(),
)
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]

View file

@ -232,6 +232,7 @@ mod tests {
sync::{atomic::AtomicUsize, Arc},
time::Instant,
};
use tokio::net::TcpListener;
use tracing::info;
// This will periodically send back internal errors to test the retry
@ -251,10 +252,12 @@ mod tests {
.route("/", post(dodgy_handler))
.with_state(state);
axum::Server::bind(&"0.0.0.0:8001".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
axum::serve(
TcpListener::bind("0.0.0.0:8001").await.unwrap(),
app.into_make_service(),
)
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]

View file

@ -11,9 +11,9 @@ use crate::{
};
use axum::{
async_trait,
body::{Bytes, HttpBody},
body::{Body, HttpBody},
extract::FromRequest,
http::{Request, StatusCode},
http::StatusCode,
response::{IntoResponse, Response},
};
use http::{HeaderMap, Method, Uri};
@ -59,21 +59,23 @@ pub struct ActivityData {
}
#[async_trait]
impl<S, B> FromRequest<S, B> for ActivityData
impl<S> FromRequest<S> for ActivityData
where
Bytes: FromRequest<S, B>,
B: HttpBody + Send + 'static,
Body: HttpBody + Send + 'static,
S: Send + Sync,
<B as HttpBody>::Error: std::fmt::Display,
<B as HttpBody>::Data: Send,
<axum::body::Body as HttpBody>::Error: std::fmt::Display,
<axum::body::Body as HttpBody>::Data: Send,
{
type Rejection = Response;
async fn from_request(req: Request<B>, _state: &S) -> Result<Self, Self::Rejection> {
async fn from_request(
req: axum::extract::Request,
_state: &S,
) -> Result<Self, Self::Rejection> {
let (parts, body) = req.into_parts();
// this wont work if the body is an long running stream
let bytes = hyper::body::to_bytes(body)
let bytes = axum::body::to_bytes(body, usize::MAX)
.await
.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())?;