Skip to content

Commit 04edae0

Browse files
committed
migrate crates-io-index handling to async with spawn_blocking
1 parent e247adb commit 04edae0

File tree

9 files changed

+172
-106
lines changed

9 files changed

+172
-106
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ sysinfo = { version = "0.37.2", default-features = false, features = ["system"]
6464
derive_builder = "0.20.2"
6565

6666
# Async
67-
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] }
67+
tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros", "process"] }
6868
tokio-util = { version = "0.7.15", default-features = false, features = ["io"] }
6969
futures-util = "0.3.5"
7070
async-stream = "0.3.5"

clippy.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
[[disallowed-types]]
22
path = "axum::extract::Path"
33
reason = "use our own custom web::extractors::Path for a nicer error response"
4+
5+
[[disallowed-types]]
6+
path = "crates_index_diff::Index"
7+
reason = """
8+
Don't directly use crates_index_diff::Index because it might
9+
lead to issues when used in async contexts.
10+
We have our own wrapper struct (`crate::index::Index`) that is
11+
async, and should be used instead.
12+
"""

src/bin/cratesfyi.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,8 @@ impl QueueSubcommand {
298298
(Some(reference), false) => reference,
299299
(None, true) => {
300300
println!("Fetching changes to set reference to HEAD");
301-
let (_, oid) = ctx.index.diff()?.peek_changes()?;
302-
oid
301+
ctx.runtime
302+
.block_on(async { ctx.index.latest_commit_reference().await })?
303303
}
304304
(_, _) => unreachable!(),
305305
};

src/build_queue.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,15 +226,13 @@ impl AsyncBuildQueue {
226226
///
227227
/// Returns the number of crates added
228228
pub async fn get_new_crates(&self, index: &Index) -> Result<usize> {
229-
let diff = index.diff()?;
230-
231229
let last_seen_reference = self
232230
.last_seen_reference()
233231
.await?
234232
.context("no last_seen_reference set in database")?;
235-
diff.set_last_seen_reference(last_seen_reference)?;
233+
index.set_last_seen_reference(last_seen_reference).await?;
236234

237-
let (changes, new_reference) = diff.peek_changes_ordered()?;
235+
let (changes, new_reference) = index.peek_changes_ordered().await?;
238236

239237
let mut conn = self.db.get_async().await?;
240238
let mut crates_added = 0;

src/context.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,9 @@ impl Context {
6767

6868
let cdn = Arc::new(CdnBackend::new(&config).await);
6969

70-
let index = Arc::new({
71-
let path = config.registry_index_path.clone();
72-
if let Some(registry_url) = config.registry_url.clone() {
73-
Index::from_url(path, registry_url)
74-
} else {
75-
Index::new(path)
76-
}?
77-
});
70+
let index = Arc::new(
71+
Index::from_url(&config.registry_index_path, config.registry_url.as_deref()).await?,
72+
);
7873

7974
let runtime = runtime::Handle::current();
8075
// sync wrappers around build-queue & storage async resources

src/index.rs

Lines changed: 107 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,141 @@
1-
use crate::error::Result;
2-
use crate::utils::report_error;
1+
use crate::{
2+
error::Result,
3+
utils::{report_error, spawn_blocking},
4+
};
35
use anyhow::Context;
4-
use crates_index_diff::gix;
5-
use std::path::PathBuf;
6-
use std::process::Command;
7-
use std::sync::atomic::AtomicBool;
6+
use crates_index_diff::{Change, gix, index::diff::Order};
7+
use std::{
8+
path::{Path, PathBuf},
9+
sync::{Arc, Mutex, atomic::AtomicBool},
10+
};
11+
use tokio::process::Command;
812

13+
/// async-friendly wrapper around `crates_index_diff::Index`
914
pub struct Index {
1015
path: PathBuf,
1116
repository_url: Option<String>,
17+
// NOTE: we can use a sync mutex here, because we're only locking it
18+
// inside spawn_blocking calls, so the mutex lock won't ever be held
19+
// across await-points.
20+
#[allow(clippy::disallowed_types)]
21+
index: Arc<Mutex<crates_index_diff::Index>>,
1222
}
1323

1424
impl Index {
15-
pub fn from_url(path: PathBuf, url: String) -> Result<Self> {
16-
crates_index_diff::Index::from_path_or_cloned_with_options(
17-
&path,
18-
gix::progress::Discard,
19-
&AtomicBool::default(),
20-
crates_index_diff::index::CloneOptions { url: url.clone() },
21-
)
22-
.map(|_| ())
23-
.context("initialising registry index repository")?;
25+
pub async fn from_url(
26+
path: impl AsRef<Path>,
27+
repository_url: Option<impl AsRef<str>>,
28+
) -> Result<Self> {
29+
let path = path.as_ref().to_path_buf();
30+
let repository_url = repository_url.map(|url| url.as_ref().to_owned());
2431

25-
Ok(Self {
26-
path,
27-
repository_url: Some(url),
32+
let clone_options = repository_url
33+
.as_ref()
34+
.map(|url| crates_index_diff::index::CloneOptions { url: url.clone() })
35+
.unwrap_or_default();
36+
37+
let index = spawn_blocking({
38+
let path = path.clone();
39+
move || {
40+
Ok(Arc::new(Mutex::new(
41+
#[allow(clippy::disallowed_types)]
42+
crates_index_diff::Index::from_path_or_cloned_with_options(
43+
&path,
44+
gix::progress::Discard,
45+
&AtomicBool::default(),
46+
clone_options,
47+
)
48+
.context("initialising registry index repository")?,
49+
)))
50+
}
2851
})
29-
}
52+
.await?;
3053

31-
pub fn new(path: PathBuf) -> Result<Self> {
32-
// This initializes the repository, then closes it afterwards to avoid leaking file descriptors.
33-
// See https://github.com/rust-lang/docs.rs/pull/847
34-
crates_index_diff::Index::from_path_or_cloned(&path)
35-
.map(|_| ())
36-
.context("initialising registry index repository")?;
3754
Ok(Self {
55+
index,
3856
path,
39-
repository_url: None,
57+
repository_url,
4058
})
4159
}
4260

43-
pub fn diff(&self) -> Result<crates_index_diff::Index> {
44-
let options = self
45-
.repository_url
46-
.clone()
47-
.map(|url| crates_index_diff::index::CloneOptions { url })
48-
.unwrap_or_default();
49-
let diff = crates_index_diff::Index::from_path_or_cloned_with_options(
50-
&self.path,
51-
gix::progress::Discard,
52-
&AtomicBool::default(),
53-
options,
54-
)
55-
.context("re-opening registry index for diff")?;
56-
Ok(diff)
61+
pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
62+
Self::from_url(path, None::<&str>).await
5763
}
5864

59-
pub(crate) fn crates(&self) -> Result<crates_index::GitIndex> {
60-
tracing::debug!("Opening with `crates_index`");
61-
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
62-
let repo_url = self
63-
.repository_url
64-
.as_deref()
65-
.unwrap_or("https://github.com/rust-lang/crates.io-index");
66-
let mut index = crates_index::GitIndex::with_path(&self.path, repo_url)?;
67-
index.update()?;
68-
Ok(index)
69-
}
70-
71-
pub fn run_git_gc(&self) {
65+
pub async fn run_git_gc(&self) {
7266
let gc = Command::new("git")
7367
.arg("-C")
7468
.arg(&self.path)
7569
.args(["gc", "--auto"])
7670
.output()
71+
.await
7772
.with_context(|| format!("failed to run `git gc --auto`\npath: {:#?}", &self.path));
7873

7974
if let Err(err) = gc {
8075
report_error(&err);
8176
}
8277
}
8378

79+
async fn peek_changes_with_order(
80+
&self,
81+
order: Order,
82+
) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
83+
spawn_blocking({
84+
let index = self.index.clone();
85+
move || {
86+
let index = index.lock().unwrap();
87+
index
88+
.peek_changes_with_options(
89+
gix::progress::Discard,
90+
&AtomicBool::default(),
91+
order,
92+
)
93+
.map_err(Into::into)
94+
}
95+
})
96+
.await
97+
}
98+
99+
pub async fn peek_changes(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
100+
self.peek_changes_with_order(Order::ImplementationDefined)
101+
.await
102+
}
103+
104+
pub async fn peek_changes_ordered(&self) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
105+
self.peek_changes_with_order(Order::AsInCratesIndex).await
106+
}
107+
108+
pub async fn set_last_seen_reference(&self, to: gix::hash::ObjectId) -> Result<()> {
109+
spawn_blocking({
110+
let index = self.index.clone();
111+
move || {
112+
let index = index.lock().unwrap();
113+
index.set_last_seen_reference(to).map_err(Into::into)
114+
}
115+
})
116+
.await
117+
}
118+
119+
pub async fn latest_commit_reference(&self) -> Result<gix::ObjectId> {
120+
let (_, oid) = self.peek_changes().await?;
121+
Ok(oid)
122+
}
123+
84124
pub fn repository_url(&self) -> Option<&str> {
85125
self.repository_url.as_deref()
86126
}
87127
}
128+
129+
#[cfg(test)]
130+
mod tests {
131+
use super::*;
132+
133+
// Compile-time check that Index implements Send + Sync.
134+
fn assert_send_sync<T: Send + Sync>() {}
135+
136+
#[test]
137+
fn index_is_send_and_sync() {
138+
// This will fail to compile if `Index` is not `Send` and `Sync`.
139+
assert_send_sync::<Index>();
140+
}
141+
}

src/utils/consistency/index.rs

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,50 @@
11
use super::data::{Crate, Crates, Release, Releases};
2-
use crate::Index;
2+
use crate::{Config, utils::spawn_blocking};
33
use anyhow::Result;
44
use rayon::iter::ParallelIterator;
5+
use tracing::debug;
56

6-
pub(super) fn load(index: &Index) -> Result<Crates> {
7-
let mut result: Crates = index
8-
.crates()?
9-
.crates_parallel()
10-
.map(|krate| {
11-
krate.map(|krate| {
12-
let mut releases: Releases = krate
13-
.versions()
14-
.iter()
15-
.map(|version| Release {
16-
version: version.version().into(),
17-
yanked: Some(version.is_yanked()),
18-
})
19-
.collect();
20-
releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));
21-
22-
Crate {
23-
name: krate.name().into(),
24-
releases,
25-
}
7+
pub(super) async fn load(config: &Config) -> Result<Crates> {
8+
let path = config.registry_index_path.clone();
9+
10+
// crates_index requires the repo url to match the existing origin or it tries to reinitialize the repo
11+
let repo_url = config
12+
.registry_url
13+
.as_deref()
14+
.unwrap_or("https://github.com/rust-lang/crates.io-index")
15+
.to_owned();
16+
17+
spawn_blocking(move || {
18+
debug!("Opening with `crates_index`");
19+
let mut index = crates_index::GitIndex::with_path(&path, repo_url)?;
20+
index.update()?;
21+
22+
let mut result: Crates = index
23+
.crates_parallel()
24+
.map(|krate| {
25+
krate.map(|krate| {
26+
let mut releases: Releases = krate
27+
.versions()
28+
.iter()
29+
.map(|version| Release {
30+
version: version.version().into(),
31+
yanked: Some(version.is_yanked()),
32+
})
33+
.collect();
34+
35+
releases.sort_by(|lhs, rhs| lhs.version.cmp(&rhs.version));
36+
37+
Crate {
38+
name: krate.name().into(),
39+
releases,
40+
}
41+
})
2642
})
27-
})
28-
.collect::<Result<_, _>>()?;
43+
.collect::<Result<_, _>>()?;
2944

30-
result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
45+
result.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
3146

32-
Ok(result)
47+
Ok(result)
48+
})
49+
.await
3350
}

src/utils/consistency/mod.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Context, db::delete, utils::spawn_blocking};
1+
use crate::{Context, db::delete};
22
use anyhow::{Context as _, Result};
33
use itertools::Itertools;
44
use tracing::{info, warn};
@@ -32,12 +32,9 @@ pub async fn run_check(ctx: &Context, dry_run: bool) -> Result<()> {
3232
.context("Loading crate data from database for consistency check")?;
3333

3434
tracing::info!("Loading data from index...");
35-
let index_data = spawn_blocking({
36-
let index = ctx.index.clone();
37-
move || index::load(&index)
38-
})
39-
.await
40-
.context("Loading crate data from index for consistency check")?;
35+
let index_data = index::load(&ctx.config)
36+
.await
37+
.context("Loading crate data from index for consistency check")?;
4138

4239
let diff = diff::calculate_diff(db_data.iter(), index_data.iter());
4340
let result = handle_diff(ctx, diff.iter(), dry_run).await?;

src/utils/daemon.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::future::Future;
1212
use std::sync::Arc;
1313
use std::thread;
1414
use std::time::Duration;
15-
use tokio::{runtime, task::spawn_blocking, time::Instant};
15+
use tokio::{runtime, time::Instant};
1616
use tracing::{debug, info};
1717

1818
/// Run the registry watcher
@@ -41,11 +41,7 @@ pub async fn watch_registry(
4141
}
4242

4343
if last_gc.elapsed().as_secs() >= config.registry_gc_interval {
44-
spawn_blocking({
45-
let index = index.clone();
46-
move || index.run_git_gc()
47-
})
48-
.await?;
44+
index.run_git_gc().await;
4945
last_gc = Instant::now();
5046
}
5147
tokio::time::sleep(config.delay_between_registry_fetches).await;

0 commit comments

Comments
 (0)