bingus-blog/src/main.rs

440 lines
14 KiB
Rust
Raw Normal View History

2024-04-30 11:44:40 +03:00
// Nightly-only feature: allows `if a && let Some(x) = b { .. }` chains, which `main` uses
// when deciding whether to persist the cache on shutdown.
#![feature(let_chains)]
2024-04-18 04:05:38 +03:00
mod config;
mod error;
mod filters;
mod hash_arc_store;
mod markdown_render;
mod post;
mod ranged_i128_visitor;
mod systemtime_as_secs;
2024-04-18 04:05:38 +03:00
use std::future::IntoFuture;
use std::io::Read;
2024-04-18 04:05:38 +03:00
use std::net::SocketAddr;
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;
use askama_axum::Template;
2024-05-01 18:25:01 +03:00
use axum::extract::{Path, Query, State};
2024-05-02 19:23:20 +03:00
use axum::http::{header, Request};
2024-04-18 04:05:38 +03:00
use axum::response::{IntoResponse, Redirect, Response};
use axum::routing::{get, Router};
use axum::Json;
use color_eyre::eyre::{self, Context};
2024-05-02 19:23:20 +03:00
use error::AppError;
use rss::{Category, ChannelBuilder, ItemBuilder};
2024-05-01 18:25:01 +03:00
use serde::Deserialize;
2024-04-18 04:05:38 +03:00
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio::{select, signal};
2024-04-18 04:05:38 +03:00
use tokio_util::sync::CancellationToken;
use tower_http::services::ServeDir;
use tower_http::trace::TraceLayer;
use tracing::level_filters::LevelFilter;
use tracing::{debug, error, info, info_span, warn, Span};
2024-04-18 04:05:38 +03:00
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use crate::config::Config;
2024-05-01 18:25:01 +03:00
use crate::error::{AppResult, PostError};
2024-05-01 23:12:52 +03:00
use crate::post::cache::{Cache, CACHE_VERSION};
2024-04-18 04:05:38 +03:00
use crate::post::{PostManager, PostMetadata, RenderStats};
/// Reference-counted handle to [`AppState`]; this is the state type the router is built with
/// and the type every handler extracts through `State`.
type ArcState = Arc<AppState>;
/// State shared by all request handlers: the loaded configuration plus the post manager.
#[derive(Clone)]
struct AppState {
/// Configuration obtained from `config::load` at startup.
pub config: Config,
/// Post manager the handlers query for posts and post metadata.
pub posts: PostManager,
}
/// Template context for the front page, rendered from `index.html`.
#[derive(Template)]
#[template(path = "index.html")]
struct IndexTemplate {
/// Blog title, copied from `config.title`.
title: String,
/// Blog description, copied from `config.description`.
description: String,
/// Metadata of the posts to list on the page.
posts: Vec<PostMetadata>,
}
#[derive(Template)]
#[template(path = "post.html")]
struct PostTemplate {
2024-04-18 04:05:38 +03:00
meta: PostMetadata,
rendered: String,
rendered_in: RenderStats,
markdown_access: bool,
2024-04-18 04:05:38 +03:00
}
2024-05-01 18:25:01 +03:00
/// Query-string parameters accepted by the listing endpoints (`/`, `/posts`, `/feed.xml`).
#[derive(Deserialize)]
struct QueryParams {
    /// Optional tag to filter posts by (`?tag=...`).
    tag: Option<String>,
    /// Optional upper bound on the number of posts, passed as `?n=...`.
    #[serde(rename = "n")]
    num_posts: Option<usize>,
}
2024-05-01 18:25:01 +03:00
async fn index(
State(state): State<ArcState>,
Query(query): Query<QueryParams>,
) -> AppResult<IndexTemplate> {
let posts = state
.posts
2024-05-02 19:23:20 +03:00
.get_max_n_post_metadata_with_optional_tag_sorted(query.num_posts, query.tag.as_ref())
2024-05-01 18:25:01 +03:00
.await?;
2024-04-18 04:05:38 +03:00
Ok(IndexTemplate {
title: state.config.title.clone(),
description: state.config.description.clone(),
2024-05-01 18:25:01 +03:00
posts,
2024-04-18 04:05:38 +03:00
})
}
2024-05-01 18:25:01 +03:00
async fn all_posts(
State(state): State<ArcState>,
Query(query): Query<QueryParams>,
) -> AppResult<Json<Vec<PostMetadata>>> {
let posts = state
.posts
2024-05-02 19:23:20 +03:00
.get_max_n_post_metadata_with_optional_tag_sorted(query.num_posts, query.tag.as_ref())
2024-05-01 18:25:01 +03:00
.await?;
Ok(Json(posts))
}
2024-05-02 19:23:20 +03:00
async fn rss(
State(state): State<ArcState>,
Query(query): Query<QueryParams>,
) -> AppResult<Response> {
if !state.config.rss.enable {
return Err(AppError::RssDisabled);
}
let posts = state
.posts
2024-05-02 20:09:28 +03:00
.get_all_posts_filtered(|metadata, _| {
!query
.tag
.as_ref()
.is_some_and(|tag| !metadata.tags.contains(tag))
})
2024-05-02 19:23:20 +03:00
.await?;
let mut channel = ChannelBuilder::default();
channel
.title(&state.config.title)
.link(state.config.rss.link.to_string())
.description(&state.config.description);
//TODO: .language()
for (metadata, content, _) in posts {
channel.item(
ItemBuilder::default()
.title(metadata.title)
.description(metadata.description)
.author(metadata.author)
.categories(
metadata
.tags
.into_iter()
.map(|tag| Category {
name: tag,
domain: None,
})
.collect::<Vec<Category>>(),
)
.pub_date(metadata.created_at.map(|date| date.to_rfc2822()))
.content(content)
.link(
state
.config
.rss
.link
.join(&format!("/posts/{}", metadata.name))?
.to_string(),
)
.build(),
);
}
let body = channel.build().to_string();
drop(channel);
Ok(([(header::CONTENT_TYPE, "text/xml")], body).into_response())
}
2024-04-18 04:05:38 +03:00
async fn post(State(state): State<ArcState>, Path(name): Path<String>) -> AppResult<Response> {
2024-05-01 18:25:01 +03:00
if name.ends_with(".md") && state.config.raw_access {
let mut file = tokio::fs::OpenOptions::new()
.read(true)
2024-05-01 18:25:01 +03:00
.open(state.config.dirs.posts.join(&name))
.await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
Ok(([("content-type", "text/plain")], buf).into_response())
} else {
let post = state.posts.get_post(&name).await?;
let page = PostTemplate {
meta: post.0,
rendered: post.1,
rendered_in: post.2,
2024-05-01 18:25:01 +03:00
markdown_access: state.config.raw_access,
};
2024-04-18 04:05:38 +03:00
Ok(page.into_response())
2024-04-18 04:05:38 +03:00
}
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
#[cfg(feature = "tokio-console")]
console_subscriber::init();
color_eyre::install()?;
#[cfg(not(feature = "tokio-console"))]
tracing_subscriber::registry()
.with(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.with(tracing_subscriber::fmt::layer())
.init();
let config = config::load()
.await
2024-04-18 19:17:33 +03:00
.context("couldn't load configuration")?;
2024-04-18 04:05:38 +03:00
2024-05-01 18:25:01 +03:00
let socket_addr = SocketAddr::new(config.http.host, config.http.port);
2024-04-18 04:05:38 +03:00
let mut tasks = JoinSet::new();
let cancellation_token = CancellationToken::new();
2024-04-18 04:05:38 +03:00
2024-04-20 20:59:00 +03:00
let posts = if config.cache.enable {
if config.cache.persistence
&& tokio::fs::try_exists(&config.cache.file)
2024-04-18 04:05:38 +03:00
.await
.with_context(|| {
format!("failed to check if {} exists", config.cache.file.display())
})?
2024-04-20 20:59:00 +03:00
{
info!("loading cache from file");
let path = &config.cache.file;
2024-04-20 20:59:00 +03:00
let load_cache = async {
let mut cache_file = tokio::fs::File::open(&path)
.await
.context("failed to open cache file")?;
let serialized = if config.cache.compress {
let cache_file = cache_file.into_std().await;
tokio::task::spawn_blocking(move || {
let mut buf = Vec::with_capacity(4096);
zstd::stream::read::Decoder::new(cache_file)?.read_to_end(&mut buf)?;
Ok::<_, std::io::Error>(buf)
})
2024-04-20 20:59:00 +03:00
.await
.context("failed to join blocking thread")?
.context("failed to read cache file")?
} else {
let mut buf = Vec::with_capacity(4096);
cache_file
.read_to_end(&mut buf)
.await
.context("failed to read cache file")?;
buf
};
2024-05-01 23:12:52 +03:00
let mut cache: Cache =
2024-04-20 20:59:00 +03:00
bitcode::deserialize(serialized.as_slice()).context("failed to parse cache")?;
2024-05-01 23:12:52 +03:00
if cache.version() < CACHE_VERSION {
warn!("cache version changed, clearing cache");
cache = Cache::default();
};
2024-04-20 20:59:00 +03:00
Ok::<PostManager, color_eyre::Report>(PostManager::new_with_cache(
2024-05-01 18:25:01 +03:00
config.dirs.posts.clone(),
2024-04-20 20:59:00 +03:00
config.render.clone(),
cache,
))
}
.await;
match load_cache {
Ok(posts) => posts,
Err(err) => {
error!("failed to load cache: {}", err);
info!("using empty cache");
PostManager::new_with_cache(
2024-05-01 18:25:01 +03:00
config.dirs.posts.clone(),
config.render.clone(),
Default::default(),
)
2024-04-20 20:59:00 +03:00
}
}
} else {
PostManager::new_with_cache(
2024-05-01 18:25:01 +03:00
config.dirs.posts.clone(),
2024-04-18 04:05:38 +03:00
config.render.clone(),
2024-04-20 20:59:00 +03:00
Default::default(),
)
2024-04-18 04:05:38 +03:00
}
} else {
2024-05-01 18:25:01 +03:00
PostManager::new(config.dirs.posts.clone(), config.render.clone())
2024-04-18 04:05:38 +03:00
};
let state = Arc::new(AppState { config, posts });
if state.config.cache.enable && state.config.cache.cleanup {
if let Some(t) = state.config.cache.cleanup_interval {
let state = Arc::clone(&state);
let token = cancellation_token.child_token();
debug!("setting up cleanup task");
tasks.spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(t));
loop {
select! {
_ = token.cancelled() => break,
_ = interval.tick() => {
state.posts.cleanup().await
}
}
}
});
} else {
state.posts.cleanup().await;
}
}
2024-04-18 04:05:38 +03:00
let app = Router::new()
.route("/", get(index))
.route(
"/post/:name",
get(
|Path(name): Path<String>| async move { Redirect::to(&format!("/posts/{}", name)) },
),
)
.route("/posts/:name", get(post))
.route("/posts", get(all_posts))
2024-05-02 19:23:20 +03:00
.route("/feed.xml", get(rss))
2024-04-18 04:05:38 +03:00
.nest_service("/static", ServeDir::new("static").precompressed_gzip())
.nest_service("/media", ServeDir::new("media"))
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &Request<_>| {
info_span!(
"request",
method = ?request.method(),
path = ?request.uri().path(),
)
})
.on_response(|response: &Response<_>, duration: Duration, span: &Span| {
let _ = span.enter();
let status = response.status();
info!(?status, ?duration, "response");
}),
)
.with_state(state.clone());
2024-05-01 18:25:01 +03:00
let listener = TcpListener::bind(socket_addr)
2024-04-18 04:05:38 +03:00
.await
2024-05-01 18:25:01 +03:00
.with_context(|| format!("couldn't listen on {}", socket_addr))?;
2024-04-18 04:05:38 +03:00
let local_addr = listener
.local_addr()
2024-04-18 19:17:33 +03:00
.context("couldn't get socket address")?;
2024-04-18 04:05:38 +03:00
info!("listening on http://{}", local_addr);
let sigint = signal::ctrl_c();
#[cfg(unix)]
let mut sigterm_handler =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
#[cfg(unix)]
let sigterm = sigterm_handler.recv();
#[cfg(not(unix))] // TODO: kill all windows server users
let sigterm = std::future::pending::<()>();
let axum_token = cancellation_token.child_token();
2024-04-18 04:05:38 +03:00
let mut server = axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(async move { axum_token.cancelled().await })
.into_future();
tokio::select! {
result = &mut server => {
2024-04-18 19:17:33 +03:00
result.context("failed to serve app")?;
2024-04-18 04:05:38 +03:00
},
_ = sigint => {
info!("received SIGINT, exiting gracefully");
},
_ = sigterm => {
info!("received SIGTERM, exiting gracefully");
}
};
let cleanup = async move {
// stop tasks
cancellation_token.cancel();
2024-04-18 19:17:33 +03:00
server.await.context("failed to serve app")?;
2024-04-18 04:05:38 +03:00
while let Some(task) = tasks.join_next().await {
2024-04-18 19:17:33 +03:00
task.context("failed to join task")?;
2024-04-18 04:05:38 +03:00
}
// write cache to file
2024-05-02 17:41:06 +03:00
let config = &state.config;
let posts = &state.posts;
2024-04-20 20:59:00 +03:00
if config.cache.enable
&& config.cache.persistence
2024-05-02 17:41:06 +03:00
&& let Some(cache) = posts.cache()
2024-04-20 20:59:00 +03:00
{
let path = &config.cache.file;
2024-05-02 17:41:06 +03:00
let serialized = bitcode::serialize(cache).context("failed to serialize cache")?;
2024-04-18 04:05:38 +03:00
let mut cache_file = tokio::fs::File::create(path)
.await
.with_context(|| format!("failed to open cache at {}", path.display()))?;
2024-05-02 17:41:06 +03:00
let compression_level = config.cache.compression_level;
if config.cache.compress {
let cache_file = cache_file.into_std().await;
tokio::task::spawn_blocking(move || {
std::io::Write::write_all(
2024-05-02 17:41:06 +03:00
&mut zstd::stream::write::Encoder::new(cache_file, compression_level)?
.auto_finish(),
&serialized,
)
})
2024-04-18 04:05:38 +03:00
.await
.context("failed to join blocking thread")?
} else {
cache_file.write_all(&serialized).await
}
.context("failed to write cache to file")?;
2024-04-18 04:05:38 +03:00
info!("wrote cache to {}", path.display());
}
Ok::<(), color_eyre::Report>(())
};
let sigint = signal::ctrl_c();
#[cfg(unix)]
let mut sigterm_handler =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
#[cfg(unix)]
let sigterm = sigterm_handler.recv();
#[cfg(not(unix))]
let sigterm = std::future::pending::<()>();
tokio::select! {
result = cleanup => {
2024-04-18 19:17:33 +03:00
result.context("cleanup failed, oh well")?;
2024-04-18 04:05:38 +03:00
},
_ = sigint => {
warn!("received second signal, exiting");
exit(1);
},
_ = sigterm => {
warn!("received second signal, exiting");
exit(1);
}
}
Ok(())
}