add compression to cache and remove precompression

slonkazoid 2024-04-20 23:02:23 +03:00
parent 18385d3e57
commit ed1a858d51
Signed by: slonk
SSH key fingerprint: SHA256:tbZfJX4IOvZ0LGWOWu5Ijo8jfMPi78TU7x1VoEeCIjM
9 changed files with 175 additions and 244 deletions

Cargo.lock (generated)

@@ -380,6 +380,7 @@ dependencies = [
"tower-http",
"tracing",
"tracing-subscriber",
"zstd",
]
[[package]]
@@ -460,6 +461,10 @@ name = "cc"
version = "1.0.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7"
dependencies = [
"jobserver",
"libc",
]
[[package]]
name = "cfg-if"
@@ -1196,6 +1201,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jobserver"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "685a7d121ee3f65ae4fddd72b25a04bb36b6af81bc0828f7d5434c0fe60fa3a2"
dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.69"
@@ -2613,3 +2627,31 @@ checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "zstd"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.10+zstd.1.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa"
dependencies = [
"cc",
"pkg-config",
]

Cargo.toml

@@ -47,3 +47,4 @@ tower-http = { version = "0.5.2", features = [
], default-features = false }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
zstd = "0.13.1"

README.md

@@ -20,7 +20,7 @@ blazingly fast markdown blog software written in rust memory safe
- [ ] better error reporting and pages
- [ ] better tracing
- [ ] cache cleanup task
- [ ] (de)compress cache with zstd on startup/shutdown
- [x] (de)compress cache with zstd on startup/shutdown
- [ ] make date parsing less strict
- [ ] make date formatting better
- [ ] clean up imports and require less features
@@ -42,9 +42,11 @@ markdown_access = true # allow users to see the raw markdown of a post
[cache] # cache settings
enable = true # save metadata and rendered posts into RAM
# highly recommended, only turn off if asolutely necessary
#persistence = "..." # file to save the cache to on shutdown, and
# to load from on startup. uncomment to enable
# highly recommended, only turn off if absolutely necessary
persistence = false # save the cache to disk on shutdown and load it on startup
file = "cache" # file to save the cache to
compress = true # compress the cache file
compression_level = 3 # zstd compression level, 3 is recommended
[render] # post rendering settings
syntect.load_defaults = false # include default syntect themes
@@ -135,3 +137,18 @@ standard. examples of valid and invalid dates:
- `GET /posts/<name>`: view a post
- `GET /posts/<name>.md`: view the raw markdown of a post
- `GET /post/*`: redirects to `/posts/*`
## Cache
bingus-blog caches every post it retrieves and keeps the entry in cache permanently.
the only way a cache entry is removed is when the post is requested and its
markdown file no longer exists in the filesystem. cache entries don't expire, but they
get invalidated when the mtime of the markdown file changes.
if cache persistence is on, the cache is compressed & written on shutdown,
and read & decompressed on startup. you can point the cache file at a tmpfs
so it saves and loads really fast, at the cost of not persisting across
reboots and of even more RAM usage.
in testing, compression shrank a 3.21 MB cache file to 0.18 MB almost instantly.
there is basically no good reason to not have compression on.
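
the save/load logic this commit adds to src/main.rs (below) boils down to streaming the bitcode-serialized cache through zstd. a minimal standalone sketch of that roundtrip, assuming the `zstd` and `bitcode` crates from this commit's Cargo.toml, with a plain `HashMap` standing in for the real cache type:

```rust
use std::collections::HashMap;
use std::io::{Read, Write};
use std::path::Path;

// stand-in for the real cache type; any Serialize + Deserialize type works
type Cache = HashMap<String, String>;

fn save(cache: &Cache, path: &Path, level: i32) -> std::io::Result<()> {
    let serialized = bitcode::serialize(cache).expect("failed to serialize cache");
    let file = std::fs::File::create(path)?;
    // auto_finish() writes the zstd frame epilogue when the encoder drops
    let mut encoder = zstd::stream::write::Encoder::new(file, level)?.auto_finish();
    encoder.write_all(&serialized)
}

fn load(path: &Path) -> std::io::Result<Cache> {
    let file = std::fs::File::open(path)?;
    let mut buf = Vec::new();
    zstd::stream::read::Decoder::new(file)?.read_to_end(&mut buf)?;
    Ok(bitcode::deserialize(buf.as_slice()).expect("failed to parse cache"))
}
```

in the actual code these run inside `tokio::task::spawn_blocking`, since the zstd streams are synchronous and would otherwise block the async runtime.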

src/append_path.rs (deleted)

@@ -1,20 +0,0 @@
use std::{
ffi::{OsStr, OsString},
path::{Path, PathBuf},
};
// i will kill you rust stdlib
pub trait Append<T>
where
Self: Into<OsString>,
T: From<OsString>,
{
fn append(self, ext: impl AsRef<OsStr>) -> T {
let mut buffer: OsString = self.into();
buffer.push(ext.as_ref());
T::from(buffer)
}
}
impl Append<PathBuf> for PathBuf {}
impl Append<PathBuf> for &Path {}

src/compress.rs (deleted)

@@ -1,60 +0,0 @@
// TODO: make this bearable
use std::{
fs::{self, Metadata},
io::{self, Result},
path::Path,
process::{Child, Command},
sync::Mutex,
};
fn compress_file(path: &Path, metadata: Metadata, handles: &Mutex<Vec<Child>>) -> Result<()> {
let compressed_file = format!("{}.gz", path.to_str().unwrap());
if match fs::metadata(compressed_file) {
Ok(existing_metadata) => metadata.modified()? > existing_metadata.modified()?,
Err(err) => match err.kind() {
io::ErrorKind::NotFound => true,
_ => return Err(err),
},
} {
let mut handles_guard = handles.lock().unwrap();
handles_guard.push(Command::new("gzip").arg("-kf5").arg(path).spawn()?);
}
Ok(())
}
fn compress_recursively(path: &Path, handles: &Mutex<Vec<Child>>) -> Result<()> {
let metadata = fs::metadata(path)?;
if metadata.is_dir() {
for entry in fs::read_dir(path)? {
compress_recursively(&entry?.path(), handles)?
}
Ok(())
} else if match path.extension() {
Some(ext) => ext == "gz",
None => false,
} || metadata.is_symlink()
{
Ok(())
} else {
compress_file(path, metadata, handles)
}
}
pub fn compress_epicly<P: AsRef<Path>>(path: P) -> Result<u64> {
let mut i = 0;
let handles = Mutex::new(Vec::new());
compress_recursively(AsRef::<Path>::as_ref(&path), &handles)?;
let handles = handles.into_inner().unwrap();
for mut handle in handles {
assert!(handle.wait().unwrap().success());
i += 1;
}
Ok(i)
}

src/config.rs

@@ -9,6 +9,8 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{error, info};
use crate::ranged_i128_visitor::RangedI128Visitor;
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
#[serde(default)]
pub struct SyntectConfig {
@@ -23,19 +25,15 @@ pub struct RenderConfig {
pub syntect: SyntectConfig,
}
#[cfg(feature = "precompression")]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(default)]
pub struct PrecompressionConfig {
pub enable: bool,
pub watch: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(default)]
pub struct CacheConfig {
pub enable: bool,
pub persistence: Option<PathBuf>,
pub persistence: bool,
pub file: PathBuf,
pub compress: bool,
#[serde(deserialize_with = "check_zstd_level_bounds")]
pub compression_level: i32,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -47,8 +45,6 @@ pub struct Config {
pub description: String,
pub posts_dir: PathBuf,
pub render: RenderConfig,
#[cfg(feature = "precompression")]
pub precompression: PrecompressionConfig,
pub cache: CacheConfig,
pub markdown_access: bool,
}
@@ -62,8 +58,6 @@ impl Default for Config {
description: "blazingly fast markdown blog software written in rust memory safe".into(),
render: Default::default(),
posts_dir: "posts".into(),
#[cfg(feature = "precompression")]
precompression: Default::default(),
cache: Default::default(),
markdown_access: true,
}
@@ -80,21 +74,14 @@ impl Default for SyntectConfig {
}
}
#[cfg(feature = "precompression")]
impl Default for PrecompressionConfig {
fn default() -> Self {
Self {
enable: false,
watch: true,
}
}
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
enable: true,
persistence: None,
persistence: false,
file: "cache".into(),
compress: true,
compression_level: 3,
}
}
}
@@ -143,3 +130,11 @@ pub async fn load() -> Result<Config> {
},
}
}
fn check_zstd_level_bounds<'de, D>(d: D) -> Result<i32, D::Error>
where
D: serde::Deserializer<'de>,
{
d.deserialize_i32(RangedI128Visitor::<1, 22>)
.map(|x| x as i32)
}

src/main.rs

@@ -1,16 +1,15 @@
#![feature(let_chains, stmt_expr_attributes, proc_macro_hygiene)]
mod append_path;
mod compress;
mod config;
mod error;
mod filters;
mod hash_arc_store;
mod markdown_render;
mod post;
mod watcher;
mod ranged_i128_visitor;
use std::future::IntoFuture;
use std::io::Read;
use std::net::SocketAddr;
use std::process::exit;
use std::sync::Arc;
@@ -35,11 +34,9 @@ use tracing::level_filters::LevelFilter;
use tracing::{error, info, info_span, warn, Span};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use crate::compress::compress_epicly;
use crate::config::Config;
use crate::error::PostError;
use crate::post::{PostManager, PostMetadata, RenderStats};
use crate::watcher::watch;
type ArcState = Arc<AppState>;
@@ -158,53 +155,38 @@ async fn main() -> eyre::Result<()> {
let mut tasks = JoinSet::new();
let mut cancellation_tokens = Vec::new();
#[cfg(feature = "precompression")]
if config.precompression.enable {
let span = info_span!("compression");
info!(parent: span.clone(), "compressing static");
let compressed = tokio::task::spawn_blocking(|| compress_epicly("static"))
.await
.unwrap()
.context("couldn't compress static")?;
let _handle = span.enter();
if compressed > 0 {
info!(compressed_files=%compressed, "compressed {compressed} files");
}
if config.precompression.watch {
info!("starting compressor task");
let span = span.clone();
let token = CancellationToken::new();
let passed_token = token.clone();
tasks.spawn(async move {
watch(span, passed_token, Default::default())
.await
.context("failed to watch static")
.unwrap()
});
cancellation_tokens.push(token);
}
}
let posts = if config.cache.enable {
if let Some(path) = config.cache.persistence.as_ref()
&& tokio::fs::try_exists(&path)
if config.cache.persistence
&& tokio::fs::try_exists(&config.cache.file)
.await
.with_context(|| format!("failed to check if {} exists", path.display()))?
.with_context(|| {
format!("failed to check if {} exists", config.cache.file.display())
})?
{
info!("loading cache from file");
let path = &config.cache.file;
let load_cache = async {
let mut cache_file = tokio::fs::File::open(&path)
.await
.context("failed to open cache file")?;
let mut serialized = Vec::with_capacity(4096);
let serialized = if config.cache.compress {
let cache_file = cache_file.into_std().await;
tokio::task::spawn_blocking(move || {
let mut buf = Vec::with_capacity(4096);
zstd::stream::read::Decoder::new(cache_file)?.read_to_end(&mut buf)?;
Ok::<_, std::io::Error>(buf)
})
.await
.context("failed to join blocking thread")?
.context("failed to read cache file")?
} else {
let mut buf = Vec::with_capacity(4096);
cache_file
.read_to_end(&mut serialized)
.read_to_end(&mut buf)
.await
.context("failed to read cache file")?;
buf
};
let cache =
bitcode::deserialize(serialized.as_slice()).context("failed to parse cache")?;
Ok::<PostManager, color_eyre::Report>(PostManager::new_with_cache(
@@ -219,7 +201,11 @@ async fn main() -> eyre::Result<()> {
Err(err) => {
error!("failed to load cache: {}", err);
info!("using empty cache");
PostManager::new(config.posts_dir.clone(), config.render.clone())
PostManager::new_with_cache(
config.posts_dir.clone(),
config.render.clone(),
Default::default(),
)
}
}
} else {
@@ -330,18 +316,27 @@ async fn main() -> eyre::Result<()> {
AppState::clone(state.as_ref())
});
if config.cache.enable
&& let Some(path) = config.cache.persistence.as_ref()
&& config.cache.persistence
&& let Some(cache) = posts.into_cache()
{
let cache = posts
.into_cache()
.unwrap_or_else(|| unreachable!("cache should always exist in this state"));
let mut serialized = bitcode::serialize(&cache).context("failed to serialize cache")?;
let path = &config.cache.file;
let serialized = bitcode::serialize(&cache).context("failed to serialize cache")?;
let mut cache_file = tokio::fs::File::create(path)
.await
.with_context(|| format!("failed to open cache at {}", path.display()))?;
cache_file
.write_all(serialized.as_mut_slice())
if config.cache.compress {
let cache_file = cache_file.into_std().await;
tokio::task::spawn_blocking(move || {
std::io::Write::write_all(
&mut zstd::stream::write::Encoder::new(cache_file, 3)?.auto_finish(),
&serialized,
)
})
.await
.context("failed to join blocking thread")?
} else {
cache_file.write_all(&serialized).await
}
.context("failed to write cache to file")?;
info!("wrote cache to {}", path.display());
}

src/ranged_i128_visitor.rs (new file)

@@ -0,0 +1,37 @@
pub struct RangedI128Visitor<const START: i128, const END: i128>;
impl<'de, const START: i128, const END: i128> serde::de::Visitor<'de>
for RangedI128Visitor<START, END>
{
type Value = i128;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "an integer between {START} and {END}")
}
fn visit_i32<E>(self, v: i32) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
self.visit_i128(v as i128)
}
fn visit_i64<E>(self, v: i64) -> std::prelude::v1::Result<Self::Value, E>
where
E: serde::de::Error,
{
self.visit_i128(v as i128)
}
fn visit_i128<E>(self, v: i128) -> std::prelude::v1::Result<Self::Value, E>
where
E: serde::de::Error,
{
if v >= START && v <= END {
Ok(v)
} else {
Err(E::custom(format!(
"integer is out of bounds ({START}..{END})"
)))
}
}
}
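
for reference, the visitor above plugs into serde through `deserialize_with`, exactly as `check_zstd_level_bounds` does in src/config.rs earlier in this commit. a small illustrative sketch (the `Example` struct and `level` field are made up for demonstration):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct Example {
    // rejects anything outside 1..=22 at parse time
    #[serde(deserialize_with = "level_in_bounds")]
    level: i32,
}

fn level_in_bounds<'de, D>(d: D) -> Result<i32, D::Error>
where
    D: serde::Deserializer<'de>,
{
    // the visitor yields an i128, narrowed to i32 after the bounds check
    d.deserialize_i32(RangedI128Visitor::<1, 22>).map(|x| x as i32)
}

// e.g. parsing `level = 30` from TOML now fails with
// "integer is out of bounds (1..22)"
```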

src/watcher.rs (deleted)

@@ -1,76 +0,0 @@
use notify::{event::RemoveKind, Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use tokio_util::sync::CancellationToken;
use tracing::{info, Span};
use crate::append_path::Append;
use crate::compress::compress_epicly;
pub async fn watch(
span: Span,
token: CancellationToken,
config: Config,
) -> Result<(), notify::Error> {
let (tx, mut rx) = tokio::sync::mpsc::channel(12);
let mut watcher = RecommendedWatcher::new(
move |res| {
tx.blocking_send(res)
.expect("failed to send message over channel")
},
config,
)?;
watcher.watch(std::path::Path::new("static"), RecursiveMode::Recursive)?;
while let Some(received) = tokio::select! {
received = rx.recv() => received,
_ = token.cancelled() => return Ok(())
} {
match received {
Ok(event) => {
if event.kind.is_create() || event.kind.is_modify() {
let cloned_span = span.clone();
let compressed =
tokio::task::spawn_blocking(move || -> std::io::Result<u64> {
let _handle = cloned_span.enter();
let mut i = 0;
for path in event.paths {
if path.extension().is_some_and(|ext| ext == "gz") {
continue;
}
info!("{} changed, compressing", path.display());
i += compress_epicly(&path)?;
}
Ok(i)
})
.await
.unwrap()?;
if compressed > 0 {
let _handle = span.enter();
info!(compressed_files=%compressed, "compressed {compressed} files");
}
} else if let EventKind::Remove(remove_event) = event.kind // UNSTABLE
&& matches!(remove_event, RemoveKind::File)
{
for path in event.paths {
if path.extension().is_some_and(|ext| ext == "gz") {
continue;
}
let gz_path = path.clone().append(".gz");
if tokio::fs::try_exists(&gz_path).await? {
info!(
"{} removed, also removing {}",
path.display(),
gz_path.display()
);
tokio::fs::remove_file(&gz_path).await?
}
}
}
}
Err(err) => return Err(err),
}
}
Ok(())
}