Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
derive_builder = "0.20.2"

# Async
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] }
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
futures-util = "0.3.5"
Expand Down
9 changes: 9 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
[[disallowed-types]]
path = "axum::extract::Path"
reason = "use our own custom web::extractors::Path for a nicer error response"

[[disallowed-types]]
path = "crates_index_diff::Index"
reason = """
Don't directly use crates_index_diff::Index because it might
lead to issues when used in async contexts.
We have our own wrapper struct (`crate::index::Index`) that is
async, and should be used instead.
"""
3 changes: 1 addition & 2 deletions src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,7 @@ impl QueueSubcommand {
(Some(reference), false) => reference,
(None, true) => {
println!("Fetching changes to set reference to HEAD");
let (_, oid) = ctx.index.diff()?.peek_changes()?;
oid
ctx.runtime.block_on(ctx.index.latest_commit_reference())?
}
(_, _) => unreachable!(),
};
Expand Down
6 changes: 2 additions & 4 deletions src/build_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,13 @@ impl AsyncBuildQueue {
///
/// Returns the number of crates added
pub async fn get_new_crates(&self, index: &Index) -> Result<usize> {
let diff = index.diff()?;

let last_seen_reference = self
.last_seen_reference()
.await?
.context("no last_seen_reference set in database")?;
diff.set_last_seen_reference(last_seen_reference)?;
index.set_last_seen_reference(last_seen_reference).await?;

let (changes, new_reference) = diff.peek_changes_ordered()?;
let (changes, new_reference) = index.peek_changes_ordered().await?;

let mut conn = self.db.get_async().await?;
let mut crates_added = 0;
Expand Down
11 changes: 3 additions & 8 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,9 @@ impl Context {

let cdn = Arc::new(CdnBackend::new(&config).await);

let index = Arc::new({
let path = config.registry_index_path.clone();
if let Some(registry_url) = config.registry_url.clone() {
Index::from_url(path, registry_url)
} else {
Index::new(path)
}?
});
let index = Arc::new(
Index::from_url(&config.registry_index_path, config.registry_url.as_deref()).await?,
);

let runtime = runtime::Handle::current();
// sync wrappers around build-queue & storage async resources
Expand Down
142 changes: 88 additions & 54 deletions src/index.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,120 @@
use crate::error::Result;
use crate::utils::report_error;
use anyhow::Context;
use crates_index_diff::gix;
use std::path::PathBuf;
use std::process::Command;
use std::sync::atomic::AtomicBool;
use crate::{
error::Result,
utils::{report_error, run_blocking},
};
use anyhow::Context as _;
use crates_index_diff::{Change, gix, index::diff::Order};
use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex, atomic::AtomicBool},
};
use tokio::process::Command;

const THREAD_NAME: &str = "crates-index-diff";

/// Async-friendly wrapper around `crates_index_diff::Index`.
///
/// Keeps the on-disk location and the optional remote URL next to the opened
/// index handle, which is shared behind an `Arc<Mutex<_>>` so it can be moved
/// into blocking closures.
pub struct Index {
/// Filesystem path of the registry index checkout.
path: PathBuf,
/// Remote URL the index was opened with; `None` when no explicit URL was given.
repository_url: Option<String>,
// NOTE: we can use a sync mutex here, because we're only locking it
// inside the blocking closures passed to `run_blocking`, so the mutex
// lock won't ever be held across await-points.
// The lint is silenced because this wrapper is the one place that is
// meant to hold the otherwise disallowed `crates_index_diff::Index`.
#[allow(clippy::disallowed_types)]
index: Arc<Mutex<crates_index_diff::Index>>,
}

impl Index {
pub fn from_url(path: PathBuf, url: String) -> Result<Self> {
crates_index_diff::Index::from_path_or_cloned_with_options(
&path,
gix::progress::Discard,
&AtomicBool::default(),
crates_index_diff::index::CloneOptions { url: url.clone() },
)
.map(|_| ())
.context("initialising registry index repository")?;
pub async fn from_url(
path: impl AsRef<Path>,
repository_url: Option<impl AsRef<str>>,
) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let repository_url = repository_url.map(|url| url.as_ref().to_owned());

Ok(Self {
path,
repository_url: Some(url),
let clone_options = repository_url
.as_ref()
.map(|url| crates_index_diff::index::CloneOptions { url: url.clone() })
.unwrap_or_default();

let index = run_blocking(THREAD_NAME, {
let path = path.clone();
move || {
Ok(Arc::new(Mutex::new(
#[allow(clippy::disallowed_types)]
crates_index_diff::Index::from_path_or_cloned_with_options(
&path,
gix::progress::Discard,
&AtomicBool::default(),
clone_options,
)
.context("initialising registry index repository")?,
)))
}
})
}
.await?;

pub fn new(path: PathBuf) -> Result<Self> {
// This initializes the repository, then closes it afterwards to avoid leaking file descriptors.
// See https://github.com/rust-lang/docs.rs/pull/847
crates_index_diff::Index::from_path_or_cloned(&path)
.map(|_| ())
.context("initialising registry index repository")?;
Ok(Self {
index,
path,
repository_url: None,
repository_url,
})
}

pub fn diff(&self) -> Result<crates_index_diff::Index> {
let options = self
.repository_url
.clone()
.map(|url| crates_index_diff::index::CloneOptions { url })
.unwrap_or_default();
let diff = crates_index_diff::Index::from_path_or_cloned_with_options(
&self.path,
gix::progress::Discard,
&AtomicBool::default(),
options,
)
.context("re-opening registry index for diff")?;
Ok(diff)
}

pub(crate) fn crates(&self) -> Result<crates_index::GitIndex> {
tracing::debug!("Opening with `crates_index`");
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
let repo_url = self
.repository_url
.as_deref()
.unwrap_or("https://github.com/rust-lang/crates.io-index");
let mut index = crates_index::GitIndex::with_path(&self.path, repo_url)?;
index.update()?;
Ok(index)
/// Opens the registry index at `path` without an explicit repository URL.
///
/// Thin convenience wrapper: forwards to [`Index::from_url`] with `None`
/// as the URL and returns whatever that call returns.
pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
    // The URL parameter is generic, so the `None` needs a concrete type.
    let no_url: Option<&str> = None;
    Self::from_url(path, no_url).await
}

pub fn run_git_gc(&self) {
pub async fn run_git_gc(&self) {
let gc = Command::new("git")
.arg("-C")
.arg(&self.path)
.args(["gc", "--auto"])
.output()
.await
.with_context(|| format!("failed to run `git gc --auto`\npath: {:#?}", &self.path));

if let Err(err) = gc {
report_error(&err);
}
}

async fn peek_changes_with_order(
&self,
order: Order,
) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
let index = self.index.clone();
run_blocking(THREAD_NAME, move || {
let index = index.lock().unwrap();
index
.peek_changes_with_options(gix::progress::Discard, &AtomicBool::default(), order)
.map_err(Into::into)
})
.await
}

/// Peeks at the pending index changes in implementation-defined order.
///
/// Delegates to `peek_changes_with_order` with
/// `Order::ImplementationDefined` and returns its result unchanged.
pub async fn peek_changes(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
    let order = Order::ImplementationDefined;
    self.peek_changes_with_order(order).await
}

/// Peeks at the pending index changes in the order used by the crates index.
///
/// Delegates to `peek_changes_with_order` with `Order::AsInCratesIndex`
/// and returns its result unchanged.
pub async fn peek_changes_ordered(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
    let order = Order::AsInCratesIndex;
    self.peek_changes_with_order(order).await
}

pub async fn set_last_seen_reference(&self, to: gix::hash::ObjectId) -> Result<()> {
let index = self.index.clone();
run_blocking(THREAD_NAME, move || {
let index = index.lock().unwrap();
index.set_last_seen_reference(to).map_err(Into::into)
})
.await
}

/// Returns the object id reported by `peek_changes`, discarding the changes.
///
/// Any error from `peek_changes` is passed through to the caller.
pub async fn latest_commit_reference(&self) -> Result<gix::ObjectId> {
    // Only the reference half of the `(changes, reference)` pair is needed.
    self.peek_changes().await.map(|(_, oid)| oid)
}

/// Returns the repository URL stored on this index as a borrowed `&str`,
/// or `None` when no URL was stored.
pub fn repository_url(&self) -> Option<&str> {
self.repository_url.as_deref()
}
Expand Down
70 changes: 45 additions & 25 deletions src/utils/consistency/index.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,53 @@
use super::data::{Crate, Crates, Release, Releases};
use crate::Index;
use crate::{Config, utils::run_blocking};
use anyhow::Result;
use rayon::iter::ParallelIterator;
use tracing::debug;

pub(super) fn load(index: &Index) -> Result<Crates> {
let mut result: Crates = index
.crates()?
.crates_parallel()
.map(|krate| {
krate.map(|krate| {
let mut releases: Releases = krate
.versions()
.iter()
.map(|version| Release {
version: version.version().into(),
yanked: Some(version.is_yanked()),
})
.collect();
releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));

Crate {
name: krate.name().into(),
releases,
}
pub(super) async fn load(config: &Config) -> Result<Crates> {
let registry_index_path = config.registry_index_path.clone();
let registry_url = config
.registry_url
.as_deref()
.unwrap_or("https://github.com/rust-lang/crates.io-index")
.to_owned();

run_blocking("load-crates-index", move || {
debug!("Opening with `crates_index`");
let mut index = crates_index::GitIndex::with_path(
&registry_index_path,
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
&registry_url,
)?;

index.update()?;

let mut result: Crates = index
.crates_parallel()
.map(|krate| {
krate.map(|krate| {
let mut releases: Releases = krate
.versions()
.iter()
.map(|version| Release {
version: version.version().into(),
yanked: Some(version.is_yanked()),
})
.collect();

releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));

Crate {
name: krate.name().into(),
releases,
}
})
})
})
.collect::<Result<_, _>>()?;
.collect::<Result<_, _>>()?;

result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));

Ok(result)
Ok(result)
})
.await
}
11 changes: 4 additions & 7 deletions src/utils/consistency/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Context, db::delete, utils::spawn_blocking};
use crate::{Context, db::delete};
use anyhow::{Context as _, Result};
use itertools::Itertools;
use tracing::{info, warn};
Expand Down Expand Up @@ -32,12 +32,9 @@ pub async fn run_check(ctx: &Context, dry_run: bool) -> Result<()> {
.context("Loading crate data from database for consistency check")?;

tracing::info!("Loading data from index...");
let index_data = spawn_blocking({
let index = ctx.index.clone();
move || index::load(&index)
})
.await
.context("Loading crate data from index for consistency check")?;
let index_data = index::load(&ctx.config)
.await
.context("Loading crate data from index for consistency check")?;

let diff = diff::calculate_diff(db_data.iter(), index_data.iter());
let result = handle_diff(ctx, diff.iter(), dry_run).await?;
Expand Down
8 changes: 2 additions & 6 deletions src/utils/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::future::Future;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tokio::{runtime, task::spawn_blocking, time::Instant};
use tokio::{runtime, time::Instant};
use tracing::{debug, info};

/// Run the registry watcher
Expand Down Expand Up @@ -41,11 +41,7 @@ pub async fn watch_registry(
}

if last_gc.elapsed().as_secs() >= config.registry_gc_interval {
spawn_blocking({
let index = index.clone();
move || index.run_git_gc()
})
.await?;
index.run_git_gc().await;
last_gc = Instant::now();
}
tokio::time::sleep(config.delay_between_registry_fetches).await;
Expand Down
Loading
Loading