Compare commits
8 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1cbeed318e | ||
|
|
cb05faefdc | ||
|
|
eca8f0fc6f | ||
|
|
d1f4da4198 | ||
|
|
df61c72344 | ||
|
|
c90044f708 | ||
|
|
3e4d54778c | ||
|
|
c4b24bd201 |
9 changed files with 3120 additions and 22 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -1,6 +1,5 @@
|
|||
/target
|
||||
/.idea
|
||||
/Cargo.lock
|
||||
perf.data*
|
||||
flamegraph.svg
|
||||
|
||||
|
|
|
|||
2970
Cargo.lock
generated
Normal file
2970
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "activitypub_federation"
|
||||
version = "0.5.8"
|
||||
version = "0.5.11"
|
||||
edition = "2021"
|
||||
description = "High-level Activitypub framework"
|
||||
keywords = ["activitypub", "activitystreams", "federation", "fediverse"]
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ use crate::{
|
|||
};
|
||||
use error::Error;
|
||||
use std::{env::args, str::FromStr};
|
||||
use tokio::try_join;
|
||||
use tracing::log::{info, LevelFilter};
|
||||
|
||||
mod activities;
|
||||
|
|
@ -34,8 +35,10 @@ async fn main() -> Result<(), Error> {
|
|||
.map(|arg| Webserver::from_str(&arg).unwrap())
|
||||
.unwrap_or(Webserver::Axum);
|
||||
|
||||
let alpha = new_instance("localhost:8001", "alpha".to_string()).await?;
|
||||
let beta = new_instance("localhost:8002", "beta".to_string()).await?;
|
||||
let (alpha, beta) = try_join!(
|
||||
new_instance("localhost:8001", "alpha".to_string()),
|
||||
new_instance("localhost:8002", "beta".to_string())
|
||||
)?;
|
||||
listen(&alpha, &webserver)?;
|
||||
listen(&beta, &webserver)?;
|
||||
info!("Local instances started");
|
||||
|
|
|
|||
|
|
@ -24,9 +24,9 @@ use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
|
|||
use serde::Serialize;
|
||||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
time::{Duration, SystemTime},
|
||||
time::{Duration, Instant, SystemTime},
|
||||
};
|
||||
use tracing::debug;
|
||||
use tracing::{debug, warn};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
|
@ -92,7 +92,17 @@ impl SendActivityTask {
|
|||
self.http_signature_compat,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Send the activity, and log a warning if its too slow.
|
||||
let now = Instant::now();
|
||||
let response = client.execute(request).await?;
|
||||
let elapsed = now.elapsed().as_secs();
|
||||
if elapsed > 10 {
|
||||
warn!(
|
||||
"Sending activity {} to {} took {}s",
|
||||
self.activity_id, self.inbox, elapsed
|
||||
);
|
||||
}
|
||||
self.handle_response(response).await
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,11 +23,16 @@ use crate::{
|
|||
use async_trait::async_trait;
|
||||
use derive_builder::Builder;
|
||||
use dyn_clone::{clone_trait_object, DynClone};
|
||||
use itertools::Itertools;
|
||||
use moka::future::Cache;
|
||||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use reqwest::{redirect::Policy, Client};
|
||||
use reqwest_middleware::ClientWithMiddleware;
|
||||
use rsa::{pkcs8::DecodePrivateKey, RsaPrivateKey};
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
ops::Deref,
|
||||
sync::{
|
||||
atomic::{AtomicU32, Ordering},
|
||||
|
|
@ -35,6 +40,7 @@ use std::{
|
|||
},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::net::lookup_host;
|
||||
use url::Url;
|
||||
|
||||
/// Configuration for this library, with various federation related settings
|
||||
|
|
@ -51,9 +57,14 @@ pub struct FederationConfig<T: Clone> {
|
|||
/// [crate::fetch::object_id::ObjectId] for more details.
|
||||
#[builder(default = "20")]
|
||||
pub(crate) http_fetch_limit: u32,
|
||||
#[builder(default = "reqwest::Client::default().into()")]
|
||||
/// HTTP client used for all outgoing requests. Middleware can be used to add functionality
|
||||
/// like log tracing or retry of failed requests.
|
||||
#[builder(default = "default_client()")]
|
||||
/// HTTP client used for all outgoing requests. When passing a custom client here you should
|
||||
/// also disable redirects and set timeouts.
|
||||
///
|
||||
/// Middleware can be used to add functionality like log tracing or retry of failed requests.
|
||||
/// Redirects are disabled by default, because automatic redirect URLs can't be validated.
|
||||
/// Instead a single redirect is handled manually. The default client sets a timeout of 10s
|
||||
/// to avoid excessive resource usage when connecting to dead servers.
|
||||
pub(crate) client: ClientWithMiddleware,
|
||||
/// Run library in debug mode. This allows usage of http and localhost urls. It also sends
|
||||
/// outgoing activities synchronously, not in background thread. This helps to make tests
|
||||
|
|
@ -102,6 +113,9 @@ pub struct FederationConfig<T: Clone> {
|
|||
pub(crate) queue_retry_count: usize,
|
||||
}
|
||||
|
||||
pub(crate) static DOMAIN_REGEX: Lazy<Regex> =
|
||||
Lazy::new(|| Regex::new(r"^[a-zA-Z0-9.-]*$").expect("compile regex"));
|
||||
|
||||
impl<T: Clone> FederationConfig<T> {
|
||||
/// Returns a new config builder with default values.
|
||||
pub fn builder() -> FederationConfigBuilder<T> {
|
||||
|
|
@ -156,17 +170,45 @@ impl<T: Clone> FederationConfig<T> {
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
if url.domain().is_none() {
|
||||
let Some(domain) = url.domain() else {
|
||||
return Err(Error::UrlVerificationError("Url must have a domain"));
|
||||
};
|
||||
if !DOMAIN_REGEX.is_match(domain) {
|
||||
return Err(Error::UrlVerificationError("Invalid characters in domain"));
|
||||
}
|
||||
|
||||
if url.domain() == Some("localhost") && !self.debug {
|
||||
return Err(Error::UrlVerificationError(
|
||||
"Localhost is only allowed in debug mode",
|
||||
));
|
||||
// Extra checks only for production mode
|
||||
if !self.debug {
|
||||
if url.port().is_some() {
|
||||
return Err(Error::UrlVerificationError("Explicit port is not allowed"));
|
||||
}
|
||||
|
||||
// Resolve domain and see if it points to private IP
|
||||
// TODO: Use is_global() once stabilized
|
||||
// https://doc.rust-lang.org/std/net/enum.IpAddr.html#method.is_global
|
||||
let mut ips = lookup_host((domain.to_owned(), 80)).await?;
|
||||
let allow_local = std::env::var("DANGER_FEDERATION_ALLOW_LOCAL_IP").is_ok();
|
||||
let invalid_ip = !allow_local
|
||||
&& ips.any(|addr| match addr.ip().to_canonical() {
|
||||
IpAddr::V4(addr) => v4_is_invalid(addr),
|
||||
IpAddr::V6(addr) => v6_is_invalid(addr),
|
||||
});
|
||||
if invalid_ip {
|
||||
let ip_addrs = ips.join(", ");
|
||||
return Err(Error::DomainResolveError(domain.to_string(), ip_addrs));
|
||||
}
|
||||
}
|
||||
|
||||
self.url_verifier.verify(url).await?;
|
||||
// It is valid but uncommon for domains to end with `.` char. Drop this so it cant be used
|
||||
// to bypass domain blocklist. Avoid cloning url in common case.
|
||||
if domain.ends_with('.') {
|
||||
let mut url = url.clone();
|
||||
let domain = &domain[0..domain.len() - 1];
|
||||
url.set_host(Some(domain))?;
|
||||
self.url_verifier.verify(&url).await?;
|
||||
} else {
|
||||
self.url_verifier.verify(url).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -193,6 +235,30 @@ impl<T: Clone> FederationConfig<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn v4_is_invalid(v4: Ipv4Addr) -> bool {
|
||||
v4.is_private()
|
||||
|| v4.is_loopback()
|
||||
|| v4.is_link_local()
|
||||
|| v4.is_multicast()
|
||||
|| v4.is_documentation()
|
||||
|| v4.is_unspecified()
|
||||
|| v4.is_broadcast()
|
||||
}
|
||||
|
||||
fn v6_is_invalid(v6: Ipv6Addr) -> bool {
|
||||
let is_documentation = matches!(
|
||||
v6.segments(),
|
||||
[0x2001, 0xdb8, ..] | [0x3fff, 0..=0x0fff, ..]
|
||||
);
|
||||
is_documentation
|
||||
|| v6.is_loopback()
|
||||
|| v6.is_multicast()
|
||||
|| (v6.segments()[0] & 0xfe00) == 0xfc00 // is_unique_local
|
||||
|| (v6.segments()[0] & 0xffc0) == 0xfe80 // is_unicast_link_local
|
||||
|| v6.is_unspecified()
|
||||
|| v6.to_ipv4_mapped().is_some_and(v4_is_invalid)
|
||||
}
|
||||
|
||||
impl<T: Clone> FederationConfigBuilder<T> {
|
||||
/// Sets an actor to use to sign all federated fetch requests
|
||||
pub fn signed_fetch_actor<A: Actor>(&mut self, actor: &A) -> &mut Self {
|
||||
|
|
@ -348,6 +414,17 @@ impl<T: Clone> FederationMiddleware<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn default_client() -> ClientWithMiddleware {
|
||||
let timeout = Duration::from_secs(10);
|
||||
Client::builder()
|
||||
.redirect(Policy::none())
|
||||
.timeout(timeout)
|
||||
.connect_timeout(timeout)
|
||||
.build()
|
||||
.unwrap_or_else(|_| Client::default())
|
||||
.into()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::unwrap_used)]
|
||||
mod test {
|
||||
|
|
|
|||
|
|
@ -28,6 +28,9 @@ pub enum Error {
|
|||
/// url verification error
|
||||
#[error("URL failed verification: {0}")]
|
||||
UrlVerificationError(&'static str),
|
||||
/// Resolving domain points to local IP.
|
||||
#[error("Resolving domain {0} points to local IP {1}. This may indicate an attacker attempting to access internal services. If intentional, you can ignore this error by setting DANGER_FEDERATION_ALLOW_LOCAL_IP=1")]
|
||||
DomainResolveError(String, String),
|
||||
/// Incoming activity has invalid digest for body
|
||||
#[error("Incoming activity has invalid digest for body")]
|
||||
ActivityBodyDigestInvalid,
|
||||
|
|
@ -78,6 +81,9 @@ pub enum Error {
|
|||
/// Attempted to fetch object but the response's id field doesn't match
|
||||
#[error("Attempted to fetch object from {0} but the response's id field doesn't match")]
|
||||
FetchWrongId(Url),
|
||||
/// I/O error from OS
|
||||
#[error(transparent)]
|
||||
IoError(#[from] std::io::Error),
|
||||
/// Other generic errors
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ use crate::{
|
|||
FEDERATION_CONTENT_TYPE,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use http::{HeaderValue, StatusCode};
|
||||
use http::{header::LOCATION, HeaderValue, StatusCode};
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tracing::info;
|
||||
|
|
@ -59,7 +59,7 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
|
|||
r#"application/ld+json; profile="https://www.w3.org/ns/activitystreams""#, // activitypub standard
|
||||
r#"application/activity+json; charset=utf-8"#, // mastodon
|
||||
];
|
||||
let res = fetch_object_http_with_accept(url, data, &FETCH_CONTENT_TYPE).await?;
|
||||
let res = fetch_object_http_with_accept(url, data, &FETCH_CONTENT_TYPE, false).await?;
|
||||
|
||||
// Ensure correct content-type to prevent vulnerabilities, with case insensitive comparison.
|
||||
let content_type = res
|
||||
|
|
@ -73,6 +73,15 @@ pub async fn fetch_object_http<T: Clone, Kind: DeserializeOwned>(
|
|||
|
||||
// Ensure id field matches final url after redirect
|
||||
if res.object_id.as_ref() != Some(&res.url) {
|
||||
if let Some(res_object_id) = res.object_id {
|
||||
data.config.verify_url_valid(&res_object_id).await?;
|
||||
// If id is different but still on the same domain, attempt to request object
|
||||
// again from url in id field.
|
||||
if res_object_id.domain() == res.url.domain() {
|
||||
return Box::pin(fetch_object_http(&res_object_id, data)).await;
|
||||
}
|
||||
}
|
||||
// Failed to fetch the object from its specified id
|
||||
return Err(Error::FetchWrongId(res.url));
|
||||
}
|
||||
|
||||
|
|
@ -91,6 +100,7 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
|
|||
url: &Url,
|
||||
data: &Data<T>,
|
||||
content_type: &HeaderValue,
|
||||
recursive: bool,
|
||||
) -> Result<FetchObjectResponse<Kind>, Error> {
|
||||
let config = &data.config;
|
||||
config.verify_url_valid(url).await?;
|
||||
|
|
@ -123,6 +133,19 @@ async fn fetch_object_http_with_accept<T: Clone, Kind: DeserializeOwned>(
|
|||
req.send().await?
|
||||
};
|
||||
|
||||
// Allow a single redirect using recursion. Further redirects are ignored.
|
||||
let location = res.headers().get(LOCATION).and_then(|l| l.to_str().ok());
|
||||
if let (Some(location), false) = (location, recursive) {
|
||||
let location = location.parse()?;
|
||||
return Box::pin(fetch_object_http_with_accept(
|
||||
&location,
|
||||
data,
|
||||
content_type,
|
||||
true,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
if res.status() == StatusCode::GONE {
|
||||
return Err(Error::ObjectDeleted(url.clone()));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::{
|
||||
config::Data,
|
||||
config::{Data, DOMAIN_REGEX},
|
||||
error::Error,
|
||||
fetch::{fetch_object_http_with_accept, object_id::ObjectId},
|
||||
traits::{Actor, Object},
|
||||
|
|
@ -54,21 +54,31 @@ where
|
|||
.splitn(2, '@')
|
||||
.collect_tuple()
|
||||
.ok_or(WebFingerError::WrongFormat.into_crate_error())?;
|
||||
|
||||
// For production mode make sure that domain doesnt contain any port or path.
|
||||
if !data.config.debug && !DOMAIN_REGEX.is_match(domain) {
|
||||
return Err(Error::UrlVerificationError("Invalid characters in domain").into());
|
||||
}
|
||||
|
||||
let protocol = if data.config.debug { "http" } else { "https" };
|
||||
let fetch_url =
|
||||
format!("{protocol}://{domain}/.well-known/webfinger?resource=acct:{identifier}");
|
||||
debug!("Fetching webfinger url: {}", &fetch_url);
|
||||
|
||||
let res: Webfinger = fetch_object_http_with_accept(
|
||||
let res = fetch_object_http_with_accept::<_, Webfinger>(
|
||||
&Url::parse(&fetch_url).map_err(Error::UrlParse)?,
|
||||
data,
|
||||
&WEBFINGER_CONTENT_TYPE,
|
||||
false,
|
||||
)
|
||||
.await?
|
||||
.object;
|
||||
.await?;
|
||||
if res.url.as_str() != fetch_url {
|
||||
data.config.verify_url_valid(&res.url).await?;
|
||||
}
|
||||
|
||||
debug_assert_eq!(res.subject, format!("acct:{identifier}"));
|
||||
debug_assert_eq!(res.object.subject, format!("acct:{identifier}"));
|
||||
let links: Vec<Url> = res
|
||||
.object
|
||||
.links
|
||||
.iter()
|
||||
.filter(|link| {
|
||||
|
|
|
|||
Loading…
Reference in a new issue