Skip to content

Commit 067c67b

Browse files
committed
migrate crates-io-index handling to async with spawn_blocking
1 parent de318c5 commit 067c67b

File tree

10 files changed

+181
-108
lines changed

10 files changed

+181
-108
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
6464
derive_builder = "0.20.2"
6565

6666
# Async
67-
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] }
67+
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
6868
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
6969
tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
7070
futures-util = "0.3.5"

clippy.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
[[disallowed-types]]
22
path = "axum::extract::Path"
33
reason = "use our own custom web::extractors::Path for a nicer error response"
4+
5+
[[disallowed-types]]
6+
path = "crates_index_diff::Index"
7+
reason = """
8+
Don't directly use crates_index_diff::Index because it might
9+
lead to issues when used in async contexts.
10+
We have our own wrapper struct (`crate::index::Index`) that is
11+
async, and should be used instead.
12+
"""

src/bin/cratesfyi.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,7 @@ impl QueueSubcommand {
298298
(Some(reference), false) => reference,
299299
(None, true) => {
300300
println!("Fetching changes to set reference to HEAD");
301-
let (_, oid) = ctx.index.diff()?.peek_changes()?;
302-
oid
301+
ctx.runtime.block_on( ctx.index.latest_commit_reference())?
303302
}
304303
(_, _) => unreachable!(),
305304
};

src/build_queue.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,13 @@ impl AsyncBuildQueue {
226226
///
227227
/// Returns the number of crates added
228228
pub async fn get_new_crates(&self, index: &Index) -> Result<usize> {
229-
let diff = index.diff()?;
230-
231229
let last_seen_reference = self
232230
.last_seen_reference()
233231
.await?
234232
.context("no last_seen_reference set in database")?;
235-
diff.set_last_seen_reference(last_seen_reference)?;
233+
index.set_last_seen_reference(last_seen_reference).await?;
236234

237-
let (changes, new_reference) = diff.peek_changes_ordered()?;
235+
let (changes, new_reference) = index.peek_changes_ordered().await?;
238236

239237
let mut conn = self.db.get_async().await?;
240238
let mut crates_added = 0;

src/context.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,9 @@ impl Context {
6767

6868
let cdn = Arc::new(CdnBackend::new(&config).await);
6969

70-
let index = Arc::new({
71-
let path = config.registry_index_path.clone();
72-
if let Some(registry_url) = config.registry_url.clone() {
73-
Index::from_url(path, registry_url)
74-
} else {
75-
Index::new(path)
76-
}?
77-
});
70+
let index = Arc::new(
71+
Index::from_url(&config.registry_index_path, config.registry_url.as_deref()).await?,
72+
);
7873

7974
let runtime = runtime::Handle::current();
8075
// sync wrappers around build-queue & storage async resources

src/index.rs

Lines changed: 83 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,115 @@
1-
use crate::error::Result;
2-
use crate::utils::report_error;
3-
use anyhow::Context;
4-
use crates_index_diff::gix;
5-
use std::path::PathBuf;
6-
use std::process::Command;
7-
use std::sync::atomic::AtomicBool;
1+
use crate::{
2+
error::Result,
3+
utils::{report_error, run_blocking_scoped},
4+
};
5+
use anyhow::Context as _;
6+
use crates_index_diff::{Change, gix, index::diff::Order};
7+
use std::{
8+
path::{Path, PathBuf},
9+
sync::{Arc, Mutex, atomic::AtomicBool},
10+
};
11+
use tokio::process::Command;
812

13+
const THREAD_NAME: &str = "crates-index-diff";
14+
15+
/// async-friendly wrapper around `crates_index_diff::Index`
916
pub struct Index {
1017
path: PathBuf,
1118
repository_url: Option<String>,
19+
// NOTE: we can use a sync mutex here, because we're only locking it
20+
// inside run_blocking_scoped closures, so the mutex lock won't ever be held
21+
// across await-points.
22+
#[allow(clippy::disallowed_types)]
23+
index: Arc<Mutex<crates_index_diff::Index>>,
1224
}
1325

1426
impl Index {
15-
pub fn from_url(path: PathBuf, url: String) -> Result<Self> {
16-
crates_index_diff::Index::from_path_or_cloned_with_options(
17-
&path,
18-
gix::progress::Discard,
19-
&AtomicBool::default(),
20-
crates_index_diff::index::CloneOptions { url: url.clone() },
21-
)
22-
.map(|_| ())
23-
.context("initialising registry index repository")?;
27+
pub async fn from_url(
28+
path: impl AsRef<Path>,
29+
repository_url: Option<impl AsRef<str>>,
30+
) -> Result<Self> {
31+
let path = path.as_ref().to_path_buf();
32+
let repository_url = repository_url.map(|url| url.as_ref().to_owned());
2433

25-
Ok(Self {
26-
path,
27-
repository_url: Some(url),
34+
let clone_options = repository_url
35+
.as_ref()
36+
.map(|url| crates_index_diff::index::CloneOptions { url: url.clone() })
37+
.unwrap_or_default();
38+
39+
let index = run_blocking_scoped(THREAD_NAME, || {
40+
Ok(Arc::new(Mutex::new(
41+
#[allow(clippy::disallowed_types)]
42+
crates_index_diff::Index::from_path_or_cloned_with_options(
43+
&path,
44+
gix::progress::Discard,
45+
&AtomicBool::default(),
46+
clone_options,
47+
)
48+
.context("initialising registry index repository")?,
49+
)))
2850
})
29-
}
51+
.await?;
3052

31-
pub fn new(path: PathBuf) -> Result<Self> {
32-
// This initializes the repository, then closes it afterwards to avoid leaking file descriptors.
33-
// See https://github.com/rust-lang/docs.rs/pull/847
34-
crates_index_diff::Index::from_path_or_cloned(&path)
35-
.map(|_| ())
36-
.context("initialising registry index repository")?;
3753
Ok(Self {
54+
index,
3855
path,
39-
repository_url: None,
56+
repository_url,
4057
})
4158
}
4259

43-
pub fn diff(&self) -> Result<crates_index_diff::Index> {
44-
let options = self
45-
.repository_url
46-
.clone()
47-
.map(|url| crates_index_diff::index::CloneOptions { url })
48-
.unwrap_or_default();
49-
let diff = crates_index_diff::Index::from_path_or_cloned_with_options(
50-
&self.path,
51-
gix::progress::Discard,
52-
&AtomicBool::default(),
53-
options,
54-
)
55-
.context("re-opening registry index for diff")?;
56-
Ok(diff)
57-
}
58-
59-
pub(crate) fn crates(&self) -> Result<crates_index::GitIndex> {
60-
tracing::debug!("Opening with `crates_index`");
61-
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
62-
let repo_url = self
63-
.repository_url
64-
.as_deref()
65-
.unwrap_or("https://github.com/rust-lang/crates.io-index");
66-
let mut index = crates_index::GitIndex::with_path(&self.path, repo_url)?;
67-
index.update()?;
68-
Ok(index)
60+
pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
61+
Self::from_url(path, None::<&str>).await
6962
}
7063

71-
pub fn run_git_gc(&self) {
64+
pub async fn run_git_gc(&self) {
7265
let gc = Command::new("git")
7366
.arg("-C")
7467
.arg(&self.path)
7568
.args(["gc", "--auto"])
7669
.output()
70+
.await
7771
.with_context(|| format!("failed to run `git gc --auto`\npath: {:#?}", &self.path));
7872

7973
if let Err(err) = gc {
8074
report_error(&err);
8175
}
8276
}
8377

78+
async fn peek_changes_with_order(
79+
&self,
80+
order: Order,
81+
) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
82+
run_blocking_scoped(THREAD_NAME, || {
83+
let index = self.index.lock().unwrap();
84+
index
85+
.peek_changes_with_options(gix::progress::Discard, &AtomicBool::default(), order)
86+
.map_err(Into::into)
87+
})
88+
.await
89+
}
90+
91+
pub async fn peek_changes(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
92+
self.peek_changes_with_order(Order::ImplementationDefined)
93+
.await
94+
}
95+
96+
pub async fn peek_changes_ordered(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
97+
self.peek_changes_with_order(Order::AsInCratesIndex).await
98+
}
99+
100+
pub async fn set_last_seen_reference(&self, to: gix::hash::ObjectId) -> Result<()> {
101+
run_blocking_scoped(THREAD_NAME, || {
102+
let index = self.index.lock().unwrap();
103+
index.set_last_seen_reference(to).map_err(Into::into)
104+
})
105+
.await
106+
}
107+
108+
pub async fn latest_commit_reference(&self) -> Result<gix::ObjectId> {
109+
let (_, oid) = self.peek_changes().await?;
110+
Ok(oid)
111+
}
112+
84113
pub fn repository_url(&self) -> Option<&str> {
85114
self.repository_url.as_deref()
86115
}

src/utils/consistency/index.rs

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,50 @@
11
use super::data::{Crate, Crates, Release, Releases};
2-
use crate::Index;
2+
use crate::{Config, utils::spawn_blocking};
33
use anyhow::Result;
44
use rayon::iter::ParallelIterator;
5+
use tracing::debug;
56

6-
pub(super) fn load(index: &Index) -> Result<Crates> {
7-
let mut result: Crates = index
8-
.crates()?
9-
.crates_parallel()
10-
.map(|krate| {
11-
krate.map(|krate| {
12-
let mut releases: Releases = krate
13-
.versions()
14-
.iter()
15-
.map(|version| Release {
16-
version: version.version().into(),
17-
yanked: Some(version.is_yanked()),
18-
})
19-
.collect();
20-
releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));
21-
22-
Crate {
23-
name: krate.name().into(),
24-
releases,
25-
}
7+
pub(super) async fn load(config: &Config) -> Result<Crates> {
8+
let path = config.registry_index_path.clone();
9+
10+
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
11+
let repo_url = config
12+
.registry_url
13+
.as_deref()
14+
.unwrap_or("https://github.com/rust-lang/crates.io-index")
15+
.to_owned();
16+
17+
spawn_blocking(move || {
18+
debug!("Opening with `crates_index`");
19+
let mut index = crates_index::GitIndex::with_path(&path, repo_url)?;
20+
index.update()?;
21+
22+
let mut result: Crates = index
23+
.crates_parallel()
24+
.map(|krate| {
25+
krate.map(|krate| {
26+
let mut releases: Releases = krate
27+
.versions()
28+
.iter()
29+
.map(|version| Release {
30+
version: version.version().into(),
31+
yanked: Some(version.is_yanked()),
32+
})
33+
.collect();
34+
35+
releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));
36+
37+
Crate {
38+
name: krate.name().into(),
39+
releases,
40+
}
41+
})
2642
})
27-
})
28-
.collect::<Result<_, _>>()?;
43+
.collect::<Result<_, _>>()?;
2944

30-
result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
45+
result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
3146

32-
Ok(result)
47+
Ok(result)
48+
})
49+
.await
3350
}

src/utils/consistency/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Context, db::delete, utils::spawn_blocking};
1+
use crate::{Context, db::delete};
22
use anyhow::{Context as _, Result};
33
use itertools::Itertools;
44
use tracing::{info, warn};
@@ -32,12 +32,9 @@ pub async fn run_check(ctx: &Context, dry_run: bool) -> Result<()> {
3232
.context("Loading crate data from database for consistency check")?;
3333

3434
tracing::info!("Loading data from index...");
35-
let index_data = spawn_blocking({
36-
let index = ctx.index.clone();
37-
move || index::load(&index)
38-
})
39-
.await
40-
.context("Loading crate data from index for consistency check")?;
35+
let index_data = index::load(&ctx.config)
36+
.await
37+
.context("Loading crate data from index for consistency check")?;
4138

4239
let diff = diff::calculate_diff(db_data.iter(), index_data.iter());
4340
let result = handle_diff(ctx, diff.iter(), dry_run).await?;

src/utils/daemon.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::future::Future;
1212
use std::sync::Arc;
1313
use std::thread;
1414
use std::time::Duration;
15-
use tokio::{runtime, task::spawn_blocking, time::Instant};
15+
use tokio::{runtime, time::Instant};
1616
use tracing::{debug, info};
1717

1818
/// Run the registry watcher
@@ -41,11 +41,7 @@ pub async fn watch_registry(
4141
}
4242

4343
if last_gc.elapsed().as_secs() >= config.registry_gc_interval {
44-
spawn_blocking({
45-
let index = index.clone();
46-
move || index.run_git_gc()
47-
})
48-
.await?;
44+
index.run_git_gc().await;
4945
last_gc = Instant::now();
5046
}
5147
tokio::time::sleep(config.delay_between_registry_fetches).await;

0 commit comments

Comments
 (0)