refactor: improve code tidiness

Switches to the `cas_path` function when reading
CAS files. Asyncifies IO operations when reusing
package folders.
This commit is contained in:
daimond113 2025-01-18 15:16:36 +01:00
parent 0dfc3ef5bd
commit 941bb79ea6
No known key found for this signature in database
GPG key ID: 3A8ECE51328B513C
2 changed files with 257 additions and 191 deletions

View file

@ -417,32 +417,6 @@ impl Project {
} }
if !force { if !force {
let used_paths = Arc::new(
graph
.iter()
.filter(|(_, node)| !node.node.pkg_ref.is_wally_package())
.map(|(id, node)| {
node.node
.container_folder(id)
.version_folder()
.to_path_buf()
})
.collect::<HashSet<_>>(),
);
#[cfg(feature = "wally-compat")]
let used_wally_paths = Arc::new(
graph
.iter()
.filter(|(_, node)| node.node.pkg_ref.is_wally_package())
.map(|(id, node)| {
node.node
.container_folder(id)
.version_folder()
.to_path_buf()
})
.collect::<HashSet<_>>(),
);
async fn remove_empty_dir(path: &Path) -> std::io::Result<()> { async fn remove_empty_dir(path: &Path) -> std::io::Result<()> {
match fs::remove_dir(path).await { match fs::remove_dir(path).await {
Ok(()) => Ok(()), Ok(()) => Ok(()),
@ -452,58 +426,40 @@ impl Project {
} }
} }
let mut tasks = all_packages_dirs()
.into_iter()
.map(|folder| {
let packages_dir = self.package_dir().join(&folder);
let packages_index_dir = packages_dir.join(PACKAGES_CONTAINER_NAME);
let used_paths = used_paths.clone();
#[cfg(feature = "wally-compat")]
let used_wally_paths = used_wally_paths.clone();
let expected_aliases = graph.iter()
.filter(|(id, _)| manifest.target.kind().packages_folder(id.version_id().target()) == folder)
.filter_map(|(_, node)| node.node.direct.as_ref().map(|(alias, _, _)| alias.clone()))
.collect::<HashSet<_>>();
async move {
let mut index_entries = match fs::read_dir(&packages_index_dir).await {
Ok(entries) => entries,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) => return Err(e),
};
// we don't handle NotFound here because the upper level will handle it
let mut packages_entries = fs::read_dir(&packages_dir).await?;
let mut tasks = JoinSet::new();
async fn index_entry( async fn index_entry(
entry: fs::DirEntry, entry: fs::DirEntry,
packages_index_dir: &Path, packages_index_dir: &Path,
tasks: &mut JoinSet<std::io::Result<()>>, tasks: &mut JoinSet<std::io::Result<()>>,
used_paths: &HashSet<PathBuf>, used_paths: &Arc<HashSet<PathBuf>>,
#[cfg(feature = "wally-compat")] used_wally_paths: &HashSet<PathBuf>, #[cfg(feature = "wally-compat")] used_wally_paths: &Arc<HashSet<PathBuf>>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let path = entry.path(); let path = entry.path();
let path_relative = path.strip_prefix(packages_index_dir).unwrap(); let path_relative = path.strip_prefix(packages_index_dir).unwrap().to_path_buf();
let is_wally = entry.file_name().to_str().expect("non UTF-8 folder name in packages index").contains("@"); let is_wally = entry
.file_name()
.to_str()
.expect("non UTF-8 folder name in packages index")
.contains("@");
if is_wally { if is_wally {
#[cfg(feature = "wally-compat")] #[cfg(feature = "wally-compat")]
if !used_wally_paths.contains(path_relative) { if !used_wally_paths.contains(&path_relative) {
tasks.spawn(async { tasks.spawn(async { fs::remove_dir_all(path).await });
fs::remove_dir_all(path).await
});
} }
#[cfg(not(feature = "wally-compat"))] #[cfg(not(feature = "wally-compat"))]
{ {
tracing::error!("found Wally package in index despite feature being disabled at `{}`", path.display()); tracing::error!(
"found Wally package in index despite feature being disabled at `{}`",
path.display()
);
} }
return Ok(()); return Ok(());
} }
let used_paths = used_paths.clone();
tasks.spawn(async move {
let mut tasks = JoinSet::new(); let mut tasks = JoinSet::new();
let mut entries = fs::read_dir(&path).await?; let mut entries = fs::read_dir(&path).await?;
@ -516,9 +472,7 @@ impl Project {
} }
let path = entry.path(); let path = entry.path();
tasks.spawn(async { tasks.spawn(async { fs::remove_dir_all(path).await });
fs::remove_dir_all(path).await
});
} }
while let Some(task) = tasks.join_next().await { while let Some(task) = tasks.join_next().await {
@ -526,37 +480,106 @@ impl Project {
} }
remove_empty_dir(&path).await remove_empty_dir(&path).await
});
Ok(())
} }
async fn packages_entry( async fn packages_entry(
entry: fs::DirEntry, entry: fs::DirEntry,
tasks: &mut JoinSet<std::io::Result<()>>, tasks: &mut JoinSet<std::io::Result<()>>,
expected_aliases: &HashSet<Alias>, expected_aliases: &Arc<HashSet<Alias>>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let expected_aliases = expected_aliases.clone();
tasks.spawn(async move {
if !entry.file_type().await?.is_file() { if !entry.file_type().await?.is_file() {
return Ok(()); return Ok(());
} }
let path = entry.path(); let path = entry.path();
let name= path.file_stem().unwrap().to_str().expect("non UTF-8 file name in packages folder"); let name = path
.file_stem()
.unwrap()
.to_str()
.expect("non UTF-8 file name in packages folder");
let name = name.strip_suffix(".bin").unwrap_or(name); let name = name.strip_suffix(".bin").unwrap_or(name);
let name = match name.parse::<Alias>() { let name = match name.parse::<Alias>() {
Ok(name) => name, Ok(name) => name,
Err(e) => { Err(e) => {
tracing::error!("invalid alias in packages folder: {e}"); tracing::error!("invalid alias in packages folder: {e}");
return Ok(()) return Ok(());
}, }
}; };
if !expected_aliases.contains(&name) { if !expected_aliases.contains(&name) {
tasks.spawn(async { fs::remove_file(path).await?;
fs::remove_file(path).await
});
} }
Ok(())
});
Ok(()) Ok(())
} }
let used_paths = graph
.iter()
.filter(|(_, node)| !node.node.pkg_ref.is_wally_package())
.map(|(id, node)| {
node.node
.container_folder(id)
.version_folder()
.to_path_buf()
})
.collect::<HashSet<_>>();
let used_paths = Arc::new(used_paths);
#[cfg(feature = "wally-compat")]
let used_wally_paths = graph
.iter()
.filter(|(_, node)| node.node.pkg_ref.is_wally_package())
.map(|(id, node)| {
node.node
.container_folder(id)
.version_folder()
.to_path_buf()
})
.collect::<HashSet<_>>();
#[cfg(feature = "wally-compat")]
let used_wally_paths = Arc::new(used_wally_paths);
let mut tasks = all_packages_dirs()
.into_iter()
.map(|folder| {
let packages_dir = self.package_dir().join(&folder);
let packages_index_dir = packages_dir.join(PACKAGES_CONTAINER_NAME);
let used_paths = used_paths.clone();
#[cfg(feature = "wally-compat")]
let used_wally_paths = used_wally_paths.clone();
let expected_aliases = graph
.iter()
.filter(|(id, _)| {
manifest
.target
.kind()
.packages_folder(id.version_id().target())
== folder
})
.filter_map(|(_, node)| {
node.node.direct.as_ref().map(|(alias, _, _)| alias.clone())
})
.collect::<HashSet<_>>();
let expected_aliases = Arc::new(expected_aliases);
async move {
let mut index_entries = match fs::read_dir(&packages_index_dir).await {
Ok(entries) => entries,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) => return Err(e),
};
// we don't handle NotFound here because the upper level will handle it
let mut packages_entries = fs::read_dir(&packages_dir).await?;
let mut tasks = JoinSet::new();
loop { loop {
tokio::select! { tokio::select! {
Some(entry) = index_entries.next_entry().map(Result::transpose) => { Some(entry) = index_entries.next_entry().map(Result::transpose) => {
@ -577,7 +600,7 @@ impl Project {
).await?; ).await?;
} }
else => break, else => break,
}; }
} }
while let Some(task) = tasks.join_next().await { while let Some(task) = tasks.join_next().await {

View file

@ -117,22 +117,17 @@ pub(crate) async fn store_in_cas<R: tokio::io::AsyncRead + Unpin, P: AsRef<Path>
Ok(hash) Ok(hash)
} }
impl PackageFs { async fn package_fs_cas(
/// Write the package to the given destination entries: BTreeMap<RelativePathBuf, FsEntry>,
#[instrument(skip(self), level = "debug")] destination: &Path,
pub async fn write_to<P: AsRef<Path> + Debug, Q: AsRef<Path> + Debug>( cas_dir_path: &Path,
&self,
destination: P,
cas_path: Q,
link: bool, link: bool,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
match self {
PackageFs::CAS(entries) => {
let mut tasks = entries let mut tasks = entries
.iter() .iter()
.map(|(path, entry)| { .map(|(path, entry)| {
let destination = destination.as_ref().to_path_buf(); let destination = destination.to_path_buf();
let cas_path = cas_path.as_ref().to_path_buf(); let cas_dir_path = cas_dir_path.to_path_buf();
let path = path.to_path(destination); let path = path.to_path(destination);
let entry = entry.clone(); let entry = entry.clone();
@ -143,8 +138,7 @@ impl PackageFs {
fs::create_dir_all(parent).await?; fs::create_dir_all(parent).await?;
} }
let (prefix, rest) = hash.split_at(2); let cas_file_path = cas_path(&hash, &cas_dir_path);
let cas_file_path = cas_path.join(prefix).join(rest);
if link { if link {
fs::hard_link(cas_file_path, path).await?; fs::hard_link(cas_file_path, path).await?;
@ -166,17 +160,32 @@ impl PackageFs {
while let Some(task) = tasks.join_next().await { while let Some(task) = tasks.join_next().await {
task.unwrap()?; task.unwrap()?;
} }
}
PackageFs::Copy(src, target) => {
fs::create_dir_all(destination.as_ref()).await?;
Ok(())
}
async fn package_fs_copy(
src: &Path,
target: TargetKind,
destination: &Path,
) -> std::io::Result<()> {
fs::create_dir_all(destination).await?;
let mut tasks = JoinSet::new();
let mut read_dir = fs::read_dir(src).await?; let mut read_dir = fs::read_dir(src).await?;
'entry: while let Some(entry) = read_dir.next_entry().await? { 'entry: while let Some(entry) = read_dir.next_entry().await? {
let relative_path = let path = entry.path();
RelativePathBuf::from_path(entry.path().strip_prefix(src).unwrap()) let relative_path = path.strip_prefix(src).unwrap();
.unwrap(); let dest_path = destination.join(relative_path);
let dest_path = relative_path.to_path(destination.as_ref()); let file_name = relative_path
let file_name = relative_path.file_name().unwrap(); .file_name()
.unwrap()
.to_str()
.ok_or(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"invalid file name",
))?;
if entry.file_type().await?.is_dir() { if entry.file_type().await?.is_dir() {
if IGNORED_DIRS.contains(&file_name) { if IGNORED_DIRS.contains(&file_name) {
@ -189,10 +198,14 @@ impl PackageFs {
} }
} }
tasks.spawn(async {
#[cfg(windows)] #[cfg(windows)]
fs::symlink_dir(entry.path(), dest_path).await?; let res = fs::symlink_dir(path, dest_path).await;
#[cfg(unix)] #[cfg(unix)]
fs::symlink(entry.path(), dest_path).await?; let res = fs::symlink(path, dest_path).await;
res
});
continue; continue;
} }
@ -200,30 +213,60 @@ impl PackageFs {
continue; continue;
} }
tasks.spawn(async {
#[cfg(windows)] #[cfg(windows)]
fs::symlink_file(entry.path(), dest_path).await?; let res = fs::symlink_file(path, dest_path).await;
#[cfg(unix)] #[cfg(unix)]
fs::symlink(entry.path(), dest_path).await?; let res = fs::symlink(path, dest_path).await;
}
res
});
} }
while let Some(task) = tasks.join_next().await {
task.unwrap()?;
} }
Ok(()) Ok(())
} }
impl PackageFs {
/// Write the package to the given destination
#[instrument(skip(self), level = "debug")]
pub async fn write_to<P: AsRef<Path> + Debug, Q: AsRef<Path> + Debug>(
&self,
destination: P,
cas_path: Q,
link: bool,
) -> std::io::Result<()> {
match self {
PackageFs::CAS(entries) => {
package_fs_cas(
entries.clone(),
destination.as_ref(),
cas_path.as_ref(),
link,
)
.await
}
PackageFs::Copy(src, target) => {
package_fs_copy(src, *target, destination.as_ref()).await
}
}
}
/// Returns the contents of the file with the given hash /// Returns the contents of the file with the given hash
#[instrument(skip(self), ret(level = "trace"), level = "debug")] #[instrument(skip(self), ret(level = "trace"), level = "debug")]
pub async fn read_file<P: AsRef<Path> + Debug, H: AsRef<str> + Debug>( pub async fn read_file<P: AsRef<Path> + Debug, H: AsRef<str> + Debug>(
&self, &self,
file_hash: H, file_hash: H,
cas_path: P, cas_dir_path: P,
) -> Option<String> { ) -> Option<String> {
if !matches!(self, PackageFs::CAS(_)) { if !matches!(self, PackageFs::CAS(_)) {
return None; return None;
} }
let (prefix, rest) = file_hash.as_ref().split_at(2); let cas_file_path = cas_path(file_hash.as_ref(), cas_dir_path.as_ref());
let cas_file_path = cas_path.as_ref().join(prefix).join(rest);
fs::read_to_string(cas_file_path).await.ok() fs::read_to_string(cas_file_path).await.ok()
} }
} }