Skip to content

Commit f5625d2

Browse files
committed
migrate crates-io-index handling to async with spawn_blocking
1 parent de318c5 commit f5625d2

File tree

10 files changed

+197
-109
lines changed

10 files changed

+197
-109
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
6464
derive_builder = "0.20.2"
6565

6666
# Async
67-
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] }
67+
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
6868
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
6969
tracing-futures= { version = "0.2.5", features = ["std-future", "futures-03"] }
7070
futures-util = "0.3.5"

clippy.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
[[disallowed-types]]
22
path = "axum::extract::Path"
33
reason = "use our own custom web::extractors::Path for a nicer error response"
4+
5+
[[disallowed-types]]
6+
path = "crates_index_diff::Index"
7+
reason = """
8+
Don't directly use crates_index_diff::Index because it might
9+
lead to issues when used in async contexts.
10+
We have our own wrapper struct (`crate::index::Index`) that is
11+
async, and should be used instead.
12+
"""

src/bin/cratesfyi.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,7 @@ impl QueueSubcommand {
298298
(Some(reference), false) => reference,
299299
(None, true) => {
300300
println!("Fetching changes to set reference to HEAD");
301-
let (_, oid) = ctx.index.diff()?.peek_changes()?;
302-
oid
301+
ctx.runtime.block_on(ctx.index.latest_commit_reference())?
303302
}
304303
(_, _) => unreachable!(),
305304
};

src/build_queue.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,13 @@ impl AsyncBuildQueue {
226226
///
227227
/// Returns the number of crates added
228228
pub async fn get_new_crates(&self, index: &Index) -> Result<usize> {
229-
let diff = index.diff()?;
230-
231229
let last_seen_reference = self
232230
.last_seen_reference()
233231
.await?
234232
.context("no last_seen_reference set in database")?;
235-
diff.set_last_seen_reference(last_seen_reference)?;
233+
index.set_last_seen_reference(last_seen_reference).await?;
236234

237-
let (changes, new_reference) = diff.peek_changes_ordered()?;
235+
let (changes, new_reference) = index.peek_changes_ordered().await?;
238236

239237
let mut conn = self.db.get_async().await?;
240238
let mut crates_added = 0;

src/context.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,9 @@ impl Context {
6767

6868
let cdn = Arc::new(CdnBackend::new(&config).await);
6969

70-
let index = Arc::new({
71-
let path = config.registry_index_path.clone();
72-
if let Some(registry_url) = config.registry_url.clone() {
73-
Index::from_url(path, registry_url)
74-
} else {
75-
Index::new(path)
76-
}?
77-
});
70+
let index = Arc::new(
71+
Index::from_url(&config.registry_index_path, config.registry_url.as_deref()).await?,
72+
);
7873

7974
let runtime = runtime::Handle::current();
8075
// sync wrappers around build-queue & storage async resources

src/index.rs

Lines changed: 88 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,120 @@
1-
use crate::error::Result;
2-
use crate::utils::report_error;
3-
use anyhow::Context;
4-
use crates_index_diff::gix;
5-
use std::path::PathBuf;
6-
use std::process::Command;
7-
use std::sync::atomic::AtomicBool;
1+
use crate::{
2+
error::Result,
3+
utils::{report_error, run_blocking},
4+
};
5+
use anyhow::Context as _;
6+
use crates_index_diff::{Change, gix, index::diff::Order};
7+
use std::{
8+
path::{Path, PathBuf},
9+
sync::{Arc, Mutex, atomic::AtomicBool},
10+
};
11+
use tokio::process::Command;
812

13+
const THREAD_NAME: &str = "crates-index-diff";
14+
15+
/// async-friendly wrapper around `crates_index_diff::Index`
916
pub struct Index {
1017
path: PathBuf,
1118
repository_url: Option<String>,
19+
// NOTE: we can use a sync mutex here, because we're only locking it
20+
// inside handle_in_thread calls, so the mutex lock won't ever be held
21+
// across await-points.
22+
#[allow(clippy::disallowed_types)]
23+
index: Arc<Mutex<crates_index_diff::Index>>,
1224
}
1325

1426
impl Index {
15-
pub fn from_url(path: PathBuf, url: String) -> Result<Self> {
16-
crates_index_diff::Index::from_path_or_cloned_with_options(
17-
&path,
18-
gix::progress::Discard,
19-
&AtomicBool::default(),
20-
crates_index_diff::index::CloneOptions { url: url.clone() },
21-
)
22-
.map(|_| ())
23-
.context("initialising registry index repository")?;
27+
pub async fn from_url(
28+
path: impl AsRef<Path>,
29+
repository_url: Option<impl AsRef<str>>,
30+
) -> Result<Self> {
31+
let path = path.as_ref().to_path_buf();
32+
let repository_url = repository_url.map(|url| url.as_ref().to_owned());
2433

25-
Ok(Self {
26-
path,
27-
repository_url: Some(url),
34+
let clone_options = repository_url
35+
.as_ref()
36+
.map(|url| crates_index_diff::index::CloneOptions { url: url.clone() })
37+
.unwrap_or_default();
38+
39+
let index = run_blocking(THREAD_NAME, {
40+
let path = path.clone();
41+
move || {
42+
Ok(Arc::new(Mutex::new(
43+
#[allow(clippy::disallowed_types)]
44+
crates_index_diff::Index::from_path_or_cloned_with_options(
45+
&path,
46+
gix::progress::Discard,
47+
&AtomicBool::default(),
48+
clone_options,
49+
)
50+
.context("initialising registry index repository")?,
51+
)))
52+
}
2853
})
29-
}
54+
.await?;
3055

31-
pub fn new(path: PathBuf) -> Result<Self> {
32-
// This initializes the repository, then closes it afterwards to avoid leaking file descriptors.
33-
// See https://github.com/rust-lang/docs.rs/pull/847
34-
crates_index_diff::Index::from_path_or_cloned(&path)
35-
.map(|_| ())
36-
.context("initialising registry index repository")?;
3756
Ok(Self {
57+
index,
3858
path,
39-
repository_url: None,
59+
repository_url,
4060
})
4161
}
4262

43-
pub fn diff(&self) -> Result<crates_index_diff::Index> {
44-
let options = self
45-
.repository_url
46-
.clone()
47-
.map(|url| crates_index_diff::index::CloneOptions { url })
48-
.unwrap_or_default();
49-
let diff = crates_index_diff::Index::from_path_or_cloned_with_options(
50-
&self.path,
51-
gix::progress::Discard,
52-
&AtomicBool::default(),
53-
options,
54-
)
55-
.context("re-opening registry index for diff")?;
56-
Ok(diff)
57-
}
58-
59-
pub(crate) fn crates(&self) -> Result<crates_index::GitIndex> {
60-
tracing::debug!("Opening with `crates_index`");
61-
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
62-
let repo_url = self
63-
.repository_url
64-
.as_deref()
65-
.unwrap_or("https://github.com/rust-lang/crates.io-index");
66-
let mut index = crates_index::GitIndex::with_path(&self.path, repo_url)?;
67-
index.update()?;
68-
Ok(index)
63+
pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
64+
Self::from_url(path, None::<&str>).await
6965
}
7066

71-
pub fn run_git_gc(&self) {
67+
pub async fn run_git_gc(&self) {
7268
let gc = Command::new("git")
7369
.arg("-C")
7470
.arg(&self.path)
7571
.args(["gc", "--auto"])
7672
.output()
73+
.await
7774
.with_context(|| format!("failed to run `git gc --auto`\npath: {:#?}", &self.path));
7875

7976
if let Err(err) = gc {
8077
report_error(&err);
8178
}
8279
}
8380

81+
async fn peek_changes_with_order(
82+
&self,
83+
order: Order,
84+
) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
85+
let index = self.index.clone();
86+
run_blocking(THREAD_NAME, move || {
87+
let index = index.lock().unwrap();
88+
index
89+
.peek_changes_with_options(gix::progress::Discard, &AtomicBool::default(), order)
90+
.map_err(Into::into)
91+
})
92+
.await
93+
}
94+
95+
pub async fn peek_changes(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
96+
self.peek_changes_with_order(Order::ImplementationDefined)
97+
.await
98+
}
99+
100+
pub async fn peek_changes_ordered(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
101+
self.peek_changes_with_order(Order::AsInCratesIndex).await
102+
}
103+
104+
pub async fn set_last_seen_reference(&self, to: gix::hash::ObjectId) -> Result<()> {
105+
let index = self.index.clone();
106+
run_blocking(THREAD_NAME, move || {
107+
let index = index.lock().unwrap();
108+
index.set_last_seen_reference(to).map_err(Into::into)
109+
})
110+
.await
111+
}
112+
113+
pub async fn latest_commit_reference(&self) -> Result<gix::ObjectId> {
114+
let (_, oid) = self.peek_changes().await?;
115+
Ok(oid)
116+
}
117+
84118
pub fn repository_url(&self) -> Option<&str> {
85119
self.repository_url.as_deref()
86120
}

src/utils/consistency/index.rs

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,53 @@
11
use super::data::{Crate, Crates, Release, Releases};
2-
use crate::Index;
2+
use crate::{Config, utils::run_blocking};
33
use anyhow::Result;
44
use rayon::iter::ParallelIterator;
5+
use tracing::debug;
56

6-
pub(super) fn load(index: &Index) -> Result<Crates> {
7-
let mut result: Crates = index
8-
.crates()?
9-
.crates_parallel()
10-
.map(|krate| {
11-
krate.map(|krate| {
12-
let mut releases: Releases = krate
13-
.versions()
14-
.iter()
15-
.map(|version| Release {
16-
version: version.version().into(),
17-
yanked: Some(version.is_yanked()),
18-
})
19-
.collect();
20-
releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));
21-
22-
Crate {
23-
name: krate.name().into(),
24-
releases,
25-
}
7+
pub(super) async fn load(config: &Config) -> Result<Crates> {
8+
let registry_index_path = config.registry_index_path.clone();
9+
let registry_url = config
10+
.registry_url
11+
.as_deref()
12+
.unwrap_or("https://github.com/rust-lang/crates.io-index")
13+
.to_owned();
14+
15+
run_blocking("load-crates-index", move || {
16+
debug!("Opening with `crates_index`");
17+
let mut index = crates_index::GitIndex::with_path(
18+
&registry_index_path,
19+
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
20+
&registry_url,
21+
)?;
22+
23+
index.update()?;
24+
25+
let mut result: Crates = index
26+
.crates_parallel()
27+
.map(|krate| {
28+
krate.map(|krate| {
29+
let mut releases: Releases = krate
30+
.versions()
31+
.iter()
32+
.map(|version| Release {
33+
version: version.version().into(),
34+
yanked: Some(version.is_yanked()),
35+
})
36+
.collect();
37+
38+
releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));
39+
40+
Crate {
41+
name: krate.name().into(),
42+
releases,
43+
}
44+
})
2645
})
27-
})
28-
.collect::<Result<_, _>>()?;
46+
.collect::<Result<_, _>>()?;
2947

30-
result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
48+
result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
3149

32-
Ok(result)
50+
Ok(result)
51+
})
52+
.await
3353
}

src/utils/consistency/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Context, db::delete, utils::spawn_blocking};
1+
use crate::{Context, db::delete};
22
use anyhow::{Context as _, Result};
33
use itertools::Itertools;
44
use tracing::{info, warn};
@@ -32,12 +32,9 @@ pub async fn run_check(ctx: &Context, dry_run: bool) -> Result<()> {
3232
.context("Loading crate data from database for consistency check")?;
3333

3434
tracing::info!("Loading data from index...");
35-
let index_data = spawn_blocking({
36-
let index = ctx.index.clone();
37-
move || index::load(&index)
38-
})
39-
.await
40-
.context("Loading crate data from index for consistency check")?;
35+
let index_data = index::load(&ctx.config)
36+
.await
37+
.context("Loading crate data from index for consistency check")?;
4138

4239
let diff = diff::calculate_diff(db_data.iter(), index_data.iter());
4340
let result = handle_diff(ctx, diff.iter(), dry_run).await?;

src/utils/daemon.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::future::Future;
1212
use std::sync::Arc;
1313
use std::thread;
1414
use std::time::Duration;
15-
use tokio::{runtime, task::spawn_blocking, time::Instant};
15+
use tokio::{runtime, time::Instant};
1616
use tracing::{debug, info};
1717

1818
/// Run the registry watcher
@@ -41,11 +41,7 @@ pub async fn watch_registry(
4141
}
4242

4343
if last_gc.elapsed().as_secs() >= config.registry_gc_interval {
44-
spawn_blocking({
45-
let index = index.clone();
46-
move || index.run_git_gc()
47-
})
48-
.await?;
44+
index.run_git_gc().await;
4945
last_gc = Instant::now();
5046
}
5147
tokio::time::sleep(config.delay_between_registry_fetches).await;

0 commit comments

Comments
 (0)