author     alemi <me@alemi.dev>    2024-05-03 04:43:25 +0200
committer  alemi <me@alemi.dev>    2024-05-03 04:43:25 +0200
commit     9e42d4b4606b0f0b44e1389f6c65769316545292 (patch)
tree       72cc2d8229f52a7eb4b28c1928b470f070fe7408
parent     7454da652585b08d9e5303b699bd24396580ace0 (diff)
feat: add server crawler
i may remove this tho, it definitely should not be arbitrarily invokable by local users!!!
-rw-r--r--  src/routes/activitypub/object/replies.rs   7
-rw-r--r--  src/server/fetcher.rs                      62
2 files changed, 66 insertions, 3 deletions
diff --git a/src/routes/activitypub/object/replies.rs b/src/routes/activitypub/object/replies.rs
index 8f69b9b..35d08cc 100644
--- a/src/routes/activitypub/object/replies.rs
+++ b/src/routes/activitypub/object/replies.rs
@@ -1,16 +1,21 @@
use axum::extract::{Path, Query, State};
use sea_orm::{ColumnTrait, Condition, PaginatorTrait, QueryFilter};
-use crate::{model, routes::activitypub::{JsonLD, Pagination}, server::{auth::AuthIdentity, Context}, url};
+use crate::{model, routes::activitypub::{JsonLD, Pagination, TryFetch}, server::{auth::AuthIdentity, fetcher::Fetcher, Context}, url};
pub async fn get(
State(ctx): State<Context>,
Path(id): Path<String>,
AuthIdentity(auth): AuthIdentity,
+ Query(q): Query<TryFetch>,
) -> crate::Result<JsonLD<serde_json::Value>> {
let replies_id = url!(ctx, "/objects/{id}/replies");
let oid = ctx.uri("objects", id);
+ if auth.is_local() && q.fetch {
+ ctx.fetch_thread(&oid).await?;
+ }
+
let count = model::addressing::Entity::find_addressed(auth.my_id())
.filter(auth.filter_condition())
.filter(model::object::Column::InReplyTo.eq(oid))
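
Note: the TryFetch extractor imported above is defined elsewhere in the tree; its definition is not part of this diff. Given the Query(q) extractor and the q.fetch check, it is presumably a small serde struct along these lines (a sketch only; defaulting fetch to false is an assumption):

// Sketch of the TryFetch query struct assumed by the route above; not part
// of this commit. The field name comes from the `q.fetch` check; defaulting
// it to false, so a plain GET never triggers a remote crawl, is an assumption.
#[derive(Debug, Clone, Default, serde::Deserialize)]
pub struct TryFetch {
    #[serde(default)]
    pub fetch: bool,
}

With such a default, GET /objects/{id}/replies behaves as before, and only an explicit ?fetch=true from a local user starts a crawl.
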
diff --git a/src/server/fetcher.rs b/src/server/fetcher.rs
index 07cad64..21af542 100644
--- a/src/server/fetcher.rs
+++ b/src/server/fetcher.rs
@@ -1,11 +1,11 @@
use std::collections::BTreeMap;
-use apb::{target::Addressed, Activity, Collection, Object};
+use apb::{target::Addressed, Activity, Collection, CollectionPage, Link, Object};
use base64::Engine;
use reqwest::{header::{ACCEPT, CONTENT_TYPE, USER_AGENT}, Method, Response};
use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, IntoActiveModel, QueryFilter};
-use crate::{model, VERSION};
+use crate::{errors::UpubError, model, VERSION};
use super::{auth::HttpSignature, Context};
@@ -20,6 +20,8 @@ pub trait Fetcher {
async fn fetch_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
async fn pull_activity(&self, id: &str) -> crate::Result<model::activity::Model>;
+ async fn fetch_thread(&self, id: &str) -> crate::Result<()>;
+
async fn request(
method: reqwest::Method,
url: &str,
@@ -165,6 +167,10 @@ impl Fetcher for Context {
Ok(activity_model)
}
+ async fn fetch_thread(&self, id: &str) -> crate::Result<()> {
+ crawl_replies(self, id, 0).await
+ }
+
async fn fetch_object(&self, id: &str) -> crate::Result<model::object::Model> {
fetch_object_inner(self, id, 0).await
}
@@ -179,6 +185,58 @@ impl Fetcher for Context {
}
#[async_recursion::async_recursion]
+async fn crawl_replies(ctx: &Context, id: &str, depth: usize) -> crate::Result<()> {
+ tracing::info!("crawling replies of '{id}'");
+ let object = Context::request(
+ Method::GET, id, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
+ ).await?.json::<serde_json::Value>().await?;
+
+ let object_model = model::object::Model::new(&object)?;
+ match model::object::Entity::insert(object_model.into_active_model())
+ .exec(ctx.db()).await
+ {
+ Ok(_) => {},
+ Err(sea_orm::DbErr::RecordNotInserted) => {},
+ Err(e) => return Err(e.into()),
+ }
+
+ if depth > 16 {
+ tracing::warn!("stopping thread crawling: too deep!");
+ return Ok(());
+ }
+
+ let mut page_url = match object.replies().get() {
+ Some(serde_json::Value::String(x)) => {
+ let replies = Context::request(
+ Method::GET, x, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
+ ).await?.json::<serde_json::Value>().await?;
+ replies.first().id()
+ },
+ Some(serde_json::Value::Object(x)) => {
+ let obj = serde_json::Value::Object(x.clone()); // lol putting it back, TODO!
+ obj.first().id()
+ },
+ _ => return Ok(()),
+ };
+
+ while let Some(ref url) = page_url {
+ let replies = Context::request(
+ Method::GET, url, None, &format!("https://{}", ctx.domain()), &ctx.app().private_key, ctx.domain(),
+ ).await?.json::<serde_json::Value>().await?;
+
+ for reply in replies.items() {
+ // TODO right now it crawls one by one, could be made in parallel but would be quite more
+ // abusive, so i'll keep it like this while i try it out
+ crawl_replies(ctx, reply.href(), depth + 1).await?;
+ }
+
+ page_url = replies.next().id();
+ }
+
+ Ok(())
+}
+
+#[async_recursion::async_recursion]
async fn fetch_object_inner(ctx: &Context, id: &str, depth: usize) -> crate::Result<model::object::Model> {
if let Some(x) = model::object::Entity::find_by_id(id).one(ctx.db()).await? {
return Ok(x); // already in db, easy
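
On the TODO inside crawl_replies: replies are currently crawled strictly one by one. A bounded-concurrency variant could look like the sketch below; it assumes the futures crate plus the apb traits already imported in this file, mirrors items()/href() exactly as the diff uses them, and the crawl_page_concurrent helper with its limit of 4 parallel fetches is hypothetical, not part of the commit:

use futures::{stream, TryStreamExt};

// Hypothetical helper: crawl every reply on one collection page with at
// most 4 requests in flight, instead of awaiting each reply sequentially.
async fn crawl_page_concurrent(ctx: &Context, page: &serde_json::Value, depth: usize) -> crate::Result<()> {
    // collect owned urls first so the concurrent futures don't borrow `page`
    let urls: Vec<String> = page.items()
        .into_iter()
        .map(|reply| reply.href().to_string())
        .collect();
    stream::iter(urls.into_iter().map(Ok))
        .try_for_each_concurrent(4, |url| async move {
            crawl_replies(ctx, &url, depth + 1).await
        })
        .await
}

The concurrency cap keeps the crawler from hammering a single remote instance, which is exactly the abuse concern the TODO raises about going parallel.
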