diff --git a/Cargo.toml b/Cargo.toml index 84ace9c18..37ba11900 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"] derive_builder = "0.20.2" # Async -tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] } +tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] } tokio-util = { version = "0.7.15", default-features = false, features = ["io"] } tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] } futures-util = "0.3.5" diff --git a/clippy.toml b/clippy.toml index df7b6040e..221f59182 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,3 +1,12 @@ [[disallowed-types]] path = "axum::extract::Path" reason = "use our own custom web::extractors::Path for a nicer error response" + +[[disallowed-types]] +path = "crates_index_diff::Index" +reason = """ + Don't directly use crates_index_diff::Index because it might + lead to issues when used in async contexts. + We have our own wrapper struct (`crate::index::Index`) that is + async, and should be used instead. +""" diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index 61c4dc1dd..c6cac3673 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -298,8 +298,7 @@ impl QueueSubcommand { (Some(reference), false) => reference, (None, true) => { println!("Fetching changes to set reference to HEAD"); - let (_, oid) = ctx.index.diff()?.peek_changes()?; - oid + ctx.runtime.block_on(ctx.index.latest_commit_reference())? } (_, _) => unreachable!(), }; diff --git a/src/build_queue.rs b/src/build_queue.rs index ec7855846..146544c55 100644 --- a/src/build_queue.rs +++ b/src/build_queue.rs @@ -226,15 +226,13 @@ impl AsyncBuildQueue { /// /// Returns the number of crates added pub async fn get_new_crates(&self, index: &Index) -> Result { - let diff = index.diff()?; - let last_seen_reference = self .last_seen_reference() .await? 
.context("no last_seen_reference set in database")?; - diff.set_last_seen_reference(last_seen_reference)?; + index.set_last_seen_reference(last_seen_reference).await?; - let (changes, new_reference) = diff.peek_changes_ordered()?; + let (changes, new_reference) = index.peek_changes_ordered().await?; let mut conn = self.db.get_async().await?; let mut crates_added = 0; diff --git a/src/context.rs b/src/context.rs index 008d1a3b7..c87b3d293 100644 --- a/src/context.rs +++ b/src/context.rs @@ -67,14 +67,9 @@ impl Context { let cdn = Arc::new(CdnBackend::new(&config).await); - let index = Arc::new({ - let path = config.registry_index_path.clone(); - if let Some(registry_url) = config.registry_url.clone() { - Index::from_url(path, registry_url) - } else { - Index::new(path) - }? - }); + let index = Arc::new( + Index::from_url(&config.registry_index_path, config.registry_url.as_deref()).await?, + ); let runtime = runtime::Handle::current(); // sync wrappers around build-queue & storage async resources diff --git a/src/index.rs b/src/index.rs index a3ca021f1..133c42a07 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,79 +1,76 @@ -use crate::error::Result; -use crate::utils::report_error; -use anyhow::Context; -use crates_index_diff::gix; -use std::path::PathBuf; -use std::process::Command; -use std::sync::atomic::AtomicBool; +use crate::{ + error::Result, + utils::{report_error, run_blocking}, +}; +use anyhow::Context as _; +use crates_index_diff::{Change, gix, index::diff::Order}; +use std::{ + path::{Path, PathBuf}, + sync::{Arc, Mutex, atomic::AtomicBool}, +}; +use tokio::process::Command; +const THREAD_NAME: &str = "crates-index-diff"; + +/// async-friendly wrapper around `crates_index_diff::Index` pub struct Index { path: PathBuf, repository_url: Option, + // NOTE: we can use a sync mutex here, because we're only locking it + // inside `run_blocking` calls, so the mutex lock won't ever be held + // across await-points. 
+ #[allow(clippy::disallowed_types)] + index: Arc>, } impl Index { - pub fn from_url(path: PathBuf, url: String) -> Result { - crates_index_diff::Index::from_path_or_cloned_with_options( - &path, - gix::progress::Discard, - &AtomicBool::default(), - crates_index_diff::index::CloneOptions { url: url.clone() }, - ) - .map(|_| ()) - .context("initialising registry index repository")?; + pub async fn from_url( + path: impl AsRef, + repository_url: Option>, + ) -> Result { + let path = path.as_ref().to_path_buf(); + let repository_url = repository_url.map(|url| url.as_ref().to_owned()); - Ok(Self { - path, - repository_url: Some(url), + let clone_options = repository_url + .as_ref() + .map(|url| crates_index_diff::index::CloneOptions { url: url.clone() }) + .unwrap_or_default(); + + let index = run_blocking(THREAD_NAME, { + let path = path.clone(); + move || { + Ok(Arc::new(Mutex::new( + #[allow(clippy::disallowed_types)] + crates_index_diff::Index::from_path_or_cloned_with_options( + &path, + gix::progress::Discard, + &AtomicBool::default(), + clone_options, + ) + .context("initialising registry index repository")?, + ))) + } }) - } + .await?; - pub fn new(path: PathBuf) -> Result { - // This initializes the repository, then closes it afterwards to avoid leaking file descriptors. 
- // See https://github.com/rust-lang/docs.rs/pull/847 - crates_index_diff::Index::from_path_or_cloned(&path) - .map(|_| ()) - .context("initialising registry index repository")?; Ok(Self { + index, path, - repository_url: None, + repository_url, }) } - pub fn diff(&self) -> Result { - let options = self - .repository_url - .clone() - .map(|url| crates_index_diff::index::CloneOptions { url }) - .unwrap_or_default(); - let diff = crates_index_diff::Index::from_path_or_cloned_with_options( - &self.path, - gix::progress::Discard, - &AtomicBool::default(), - options, - ) - .context("re-opening registry index for diff")?; - Ok(diff) - } - - pub(crate) fn crates(&self) -> Result { - tracing::debug!("Opening with `crates_index`"); - // crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo - let repo_url = self - .repository_url - .as_deref() - .unwrap_or("https://github.com/rust-lang/crates.io-index"); - let mut index = crates_index::GitIndex::with_path(&self.path, repo_url)?; - index.update()?; - Ok(index) + pub async fn new(path: impl AsRef) -> Result { + Self::from_url(path, None::<&str>).await } - pub fn run_git_gc(&self) { + pub async fn run_git_gc(&self) { let gc = Command::new("git") .arg("-C") .arg(&self.path) .args(["gc", "--auto"]) .output() + .await .with_context(|| format!("failed to run `git gc --auto`\npath: {:#?}", &self.path)); if let Err(err) = gc { @@ -81,6 +78,43 @@ impl Index { } } + async fn peek_changes_with_order( + &self, + order: Order, + ) -> Result<(Vec, gix::hash::ObjectId)> { + let index = self.index.clone(); + run_blocking(THREAD_NAME, move || { + let index = index.lock().unwrap(); + index + .peek_changes_with_options(gix::progress::Discard, &AtomicBool::default(), order) + .map_err(Into::into) + }) + .await + } + + pub async fn peek_changes(&self) -> Result<(Vec, gix::hash::ObjectId)> { + self.peek_changes_with_order(Order::ImplementationDefined) + .await + } + + pub async fn 
peek_changes_ordered(&self) -> Result<(Vec, gix::hash::ObjectId)> { + self.peek_changes_with_order(Order::AsInCratesIndex).await + } + + pub async fn set_last_seen_reference(&self, to: gix::hash::ObjectId) -> Result<()> { + let index = self.index.clone(); + run_blocking(THREAD_NAME, move || { + let index = index.lock().unwrap(); + index.set_last_seen_reference(to).map_err(Into::into) + }) + .await + } + + pub async fn latest_commit_reference(&self) -> Result { + let (_, oid) = self.peek_changes().await?; + Ok(oid) + } + pub fn repository_url(&self) -> Option<&str> { self.repository_url.as_deref() } diff --git a/src/utils/consistency/index.rs b/src/utils/consistency/index.rs index a8a5ac9ff..412f7a811 100644 --- a/src/utils/consistency/index.rs +++ b/src/utils/consistency/index.rs @@ -1,33 +1,53 @@ use super::data::{Crate, Crates, Release, Releases}; -use crate::Index; +use crate::{Config, utils::run_blocking}; use anyhow::Result; use rayon::iter::ParallelIterator; +use tracing::debug; -pub(super) fn load(index: &Index) -> Result { - let mut result: Crates = index - .crates()? 
- .crates_parallel() - .map(|krate| { - krate.map(|krate| { - let mut releases: Releases = krate - .versions() - .iter() - .map(|version| Release { - version: version.version().into(), - yanked: Some(version.is_yanked()), - }) - .collect(); - releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version)); - - Crate { - name: krate.name().into(), - releases, - } +pub(super) async fn load(config: &Config) -> Result { + let registry_index_path = config.registry_index_path.clone(); + let registry_url = config + .registry_url + .as_deref() + .unwrap_or("https://github.com/rust-lang/crates.io-index") + .to_owned(); + + run_blocking("load-crates-index", move || { + debug!("Opening with `crates_index`"); + let mut index = crates_index::GitIndex::with_path( + &registry_index_path, + // crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo + &registry_url, + )?; + + index.update()?; + + let mut result: Crates = index + .crates_parallel() + .map(|krate| { + krate.map(|krate| { + let mut releases: Releases = krate + .versions() + .iter() + .map(|version| Release { + version: version.version().into(), + yanked: Some(version.is_yanked()), + }) + .collect(); + + releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version)); + + Crate { + name: krate.name().into(), + releases, + } + }) }) - }) - .collect::>()?; + .collect::>()?; - result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name)); + result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name)); - Ok(result) + Ok(result) + }) + .await } diff --git a/src/utils/consistency/mod.rs b/src/utils/consistency/mod.rs index acb90588d..29ac32db0 100644 --- a/src/utils/consistency/mod.rs +++ b/src/utils/consistency/mod.rs @@ -1,4 +1,4 @@ -use crate::{Context, db::delete, utils::spawn_blocking}; +use crate::{Context, db::delete}; use anyhow::{Context as _, Result}; use itertools::Itertools; use tracing::{info, warn}; @@ -32,12 +32,9 @@ pub async fn run_check(ctx: &Context, dry_run: bool) -> Result<()> { .context("Loading 
crate data from database for consistency check")?; tracing::info!("Loading data from index..."); - let index_data = spawn_blocking({ - let index = ctx.index.clone(); - move || index::load(&index) - }) - .await - .context("Loading crate data from index for consistency check")?; + let index_data = index::load(&ctx.config) + .await + .context("Loading crate data from index for consistency check")?; let diff = diff::calculate_diff(db_data.iter(), index_data.iter()); let result = handle_diff(ctx, diff.iter(), dry_run).await?; diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 7699774e6..afdaf02f9 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -12,7 +12,7 @@ use std::future::Future; use std::sync::Arc; use std::thread; use std::time::Duration; -use tokio::{runtime, task::spawn_blocking, time::Instant}; +use tokio::{runtime, time::Instant}; use tracing::{debug, info}; /// Run the registry watcher @@ -41,11 +41,7 @@ pub async fn watch_registry( } if last_gc.elapsed().as_secs() >= config.registry_gc_interval { - spawn_blocking({ - let index = index.clone(); - move || index.run_git_gc() - }) - .await?; + index.run_git_gc().await; last_gc = Instant::now(); } tokio::time::sleep(config.delay_between_registry_fetches).await; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index bbde45481..3d2634bdd 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -22,10 +22,10 @@ mod html; mod queue; pub(crate) mod queue_builder; pub(crate) mod rustc_version; -use anyhow::Result; +use anyhow::{Context as _, Result}; use serde::Serialize; use serde::de::DeserializeOwned; -use std::panic; +use std::{fmt, panic}; use tracing::{Span, error, warn}; pub(crate) mod sized_buffer; @@ -128,6 +128,46 @@ where } } +/// Move the execution of a blocking function into a separate, new thread. +/// +/// Only for long-running / expensive operations that would block the async runtime or its +/// blocking workerpool. 
+/// +/// The rule should be: +/// * async stuff -> in the tokio runtime, other async functions +/// * blocking I/O -> `spawn_blocking` +/// * CPU-Bound things: +/// - `render_in_threadpool` (continuous load like rendering) +/// - `run_blocking` (sporadic CPU bound load) +/// +/// The thread-name will help us better see where our CPU load is coming from on the +/// servers. +/// +/// Generally speaking, using tokio's `spawn_blocking` is also ok-ish, if the work is sporadic. +/// But then I wouldn't get thread-names. +pub(crate) async fn run_blocking(name: N, f: F) -> Result +where + N: Into + fmt::Display, + F: FnOnce() -> Result + Send + 'static, + R: Send + 'static, +{ + let name = name.into(); + let span = tracing::Span::current(); + let (send, recv) = tokio::sync::oneshot::channel(); + thread::Builder::new() + .name(format!("docsrs-{name}")) + .spawn(move || { + let _guard = span.enter(); + + // `.send` only fails when the receiver is dropped while we work, + // at which point we don't need the result anymore. + let _ = send.send(f()); + }) + .with_context(|| format!("couldn't spawn worker thread for {}", &name))?; + + recv.await.context("sender was dropped")? +} + pub(crate) fn retry(mut f: impl FnMut() -> Result, max_attempts: u32) -> Result { for attempt in 1.. { match f() {