Compare commits

...

11 commits

Author SHA1 Message Date
Felix Ableitner
8c7f99f8b9 ci 2026-03-16 16:19:55 +01:00
Felix Ableitner
3866637de7 fix deps 2026-03-16 16:19:26 +01:00
Felix Ableitner
6bbb2e8775 replace 2026-03-16 16:16:45 +01:00
Felix Ableitner
9430bab591 add lockfile 2026-03-16 16:15:05 +01:00
Felix Ableitner
7c20b6e567 [0.5] Add IP checks 2026-03-16 15:55:33 +01:00
Nutomic
eca8f0fc6f Improve error message, allow local IP federation via env var (#158)
* Improve error message, allow local IP federation via env var (fixes #152)

* fix
2026-03-16 15:53:12 +01:00
Nutomic
d1f4da4198 Log warning if activity sending is slow (#127) 2026-03-16 15:46:18 +01:00
Felix Ableitner
df61c72344 Version 0.5.10 2025-02-03 21:10:01 +01:00
Nutomic
c90044f708 Add more url validation (#134)
* Add more url validation

* fix

* more fix

* Verify url after redirect

* Dont allow redirect for webfinger

* clippy

* more domain validation

* clippy

* fix lemmy test

* Remove trailing . from domain

* clippy

* fix

* manual redirect handling

* clippy

* prevent infinite recursion

* add timeout, comment
2025-02-03 21:09:47 +01:00
Felix Ableitner
3e4d54778c Version 0.5.9 2024-09-13 16:10:41 +02:00
Nutomic
c4b24bd201 If id of fetched object doesnt match url, refetch it (#126) 2024-09-13 16:10:25 +02:00
9 changed files with 3120 additions and 22 deletions

1
.gitignore vendored
View file

@ -1,6 +1,5 @@
/target /target
/.idea /.idea
/Cargo.lock
perf.data* perf.data*
flamegraph.svg flamegraph.svg

2970
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[package] [package]
name = "activitypub_federation" name = "activitypub_federation"
version = "0.5.8" version = "0.5.10"
edition = "2021" edition = "2021"
description = "High-level Activitypub framework" description = "High-level Activitypub framework"
keywords = ["activitypub", "activitystreams", "federation", "fediverse"] keywords = ["activitypub", "activitystreams", "federation", "fediverse"]

View file

@ -7,6 +7,7 @@ use crate::{
}; };
use error::Error; use error::Error;
use std::{env::args, str::FromStr}; use std::{env::args, str::FromStr};
use tokio::try_join;
use tracing::log::{info, LevelFilter}; use tracing::log::{info, LevelFilter};
mod activities; mod activities;
@ -34,8 +35,10 @@ async fn main() -> Result<(), Error> {
.map(|arg| Webserver::from_str(&arg).unwrap()) .map(|arg| Webserver::from_str(&arg).unwrap())
.unwrap_or(Webserver::Axum); .unwrap_or(Webserver::Axum);
let alpha = new_instance("localhost:8001", "alpha".to_string()).await?; let (alpha, beta) = try_join!(
let beta = new_instance("localhost:8002", "beta".to_string()).await?; new_instance("localhost:8001", "alpha".to_string()),
new_instance("localhost:8002", "beta".to_string())
)?;
listen(&alpha, &webserver)?; listen(&alpha, &webserver)?;
listen(&beta, &webserver)?; listen(&beta, &webserver)?;
info!("Local instances started"); info!("Local instances started");

View file

@ -24,9 +24,9 @@ use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
use serde::Serialize; use serde::Serialize;
use std::{ use std::{
fmt::{Debug, Display}, fmt::{Debug, Display},
time::{Duration, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
use tracing::debug; use tracing::{debug, warn};
use url::Url; use url::Url;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -92,7 +92,17 @@ impl SendActivityTask {
self.http_signature_compat, self.http_signature_compat,
) )
.await?; .await?;
// Send the activity, and log a warning if it's too slow.
let now = Instant::now();
let response = client.execute(request).await?; let response = client.execute(request).await?;
let elapsed = now.elapsed().as_secs();
if elapsed > 10 {
warn!(
"Sending activity {} to {} took {}s",
self.activity_id, self.inbox, elapsed
);
}
self.handle_response(response).await self.handle_response(response).await
} }

View file

@ -23,11 +23,16 @@ use crate::{
use async_trait::async_trait; use async_trait::async_trait;
use derive_builder::Builder; use derive_builder::Builder;
use dyn_clone::{clone_trait_object, DynClone}; use dyn_clone::{clone_trait_object, DynClone};
use itertools::Itertools;
use moka::future::Cache; use moka::future::Cache;
use once_cell::sync::Lazy;
use regex::Regex;
use reqwest::{redirect::Policy, Client};
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey}; use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use std::{ use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr},
ops::Deref, ops::Deref,
sync::{ sync::{
atomic::{AtomicU32, Ordering}, atomic::{AtomicU32, Ordering},
@ -35,6 +40,7 @@ use std::{
}, },
time::Duration, time::Duration,
}; };
use tokio::net::lookup_host;
use url::Url; use url::Url;
/// Configuration for this library, with various federation related settings /// Configuration for this library, with various federation related settings
@ -51,9 +57,14 @@ pub struct FederationConfig<T: Clone> {
/// [crate::fetch::object_id::ObjectId] for more details. /// [crate::fetch::object_id::ObjectId] for more details.
#[builder(default = "20")] #[builder(default = "20")]
pub(crate) http_fetch_limit: u32, pub(crate) http_fetch_limit: u32,
#[builder(default = "reqwest::Client::default().into()")] #[builder(default = "default_client()")]
/// HTTP client used for all outgoing requests. Middleware can be used to add functionality /// HTTP client used for all outgoing requests. When passing a custom client here you should
/// like log tracing or retry of failed requests. /// also disable redirects and set timeouts.
///
/// Middleware can be used to add functionality like log tracing or retry of failed requests.
/// Redirects are disabled by default, because automatic redirect URLs can't be validated.
/// Instead a single redirect is handled manually. The default client sets a timeout of 10s
/// to avoid excessive resource usage when connecting to dead servers.
pub(crate) client: ClientWithMiddleware, pub(crate) client: ClientWithMiddleware,
/// Run library in debug mode. This allows usage of http and localhost urls. It also sends /// Run library in debug mode. This allows usage of http and localhost urls. It also sends
/// outgoing activities synchronously, not in background thread. This helps to make tests /// outgoing activities synchronously, not in background thread. This helps to make tests
@ -102,6 +113,9 @@ pub struct FederationConfig<T: Clone> {
pub(crate) queue_retry_count: usize, pub(crate) queue_retry_count: usize,
} }
/// Lazily compiled pattern that matches strings consisting only of ASCII letters, digits,
/// `.` and `-`. Anything containing other characters (for example `:` or `/`, as used by
/// ports and paths) does not match. Note that the empty string matches as well, because the
/// character class is quantified with `*`.
///
/// Panics on first use with "compile regex" if the pattern fails to compile.
pub(crate) static DOMAIN_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new(r"^[a-zA-Z0-9.-]*$").expect("compile regex"));
impl<T: Clone> FederationConfig<T> { impl<T: Clone> FederationConfig<T> {
/// Returns a new config builder with default values. /// Returns a new config builder with default values.
pub fn builder() -> FederationConfigBuilder<T> { pub fn builder() -> FederationConfigBuilder<T> {
@ -156,17 +170,45 @@ impl<T: Clone> FederationConfig<T> {
return Ok(()); return Ok(());
} }
if url.domain().is_none() { let Some(domain) = url.domain() else {
return Err(Error::UrlVerificationError("Url must have a domain")); return Err(Error::UrlVerificationError("Url must have a domain"));
};
if !DOMAIN_REGEX.is_match(domain) {
return Err(Error::UrlVerificationError("Invalid characters in domain"));
} }
if url.domain() == Some("localhost") && !self.debug { // Extra checks only for production mode
return Err(Error::UrlVerificationError( if !self.debug {
"Localhost is only allowed in debug mode", if url.port().is_some() {
)); return Err(Error::UrlVerificationError("Explicit port is not allowed"));
}
// Resolve domain and see if it points to private IP
// TODO: Use is_global() once stabilized
// https://doc.rust-lang.org/std/net/enum.IpAddr.html#method.is_global
let mut ips = lookup_host((domain.to_owned(), 80)).await?;
let allow_local = std::env::var("DANGER_FEDERATION_ALLOW_LOCAL_IP").is_ok();
let invalid_ip = !allow_local
&& ips.any(|addr| match addr.ip().to_canonical() {
IpAddr::V4(addr) => v4_is_invalid(addr),
IpAddr::V6(addr) => v6_is_invalid(addr),
});
if invalid_ip {
let ip_addrs = ips.join(", ");
return Err(Error::DomainResolveError(domain.to_string(), ip_addrs));
}
} }
self.url_verifier.verify(url).await?; // It is valid but uncommon for domains to end with `.` char. Drop this so it cant be used
// to bypass domain blocklist. Avoid cloning url in common case.
if domain.ends_with('.') {
let mut url = url.clone();
let domain = &domain[0..domain.len() - 1];
url.set_host(Some(domain))?;
self.url_verifier.verify(&url).await?;
} else {
self.url_verifier.verify(url).await?;
}
Ok(()) Ok(())
} }
@ -193,6 +235,30 @@ impl<T: Clone> FederationConfig<T> {
} }
} }
/// Returns `true` if `v4` is an address that must not be contacted over federation because
/// it is not a globally routable unicast address.
///
/// This is a hand-written approximation of the still unstable `Ipv4Addr::is_global()`.
/// Besides the ranges covered by the stable `std` predicates (private, loopback, link-local,
/// multicast, documentation, unspecified, broadcast) it also rejects:
///
/// * `0.0.0.0/8` ("this network")
/// * `100.64.0.0/10` (shared address space, used for carrier-grade NAT and by some cloud
///   providers for internal services)
/// * `198.18.0.0/15` (benchmarking)
/// * `240.0.0.0/4` (reserved)
fn v4_is_invalid(v4: Ipv4Addr) -> bool {
    let [a, b, _, _] = v4.octets();
    // 0.0.0.0/8: first octet is zero.
    let is_this_network = a == 0;
    // 100.64.0.0/10: second octet in 64..=127, i.e. its top two bits are `01`.
    let is_shared = a == 100 && (b & 0xc0) == 0x40;
    // 198.18.0.0/15: second octet is 18 or 19.
    let is_benchmarking = a == 198 && (b & 0xfe) == 18;
    // 240.0.0.0/4: top four bits of the first octet are all set.
    let is_reserved = (a & 0xf0) == 0xf0;

    v4.is_private()
        || v4.is_loopback()
        || v4.is_link_local()
        || v4.is_multicast()
        || v4.is_documentation()
        || v4.is_unspecified()
        || v4.is_broadcast()
        || is_this_network
        || is_shared
        || is_benchmarking
        || is_reserved
}
fn v6_is_invalid(v6: Ipv6Addr) -> bool {
let is_documentation = matches!(
v6.segments(),
[0x2001, 0xdb8, ..] | [0x3fff, 0..=0x0fff, ..]
);
is_documentation
|| v6.is_loopback()
|| v6.is_multicast()
|| (v6.segments()[0] & 0xfe00) == 0xfc00 // is_unique_local
|| (v6.segments()[0] & 0xffc0) == 0xfe80 // is_unicast_link_local
|| v6.is_unspecified()
|| v6.to_ipv4_mapped().is_some_and(v4_is_invalid)
}
impl<T: Clone> FederationConfigBuilder<T> { impl<T: Clone> FederationConfigBuilder<T> {
/// Sets an actor to use to sign all federated fetch requests /// Sets an actor to use to sign all federated fetch requests
pub fn signed_fetch_actor<A: Actor>(&mut self, actor: &A) -> &mut Self { pub fn signed_fetch_actor<A: Actor>(&mut self, actor: &A) -> &mut Self {
@ -348,6 +414,17 @@ impl<T: Clone> FederationMiddleware<T> {
} }
} }
/// Builds the HTTP client used when no custom client is passed to the config builder.
///
/// Automatic redirects are switched off and both the overall request timeout and the
/// connect timeout are set to 10 seconds. If the builder fails, `Client::default()` is
/// returned instead.
fn default_client() -> ClientWithMiddleware {
    let ten_seconds = Duration::from_secs(10);
    let builder = Client::builder()
        .redirect(Policy::none())
        .timeout(ten_seconds)
        .connect_timeout(ten_seconds);
    match builder.build() {
        Ok(client) => client.into(),
        // NOTE(review): the fallback client is created without the redirect policy and
        // timeouts configured above - confirm this is acceptable.
        Err(_) => Client::default().into(),
    }
}
#[cfg(test)] #[cfg(test)]
#[allow(clippy::unwrap_used)] #[allow(clippy::unwrap_used)]
mod test { mod test {

View file

@ -28,6 +28,9 @@ pub enum Error {
/// url verification error /// url verification error
#[error("URL failed verification: {0}")] #[error("URL failed verification: {0}")]
UrlVerificationError(&'static str), UrlVerificationError(&'static str),
/// Resolving domain points to local IP.
#[error("Resolving domain {0} points to local IP {1}. This may indicate an attacker attempting to access internal services. If intentional, you can ignore this error by setting DANGER_FEDERATION_ALLOW_LOCAL_IP=1")]
DomainResolveError(String, String),
/// Incoming activity has invalid digest for body /// Incoming activity has invalid digest for body
#[error("Incoming activity has invalid digest for body")] #[error("Incoming activity has invalid digest for body")]
ActivityBodyDigestInvalid, ActivityBodyDigestInvalid,
@ -78,6 +81,9 @@ pub enum Error {
/// Attempted to fetch object but the response's id field doesn't match /// Attempted to fetch object but the response's id field doesn't match
#[error("Attempted to fetch object from {0} but the response's id field doesn't match")] #[error("Attempted to fetch object from {0} but the response's id field doesn't match")]
FetchWrongId(Url), FetchWrongId(Url),
/// I/O error from OS
#[error(transparent)]
IoError(#[from] std::io::Error),
/// Other generic errors /// Other generic errors
#[error("{0}")] #[error("{0}")]
Other(String), Other(String),

View file

@ -11,7 +11,7 @@ use crate::{
FEDERATION_CONTENT_TYPE, FEDERATION_CONTENT_TYPE,
}; };
use bytes::Bytes; use bytes::Bytes;
use http::{HeaderValue, StatusCode}; use http::{header::LOCATION, HeaderValue, StatusCode};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use tracing::info; use tracing::info;
@ -59,7 +59,7 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#, // activitypub standard r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#, // activitypub standard
r#"application/activity+json; charset=utf-8"#, // mastodon r#"application/activity+json; charset=utf-8"#, // mastodon
]; ];
let res = fetch_object_http_with_accept(url, data, &FETCH_CONTENT_TYPE).await?; let res = fetch_object_http_with_accept(url, data, &FETCH_CONTENT_TYPE, false).await?;
// Ensure correct content-type to prevent vulnerabilities, with case insensitive comparison. // Ensure correct content-type to prevent vulnerabilities, with case insensitive comparison.
let content_type = res let content_type = res
@ -73,6 +73,15 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
// Ensure id field matches final url after redirect // Ensure id field matches final url after redirect
if res.object_id.as_ref() != Some(&res.url) { if res.object_id.as_ref() != Some(&res.url) {
if let Some(res_object_id) = res.object_id {
data.config.verify_url_valid(&res_object_id).await?;
// If id is different but still on the same domain, attempt to request object
// again from url in id field.
if res_object_id.domain() == res.url.domain() {
return Box::pin(fetch_object_http(&res_object_id, data)).await;
}
}
// Failed to fetch the object from its specified id
return Err(Error::FetchWrongId(res.url)); return Err(Error::FetchWrongId(res.url));
} }
@ -91,6 +100,7 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
url: &Url, url: &Url,
data: &Data<T>, data: &Data<T>,
content_type: &HeaderValue, content_type: &HeaderValue,
recursive: bool,
) -> Result<FetchObjectResponse<Kind>, Error> { ) -> Result<FetchObjectResponse<Kind>, Error> {
let config = &data.config; let config = &data.config;
config.verify_url_valid(url).await?; config.verify_url_valid(url).await?;
@ -123,6 +133,19 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
req.send().await? req.send().await?
}; };
// Allow a single redirect using recursion. Further redirects are ignored.
let location = res.headers().get(LOCATION).and_then(|l| l.to_str().ok());
if let (Some(location), false) = (location, recursive) {
let location = location.parse()?;
return Box::pin(fetch_object_http_with_accept(
&location,
data,
content_type,
true,
))
.await;
}
if res.status() == StatusCode::GONE { if res.status() == StatusCode::GONE {
return Err(Error::ObjectDeleted(url.clone())); return Err(Error::ObjectDeleted(url.clone()));
} }

View file

@ -1,5 +1,5 @@
use crate::{ use crate::{
config::Data, config::{Data, DOMAIN_REGEX},
error::Error, error::Error,
fetch::{fetch_object_http_with_accept, object_id::ObjectId}, fetch::{fetch_object_http_with_accept, object_id::ObjectId},
traits::{Actor, Object}, traits::{Actor, Object},
@ -54,21 +54,31 @@ where
.splitn(2, '@') .splitn(2, '@')
.collect_tuple() .collect_tuple()
.ok_or(WebFingerError::WrongFormat.into_crate_error())?; .ok_or(WebFingerError::WrongFormat.into_crate_error())?;
// For production mode make sure that domain doesn't contain any port or path.
if !data.config.debug && !DOMAIN_REGEX.is_match(domain) {
return Err(Error::UrlVerificationError("Invalid characters in domain").into());
}
let protocol = if data.config.debug { "http" } else { "https" }; let protocol = if data.config.debug { "http" } else { "https" };
let fetch_url = let fetch_url =
format!("{protocol}://{domain}/.well-known/webfinger?resource=acct:{identifier}"); format!("{protocol}://{domain}/.well-known/webfinger?resource=acct:{identifier}");
debug!("Fetching webfinger url: {}", &fetch_url); debug!("Fetching webfinger url: {}", &fetch_url);
let res: Webfinger = fetch_object_http_with_accept( let res = fetch_object_http_with_accept::<_, Webfinger>(
&Url::parse(&fetch_url).map_err(Error::UrlParse)?, &Url::parse(&fetch_url).map_err(Error::UrlParse)?,
data, data,
&WEBFINGER_CONTENT_TYPE, &WEBFINGER_CONTENT_TYPE,
false,
) )
.await? .await?;
.object; if res.url.as_str() != fetch_url {
data.config.verify_url_valid(&res.url).await?;
}
debug_assert_eq!(res.subject, format!("acct:{identifier}")); debug_assert_eq!(res.object.subject, format!("acct:{identifier}"));
let links: Vec<Url> = res let links: Vec<Url> = res
.object
.links .links
.iter() .iter()
.filter(|link| { .filter(|link| {