diff --git a/Cargo.toml b/Cargo.toml
index d01b04a9c..3cea9847f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"] }
 derive_builder = "0.20.2"
 
 # Async
-tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] }
+tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "sync"] }
 tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
 futures-util = "0.3.5"
 async-stream = "0.3.5"
diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs
index 61c4dc1dd..1bfb3c08f 100644
--- a/src/bin/cratesfyi.rs
+++ b/src/bin/cratesfyi.rs
@@ -5,8 +5,9 @@ use docs_rs::{
     db::{self, CrateId, Overrides, add_path_into_database},
     start_background_metrics_webserver, start_web_server,
     utils::{
-        ConfigName, get_config, get_crate_pattern_and_priority, list_crate_priorities,
-        queue_builder, remove_crate_priority, set_config, set_crate_priority,
+        ConfigName, daemon::start_background_cache_cleaner, get_config,
+        get_crate_pattern_and_priority, list_crate_priorities, queue_builder,
+        remove_crate_priority, set_config, set_crate_priority,
     },
 };
 use futures_util::StreamExt;
@@ -214,6 +215,8 @@ impl CommandLine {
                 queue_builder(&ctx, RustwideBuilder::init(&ctx)?)?;
             }
             Self::StartWebServer { socket_addr } => {
+                start_background_cache_cleaner(&ctx)?;
+
                 // Blocks indefinitely
                 start_web_server(Some(socket_addr), &ctx)?;
             }
@@ -568,11 +571,17 @@ enum DatabaseSubcommand {
         #[arg(long)]
         dry_run: bool,
     },
+
+    PruneArchiveIndexCache,
 }
 
 impl DatabaseSubcommand {
     fn handle_args(self, ctx: Context) -> Result<()> {
         match self {
+            Self::PruneArchiveIndexCache => ctx
+                .runtime
+                .block_on(async { ctx.async_storage.prune_archive_index_cache().await })
+                .context("Failed to prune archive index cache")?,
             Self::Migrate { version } => ctx
                 .runtime
                 .block_on(async {
diff --git a/src/config.rs b/src/config.rs
index b99eb77db..16c9c80c9 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,6 +1,6 @@
 use crate::{cdn::CdnKind, storage::StorageKind};
 use anyhow::{Context, Result, anyhow, bail};
-use std::{env::VarError, error::Error, path::PathBuf, str::FromStr, time::Duration};
+use std::{env::VarError, error::Error, io, path, path::PathBuf, str::FromStr, time::Duration};
 use tracing::trace;
 use url::Url;
 
@@ -85,6 +85,9 @@ pub struct Config {
     // for the remote archives?
     pub(crate) local_archive_cache_path: PathBuf,
 
+    // How long do we want to keep the locally cached index files?
+    pub(crate) local_archive_cache_ttl: Duration,
+
     // Where to collect metrics for the metrics initiative.
     // When empty, we won't collect metrics.
     pub(crate) compiler_metrics_collection_path: Option<PathBuf>,
@@ -209,10 +212,14 @@ impl Config {
             .cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?))
             .cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?)
             .cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?)
-            .local_archive_cache_path(env(
+            .local_archive_cache_path(ensure_absolute_path(env(
                 "DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
                 prefix.join("archive_cache"),
-            )?)
+            )?)?)
+            .local_archive_cache_ttl(Duration::from_secs(env(
+                "DOCSRS_ARCHIVE_INDEX_CACHE_TTL",
+                48 * 3600, // 48 hours
+            )?))
             .compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
             .temp_dir(temp_dir)
             .rustwide_workspace(env(
@@ -235,6 +242,14 @@ impl Config {
     }
 }
 
+fn ensure_absolute_path(path: PathBuf) -> io::Result<PathBuf> {
+    if path.is_absolute() {
+        Ok(path)
+    } else {
+        Ok(path::absolute(&path)?)
+    }
+}
+
 fn env<T>(var: &str, default: T) -> Result<T>
 where
     T: FromStr,
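Note on `ensure_absolute_path`: the cache path is later used as part of the keys of a per-index lock map (see the comment in `find_in_archive_index` below), so a relative and an absolute spelling of the same directory must not produce different keys. A standalone sketch of the helper's behavior, not part of the patch; the sample paths are illustrative, and `std::path::absolute` resolves against the current working directory without touching the filesystem:

    use std::{io, path::PathBuf};

    fn ensure_absolute_path(path: PathBuf) -> io::Result<PathBuf> {
        if path.is_absolute() {
            Ok(path)
        } else {
            // prepends the current working directory; does not resolve symlinks
            // and does not require the path to exist
            std::path::absolute(&path)
        }
    }

    fn main() -> io::Result<()> {
        assert!(ensure_absolute_path(PathBuf::from("archive_cache"))?.is_absolute());
        assert_eq!(
            ensure_absolute_path(PathBuf::from("/tmp/archive_cache"))?,
            PathBuf::from("/tmp/archive_cache")
        );
        Ok(())
    }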
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 6925dfc58..f8008dfef 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -4,9 +4,11 @@ mod database;
 mod s3;
 
 pub use self::compression::{CompressionAlgorithm, CompressionAlgorithms, compress, decompress};
-use self::compression::{wrap_reader_for_decompression, wrap_writer_for_compression};
-use self::database::DatabaseBackend;
-use self::s3::S3Backend;
+use self::{
+    compression::{wrap_reader_for_decompression, wrap_writer_for_compression},
+    database::DatabaseBackend,
+    s3::S3Backend,
+};
 use crate::{
     Config, InstanceMetrics,
     db::{
@@ -18,28 +20,36 @@ use crate::{
     utils::spawn_blocking,
 };
 use anyhow::anyhow;
+use async_stream::try_stream;
 use chrono::{DateTime, Utc};
+use dashmap::DashMap;
 use fn_error_context::context;
-use futures_util::stream::BoxStream;
+use futures_util::{Stream, StreamExt as _, stream::BoxStream};
 use mime::Mime;
 use path_slash::PathExt;
 use std::{
-    fmt,
-    fs::{self, File},
+    cmp, fmt,
+    fs::{self, File, Metadata},
     io::{self, BufReader},
+    iter,
     num::ParseIntError,
     ops::RangeInclusive,
     path::{Path, PathBuf},
+    str::FromStr,
     sync::Arc,
+    time::{Duration, Instant, SystemTime},
 };
-use std::{iter, str::FromStr};
 use tokio::{
     io::{AsyncRead, AsyncWriteExt},
     runtime,
+    sync::RwLock,
 };
-use tracing::{error, info_span, instrument, trace};
+use tracing::{debug, error, info, info_span, instrument, trace};
 use walkdir::WalkDir;
 
+static ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
+const ARCHIVE_INDEX_ACCESS_TIME_DEBOUNCE: Duration = Duration::from_secs(3600);
+
 type FileRange = RangeInclusive<u64>;
 
 #[derive(Debug, thiserror::Error)]
@@ -112,6 +122,31 @@ impl StreamingBlob {
     }
 }
 
+/// Recursively walks a directory and yields all files (not directories) found within it.
+///
+/// Roughly an async version of `get_file_list` below.
+fn walk_dir_recursive(
+    root: impl AsRef<Path>,
+) -> impl Stream<Item = io::Result<(PathBuf, Metadata)>> {
+    let root = root.as_ref().to_path_buf();
+    try_stream! {
+        let mut dirs = vec![root];
+
+        while let Some(dir) = dirs.pop() {
+            let mut entries = tokio::fs::read_dir(&dir).await?;
+            while let Some(entry) = entries.next_entry().await? {
+                let path = entry.path();
+                let meta = entry.metadata().await?;
+                if meta.is_dir() {
+                    dirs.push(path.clone());
+                } else {
+                    yield (path, meta);
+                }
+            }
+        }
+    }
+}
+
 pub fn get_file_list<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = Result<PathBuf>>> {
     let path = path.as_ref().to_path_buf();
     if path.is_file() {
@@ -178,6 +213,11 @@ enum StorageBackend {
 pub struct AsyncStorage {
     backend: StorageBackend,
     config: Arc<Config>,
+    /// Locks to synchronize access to the locally cached archive index files.
+    locks: DashMap<PathBuf, Arc<RwLock<()>>>,
+    /// for debouncing the update to last-access-time on the locally cached
+    /// archive index files.
+    access_time_last_touch: DashMap<PathBuf, Instant>,
 }
 
 impl AsyncStorage {
@@ -196,6 +236,8 @@ impl AsyncStorage {
                 }
             },
             config,
+            locks: DashMap::new(),
+            access_time_last_touch: DashMap::new(),
         })
     }
 
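The two new `DashMap` fields form a small per-file lock registry: every cached index path gets its own `Arc<RwLock<()>>`, so readers of one index never contend with downloads of a different index or with the pruner. A standalone sketch of the lookup idiom, the same `entry().or_insert_with().downgrade().clone()` chain the patch uses later in `local_index_cache_lock`; the `LockRegistry` name here is illustrative:

    use std::{path::{Path, PathBuf}, sync::Arc};

    use dashmap::DashMap;
    use tokio::sync::RwLock;

    struct LockRegistry {
        locks: DashMap<PathBuf, Arc<RwLock<()>>>,
    }

    impl LockRegistry {
        fn lock_for(&self, path: &Path) -> Arc<RwLock<()>> {
            self.locks
                .entry(path.to_path_buf())
                // create the per-path lock on first use
                .or_insert_with(|| Arc::new(RwLock::new(())))
                // turn the exclusive map-entry guard into a shared one
                // before cloning the Arc out of it
                .downgrade()
                .clone()
        }
    }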
@@ -310,12 +352,10 @@ impl AsyncStorage {
         path: &str,
     ) -> Result<bool> {
         match self
-            .download_archive_index(archive_path, latest_build_id)
+            .find_in_archive_index(archive_path, latest_build_id, path)
             .await
         {
-            Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)
-                .await?
-                .is_some()),
+            Ok(file_info) => Ok(file_info.is_some()),
             Err(err) => {
                 if err.downcast_ref::<PathNotFoundError>().is_some() {
                     Ok(false)
@@ -372,41 +412,216 @@ impl AsyncStorage {
         Ok(blob.decompress())
     }
 
+    #[instrument(skip(self, meta))]
+    async fn check_and_prune_archive_index_file(&self, path: PathBuf, meta: Metadata) {
+        let ttl = self.config.local_archive_cache_ttl;
+
+        let rwlock = self.local_index_cache_lock(&path);
+
+        let Ok(_write_guard) = rwlock.try_write() else {
+            // if we can't acquire the write/exclusive lock, we assume
+            // the file is still in use, so we don't have to check its age,
+            // because we would never delete it.
+            trace!("archive index file is in use, skipping prune");
+            return;
+        };
+
+        // typically mtime is just the download-time of the index, and atime is newer.
+        // In case it's not, we use the latest of both.
+        let last_recorded_access = match (meta.modified(), meta.accessed()) {
+            (Ok(mtime), Ok(atime)) => Some(cmp::max(atime, mtime)),
+            (Err(err), Ok(atime)) => {
+                debug!(?err, "error fetching modified-time, using access-time");
+                Some(atime)
+            }
+            (Ok(mtime), Err(err)) => {
+                debug!(?err, "error fetching access-time, using modified-time");
+                Some(mtime)
+            }
+            (Err(err_mtime), Err(err_atime)) => {
+                debug!(
+                    ?err_mtime,
+                    ?err_atime,
+                    "error fetching either modified-time or access-time"
+                );
+                None
+            }
+        };
+
+        let now = SystemTime::now();
+        let age = last_recorded_access.and_then(|atime| now.duration_since(atime).ok());
+        let should_delete = age.map(|age| age > ttl).unwrap_or(false);
+
+        if !should_delete {
+            trace!(?last_recorded_access, ?age, "file is recent, skipping");
+
+            return;
+        }
+
+        trace!(
+            ?last_recorded_access,
+            ?age,
+            "removing cached archive index file"
+        );
+
+        // Best-effort: remove WAL/SHM companions first, then the DB file.
+        let wal = path.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-wal"));
+        let shm = path.with_extension(format!("{ARCHIVE_INDEX_FILE_EXTENSION}-shm"));
+        let _ = tokio::fs::remove_file(&wal).await;
+        let _ = tokio::fs::remove_file(&shm).await;
+
+        match tokio::fs::remove_file(&path).await {
+            Ok(_) => {
+                self.locks.remove(&path);
+                self.access_time_last_touch.remove(&path);
+            }
+            Err(err) => {
+                debug!(?err, "couldn't delete archive index file");
+            }
+        }
+    }
+
+    /// remove old cached archive index files.
+    ///
+    /// We're using the access-time from the filesystem to determine the last access time.
+    ///
+    /// On linux, this field is updated only once every 24h, and on mac & windows not at all.
+    /// So we manually update the access-time when we use the file, debounced to once per hour.
+    #[instrument(skip(self))]
+    pub async fn prune_archive_index_cache(&self) -> Result<()> {
+        let ttl = self.config.local_archive_cache_ttl;
+        let cache_dir = &self.config.local_archive_cache_path;
+        let now = SystemTime::now();
+
+        info!(?ttl, ?now, cache_dir=%cache_dir.display(), "pruning archive index cache");
+
+        walk_dir_recursive(&cache_dir)
+            .for_each_concurrent(8, |item| async move {
+                let Ok((path, meta)) = item else { return };
+
+                if path.is_dir() {
+                    // this shouldn't happen, since walk_dir_recursive only emits files, not
+                    // directories
+                    return;
+                }
+
+                if let Some(ext) = path.extension()
+                    && ext != ARCHIVE_INDEX_FILE_EXTENSION
+                {
+                    return;
+                }
+
+                self.check_and_prune_archive_index_file(path, meta).await;
+            })
+            .await;
+
+        Ok(())
+    }
+
+    fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
+        let local_index_path = local_index_path.as_ref().to_path_buf();
+
+        self.locks
+            .entry(local_index_path)
+            .or_insert_with(|| Arc::new(RwLock::new(())))
+            .downgrade()
+            .clone()
+    }
+
     #[instrument]
-    pub(super) async fn download_archive_index(
+    async fn find_in_archive_index(
         &self,
         archive_path: &str,
         latest_build_id: Option<BuildId>,
-    ) -> Result<PathBuf> {
-        // remote/folder/and/x.zip.index
-        let remote_index_path = format!("{archive_path}.index");
+        path_in_archive: &str,
+    ) -> Result<Option<archive_index::FileInfo>> {
+        // we know that config.local_archive_cache_path is an absolute path, not relative.
+        // So it will be usable as key in the DashMap.
         let local_index_path = self.config.local_archive_cache_path.join(format!(
-            "{archive_path}.{}.index",
+            "{archive_path}.{}.{ARCHIVE_INDEX_FILE_EXTENSION}",
             latest_build_id.map(|id| id.0).unwrap_or(0)
         ));
 
-        if !local_index_path.exists() {
-            let index_content = self.get(&remote_index_path, usize::MAX).await?.content;
+        let rwlock = self.local_index_cache_lock(&local_index_path);
 
-            tokio::fs::create_dir_all(
-                local_index_path
-                    .parent()
-                    .ok_or_else(|| anyhow!("index path without parent"))?,
-            )
-            .await?;
+        // directly acquire the read-lock, so the syscall (`path.exists()`) below is already
+        // protected.
+        let mut _read_guard = rwlock.read().await;
+
+        if !tokio::fs::try_exists(&local_index_path).await? {
+            // upgrade the lock to a write-lock for downloading & storing the index.
+            drop(_read_guard);
+            let _write_guard = rwlock.write().await;
+
+            // check existence again in case of a race condition (TOCTOU)
+            if !tokio::fs::try_exists(&local_index_path).await? {
+                // remote/folder/and/x.zip.index
+                let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");
+
+                tokio::fs::create_dir_all(
+                    local_index_path
+                        .parent()
+                        .ok_or_else(|| anyhow!("index path without parent"))?,
+                )
+                .await?;
+
+                let temp_path =
+                    tempfile::NamedTempFile::new_in(&self.config.local_archive_cache_path)?
+                        .into_temp_path();
+
+                {
+                    let mut file = tokio::fs::File::create(&temp_path).await?;
+                    let mut stream = self.get_stream(&remote_index_path).await?.content;
+
+                    tokio::io::copy(&mut stream, &mut file).await?;
+
+                    file.flush().await?;
+                }
+
+                tokio::fs::rename(temp_path, &local_index_path).await?;
+            }
+
+            _read_guard = _write_guard.downgrade();
+        }
 
-            // when we don't have a locally cached index and many parallel request
-            // we might download the same archive index multiple times here.
-            // So we're storing the content into a temporary file before renaming it
-            // into the final location.
-            let temp_path = tempfile::NamedTempFile::new_in(&self.config.local_archive_cache_path)?
-                .into_temp_path();
-            let mut file = tokio::fs::File::create(&temp_path).await?;
-            file.write_all(&index_content).await?;
-            tokio::fs::rename(temp_path, &local_index_path).await?;
+        // update last-access-time, debounced
+        //
+        // Linux is the only system that updates atime on file access by default, when the mount
+        // option is set. But even then, it's only updated once every 24h.
+        // We want to update more often, so we do it ourselves here, but only once per hour.
+        let now_instant = Instant::now();
+        let touch = match self
+            .access_time_last_touch
+            .get(&local_index_path)
+            .map(|t| *t)
+        {
+            Some(prev) => now_instant.duration_since(prev) >= ARCHIVE_INDEX_ACCESS_TIME_DEBOUNCE,
+            None => true,
+        };
+        if touch {
+            debug!(path=%local_index_path.display(), "updating access time of local archive index cache");
+
+            self.access_time_last_touch
+                .insert(local_index_path.clone(), now_instant);
+
+            spawn_blocking({
+                let local_index_path = local_index_path.clone();
+                move || {
+                    let file = fs::OpenOptions::new()
+                        .read(true)
+                        .write(true)
+                        .open(&local_index_path)?;
+
+                    let now = SystemTime::now();
+                    let times = fs::FileTimes::new().set_accessed(now);
+                    file.set_times(times)?;
+                    Ok(())
+                }
+            })
+            .await?;
         }
 
-        Ok(local_index_path)
+        archive_index::find_in_file(local_index_path, path_in_archive).await
     }
 
     #[instrument]
@@ -417,11 +632,8 @@ impl AsyncStorage {
         path: &str,
         max_size: usize,
     ) -> Result<Blob> {
-        let index_filename = self
-            .download_archive_index(archive_path, latest_build_id)
-            .await?;
-
-        let info = archive_index::find_in_file(index_filename, path)
+        let info = self
+            .find_in_archive_index(archive_path, latest_build_id, path)
             .await?
             .ok_or(PathNotFoundError)?;
 
@@ -451,11 +663,8 @@ impl AsyncStorage {
         latest_build_id: Option<BuildId>,
         path: &str,
     ) -> Result<StreamingBlob> {
-        let index_filename = self
-            .download_archive_index(archive_path, latest_build_id)
-            .await?;
-
-        let info = archive_index::find_in_file(index_filename, path)
+        let info = self
+            .find_in_archive_index(archive_path, latest_build_id, path)
             .await?
             .ok_or(PathNotFoundError)?;
 
@@ -528,7 +737,7 @@ impl AsyncStorage {
             .await?;
 
         let alg = CompressionAlgorithm::default();
-        let remote_index_path = format!("{}.index", &archive_path);
+        let remote_index_path = format!("{}.{ARCHIVE_INDEX_FILE_EXTENSION}", &archive_path);
         let compressed_index_content = {
             let _span = info_span!("create_archive_index", %remote_index_path).entered();
@@ -839,17 +1048,6 @@ impl Storage {
             .block_on(self.inner.get_range(path, max_size, range, compression))
     }
 
-    pub(super) fn download_index(
-        &self,
-        archive_path: &str,
-        latest_build_id: Option<BuildId>,
-    ) -> Result<PathBuf> {
-        self.runtime.block_on(
-            self.inner
-                .download_archive_index(archive_path, latest_build_id),
-        )
-    }
-
     pub(crate) fn get_from_archive(
         &self,
         archive_path: &str,
@@ -1328,16 +1526,17 @@ mod backend_tests {
             fs::write(path, "data")?;
         }
 
+        let remote_index_filename = format!("folder/test.zip.0.{ARCHIVE_INDEX_FILE_EXTENSION}");
         let local_index_location = storage
            .inner
            .config
            .local_archive_cache_path
-            .join("folder/test.zip.0.index");
+            .join(&remote_index_filename);
 
         let (stored_files, compression_alg) =
             storage.store_all_in_archive("folder/test.zip", dir.path())?;
 
-        assert!(storage.exists("folder/test.zip.index")?);
+        assert!(storage.exists(&remote_index_filename)?);
 
         assert_eq!(compression_alg, CompressionAlgorithm::Bzip2);
         assert_eq!(stored_files.len(), files.len());
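The interesting part of `find_in_archive_index` is the lock dance around the cached file: a shared read lock for the common hit path, an upgrade to the exclusive lock for the one task that has to download the index, a second existence check after acquiring it, and a downgrade back to a read lock that stays held while the index is queried (which is also what makes the pruner's `try_write` back off for files that are in use). A standalone sketch of that pattern, assuming a plain `tokio::sync::RwLock<()>` guarding a single file; `ensure_cached` and `create_file` are hypothetical names, with `create_file` standing in for the actual download:

    use std::{io, path::Path};
    use tokio::sync::RwLock;

    async fn create_file(path: &Path) -> io::Result<()> {
        // stand-in for downloading the remote index into place
        tokio::fs::write(path, b"expensive download").await
    }

    async fn ensure_cached(lock: &RwLock<()>, path: &Path) -> io::Result<()> {
        // shared lock first: the common case is that the file already exists
        let mut _read_guard = lock.read().await;

        if !tokio::fs::try_exists(path).await? {
            // upgrade to the exclusive lock for the slow path
            drop(_read_guard);
            let _write_guard = lock.write().await;

            // re-check: another task may have created the file while we waited
            if !tokio::fs::try_exists(path).await? {
                create_file(path).await?;
            }

            _read_guard = _write_guard.downgrade();
        }

        // _read_guard is still held here; in the patch the index lookup happens
        // at this point, so a concurrent prune cannot delete the file under us
        Ok(())
    }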
diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs
index 7699774e6..b3387c1ec 100644
--- a/src/utils/daemon.rs
+++ b/src/utils/daemon.rs
@@ -118,6 +118,42 @@ pub fn start_background_queue_rebuild(context: &Context) -> Result<(), Error> {
     Ok(())
 }
 
+/// prunes the local cache directories for the webserver
+///
+/// This runs every hour, but also directly after webserver startup.
+pub fn start_background_cache_cleaner(context: &Context) -> Result<(), Error> {
+    let runtime = context.runtime.clone();
+    let storage = context.async_storage.clone();
+
+    runtime.spawn({
+        let storage = storage.clone();
+        async move {
+            tokio::time::sleep(Duration::from_secs(60)).await;
+            if let Err(err) = storage
+                .prune_archive_index_cache()
+                .await
+                .context("error running the initial archive index cache pruning")
+            {
+                report_error(&err);
+            }
+        }
+    });
+
+    async_cron(
+        &runtime,
+        "archive index cache pruning",
+        Duration::from_secs(3600),
+        move || {
+            let storage = storage.clone();
+            async move {
+                storage.prune_archive_index_cache().await?;
+                Ok(())
+            }
+        },
+    );
+    Ok(())
+}
+
 pub fn start_background_cdn_invalidator(context: &Context) -> Result<(), Error> {
     let metrics = context.instance_metrics.clone();
     let config = context.config.clone();
@@ -206,6 +242,7 @@ pub fn start_daemon(context: Context, enable_registry_watcher: bool) -> Result<(), Error> {
     start_background_repository_stats_updater(&context)?;
     start_background_cdn_invalidator(&context)?;
     start_background_queue_rebuild(&context)?;
+    start_background_cache_cleaner(&context)?;
 
     // NOTE: if a error occurred earlier in `start_daemon`, the server will _not_ be joined -
     // instead it will get killed when the process exits.
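For reference, the scheduling `start_background_cache_cleaner` sets up is: one delayed pass shortly after startup, then a fixed-interval loop. `async_cron` and `report_error` are existing docs.rs helpers; a rough equivalent using nothing but tokio primitives, collapsed into a single task here (the patch spawns the initial run separately and delegates the loop to `async_cron`), with `prune` standing in for `storage.prune_archive_index_cache()`:

    use std::time::Duration;

    async fn prune() -> anyhow::Result<()> {
        // placeholder for AsyncStorage::prune_archive_index_cache
        Ok(())
    }

    fn start_background_cache_cleaner(runtime: &tokio::runtime::Runtime) {
        runtime.spawn(async {
            // initial run, one minute after startup
            tokio::time::sleep(Duration::from_secs(60)).await;
            if let Err(err) = prune().await {
                eprintln!("initial archive index cache pruning failed: {err:#}");
            }

            // then once per hour
            let mut interval = tokio::time::interval(Duration::from_secs(3600));
            interval.tick().await; // the first tick completes immediately
            loop {
                interval.tick().await;
                if let Err(err) = prune().await {
                    eprintln!("archive index cache pruning failed: {err:#}");
                }
            }
        });
    }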