dedupupup

This commit is contained in:
Felix Ableitner 2024-03-01 11:04:26 +01:00
parent 499f037b68
commit 3ea4bb6e53
2 changed files with 32 additions and 54 deletions

View file

@ -3,7 +3,7 @@
#![doc = include_str!("../docs/09_sending_activities.md")] #![doc = include_str!("../docs/09_sending_activities.md")]
use crate::{ use crate::{
activity_sending::{build_tasks, generate_request_headers, send, SendActivityTask}, activity_sending::{build_tasks, generate_request_headers, SendActivityTask},
config::Data, config::Data,
error::Error, error::Error,
http_signatures::sign_request, http_signatures::sign_request,
@ -91,30 +91,8 @@ async fn sign_and_send(
timeout: Duration, timeout: Duration,
retry_strategy: RetryStrategy, retry_strategy: RetryStrategy,
) -> Result<(), Error> { ) -> Result<(), Error> {
debug!("Sending {} to {}", task.activity_id, task.inbox,);
let request_builder = client
.post(task.inbox.to_string())
.timeout(timeout)
.headers(generate_request_headers(&task.inbox));
let request = sign_request(
request_builder,
&task.actor_id,
task.activity.clone(),
task.private_key.clone(),
task.http_signature_compat,
)
.await?;
retry( retry(
|| { || task.sign_and_send_internal(client, timeout),
send(
task,
client,
request
.try_clone()
.expect("The body of the request is not cloneable"),
)
},
retry_strategy, retry_strategy,
) )
.await .await

View file

@ -24,7 +24,7 @@ use serde::Serialize;
use std::{ use std::{
self, self,
fmt::{Debug, Display}, fmt::{Debug, Display},
time::SystemTime, time::{Duration, SystemTime},
}; };
use tracing::debug; use tracing::debug;
use url::Url; use url::Url;
@ -70,10 +70,19 @@ impl SendActivityTask {
/// convert a sendactivitydata to a request, signing and sending it /// convert a sendactivitydata to a request, signing and sending it
pub async fn sign_and_send<Datatype: Clone>(&self, data: &Data<Datatype>) -> Result<(), Error> { pub async fn sign_and_send<Datatype: Clone>(&self, data: &Data<Datatype>) -> Result<(), Error> {
let client = &data.config.client; self.sign_and_send_internal(&data.config.client, data.config.request_timeout)
.await
}
pub(crate) async fn sign_and_send_internal(
&self,
client: &ClientWithMiddleware,
timeout: Duration,
) -> Result<(), Error> {
debug!("Sending {} to {}", self.activity_id, self.inbox,);
let request_builder = client let request_builder = client
.post(self.inbox.to_string()) .post(self.inbox.to_string())
.timeout(data.config.request_timeout) .timeout(timeout)
.headers(generate_request_headers(&self.inbox)); .headers(generate_request_headers(&self.inbox));
let request = sign_request( let request = sign_request(
request_builder, request_builder,
@ -83,26 +92,16 @@ impl SendActivityTask {
self.http_signature_compat, self.http_signature_compat,
) )
.await?; .await?;
send(&self, client, request).await
}
}
pub(crate) async fn send<T: Display>(
activity: &T,
client: &ClientWithMiddleware,
request: Request,
) -> Result<(), Error> {
let response = client.execute(request).await?; let response = client.execute(request).await?;
match response { match response {
o if o.status().is_success() => { o if o.status().is_success() => {
debug!("Activity {activity} delivered successfully"); debug!("Activity {self} delivered successfully");
Ok(()) Ok(())
} }
o if o.status().is_client_error() => { o if o.status().is_client_error() => {
let text = o.text_limited().await?; let text = o.text_limited().await?;
debug!("Activity {activity} was rejected, aborting: {text}"); debug!("Activity {self} was rejected, aborting: {text}");
Ok(()) Ok(())
} }
o => { o => {
@ -110,11 +109,12 @@ pub(crate) async fn send<T: Display>(
let text = o.text_limited().await?; let text = o.text_limited().await?;
Err(Error::Other(format!( Err(Error::Other(format!(
"Activity {activity} failure with status {status}: {text}", "Activity {self} failure with status {status}: {text}",
))) )))
} }
} }
} }
}
pub(crate) fn serialize_activity<Activity: Serialize + Debug>( pub(crate) fn serialize_activity<Activity: Serialize + Debug>(
activity: &Activity, activity: &Activity,