Skip to content

Commit ec2f316

Browse files
committed
new threading
1 parent 04edae0 commit ec2f316

File tree

1 file changed

+44
-38
lines changed

1 file changed

+44
-38
lines changed

src/index.rs

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
use crate::{
2-
error::Result,
3-
utils::{report_error, spawn_blocking},
4-
};
5-
use anyhow::Context;
1+
use crate::{error::Result, utils::report_error};
2+
use anyhow::{Context as _, anyhow};
63
use crates_index_diff::{Change, gix, index::diff::Order};
74
use std::{
85
path::{Path, PathBuf},
96
sync::{Arc, Mutex, atomic::AtomicBool},
7+
thread,
108
};
119
use tokio::process::Command;
1210

@@ -15,7 +13,7 @@ pub struct Index {
1513
path: PathBuf,
1614
repository_url: Option<String>,
1715
// NOTE: we can use a sync mutex here, because we're only locking it
18-
// inside spawn_blocking calls, so the mutex lock won't ever be held
16+
// inside handle_in_thread calls, so the mutex lock won't ever be held
1917
// across await-points.
2018
#[allow(clippy::disallowed_types)]
2119
index: Arc<Mutex<crates_index_diff::Index>>,
@@ -34,20 +32,17 @@ impl Index {
3432
.map(|url| crates_index_diff::index::CloneOptions { url: url.clone() })
3533
.unwrap_or_default();
3634

37-
let index = spawn_blocking({
38-
let path = path.clone();
39-
move || {
40-
Ok(Arc::new(Mutex::new(
41-
#[allow(clippy::disallowed_types)]
42-
crates_index_diff::Index::from_path_or_cloned_with_options(
43-
&path,
44-
gix::progress::Discard,
45-
&AtomicBool::default(),
46-
clone_options,
47-
)
48-
.context("initialising registry index repository")?,
49-
)))
50-
}
35+
let index = handle_in_thread(|| {
36+
Ok(Arc::new(Mutex::new(
37+
#[allow(clippy::disallowed_types)]
38+
crates_index_diff::Index::from_path_or_cloned_with_options(
39+
&path,
40+
gix::progress::Discard,
41+
&AtomicBool::default(),
42+
clone_options,
43+
)
44+
.context("initialising registry index repository")?,
45+
)))
5146
})
5247
.await?;
5348

@@ -80,18 +75,11 @@ impl Index {
8075
&self,
8176
order: Order,
8277
) -> Result<(Vec<Change>, gix::hash::ObjectId)> {
83-
spawn_blocking({
84-
let index = self.index.clone();
85-
move || {
86-
let index = index.lock().unwrap();
87-
index
88-
.peek_changes_with_options(
89-
gix::progress::Discard,
90-
&AtomicBool::default(),
91-
order,
92-
)
93-
.map_err(Into::into)
94-
}
78+
handle_in_thread(|| {
79+
let index = self.index.lock().unwrap();
80+
index
81+
.peek_changes_with_options(gix::progress::Discard, &AtomicBool::default(), order)
82+
.map_err(Into::into)
9583
})
9684
.await
9785
}
@@ -106,12 +94,9 @@ impl Index {
10694
}
10795

10896
pub async fn set_last_seen_reference(&self, to: gix::hash::ObjectId) -> Result<()> {
109-
spawn_blocking({
110-
let index = self.index.clone();
111-
move || {
112-
let index = index.lock().unwrap();
113-
index.set_last_seen_reference(to).map_err(Into::into)
114-
}
97+
handle_in_thread(|| {
98+
let index = self.index.lock().unwrap();
99+
index.set_last_seen_reference(to).map_err(Into::into)
115100
})
116101
.await
117102
}
@@ -126,6 +111,27 @@ impl Index {
126111
}
127112
}
128113

114+
/// Move the execution of a blocking function into a separate, new thread.
115+
///
116+
/// Only for long-running / expensive operations that would block the async runtime or its
117+
/// blocking workerpool.
118+
async fn handle_in_thread<R, F>(f: F) -> Result<R>
119+
where
120+
F: FnOnce() -> Result<R> + Send,
121+
R: Send,
122+
{
123+
thread::scope(|scope| {
124+
let handle = thread::Builder::new()
125+
.name("crates-index-diff".into())
126+
.spawn_scoped(scope, move || f())
127+
.context("couldn't spawn worker thread")?;
128+
129+
handle
130+
.join()
131+
.map_err(|_| anyhow!("failed to join worker thread"))?
132+
})
133+
}
134+
129135
#[cfg(test)]
130136
mod tests {
131137
use super::*;

0 commit comments

Comments
 (0)