From 941bb79ea66cf667e5951287940de6f1a35021fe Mon Sep 17 00:00:00 2001 From: daimond113 <72147841+daimond113@users.noreply.github.com> Date: Sat, 18 Jan 2025 15:16:36 +0100 Subject: [PATCH] refactor: improve code tidiness Switches to the `cas_path` function when reading CAS files. Asyncifies IO operations when reusing package folders. --- src/download_and_link.rs | 245 +++++++++++++++++++++------------------ src/source/fs.rs | 203 +++++++++++++++++++------------- 2 files changed, 257 insertions(+), 191 deletions(-) diff --git a/src/download_and_link.rs b/src/download_and_link.rs index dfef636..c3dca18 100644 --- a/src/download_and_link.rs +++ b/src/download_and_link.rs @@ -417,32 +417,6 @@ impl Project { } if !force { - let used_paths = Arc::new( - graph - .iter() - .filter(|(_, node)| !node.node.pkg_ref.is_wally_package()) - .map(|(id, node)| { - node.node - .container_folder(id) - .version_folder() - .to_path_buf() - }) - .collect::>(), - ); - #[cfg(feature = "wally-compat")] - let used_wally_paths = Arc::new( - graph - .iter() - .filter(|(_, node)| node.node.pkg_ref.is_wally_package()) - .map(|(id, node)| { - node.node - .container_folder(id) - .version_folder() - .to_path_buf() - }) - .collect::>(), - ); - async fn remove_empty_dir(path: &Path) -> std::io::Result<()> { match fs::remove_dir(path).await { Ok(()) => Ok(()), @@ -452,6 +426,126 @@ impl Project { } } + async fn index_entry( + entry: fs::DirEntry, + packages_index_dir: &Path, + tasks: &mut JoinSet>, + used_paths: &Arc>, + #[cfg(feature = "wally-compat")] used_wally_paths: &Arc>, + ) -> std::io::Result<()> { + let path = entry.path(); + let path_relative = path.strip_prefix(packages_index_dir).unwrap().to_path_buf(); + + let is_wally = entry + .file_name() + .to_str() + .expect("non UTF-8 folder name in packages index") + .contains("@"); + if is_wally { + #[cfg(feature = "wally-compat")] + if !used_wally_paths.contains(&path_relative) { + tasks.spawn(async { fs::remove_dir_all(path).await }); + } + + #[cfg(not(feature = "wally-compat"))] + { + tracing::error!( + "found Wally package in index despite feature being disabled at `{}`", + path.display() + ); + } + + return Ok(()); + } + + let used_paths = used_paths.clone(); + tasks.spawn(async move { + let mut tasks = JoinSet::new(); + + let mut entries = fs::read_dir(&path).await?; + while let Some(entry) = entries.next_entry().await? { + let version = entry.file_name(); + let path_relative = path_relative.join(&version); + + if used_paths.contains(&path_relative) { + continue; + } + + let path = entry.path(); + tasks.spawn(async { fs::remove_dir_all(path).await }); + } + + while let Some(task) = tasks.join_next().await { + task.unwrap()?; + } + + remove_empty_dir(&path).await + }); + + Ok(()) + } + + async fn packages_entry( + entry: fs::DirEntry, + tasks: &mut JoinSet>, + expected_aliases: &Arc>, + ) -> std::io::Result<()> { + let expected_aliases = expected_aliases.clone(); + tasks.spawn(async move { + if !entry.file_type().await?.is_file() { + return Ok(()); + } + + let path = entry.path(); + let name = path + .file_stem() + .unwrap() + .to_str() + .expect("non UTF-8 file name in packages folder"); + let name = name.strip_suffix(".bin").unwrap_or(name); + let name = match name.parse::() { + Ok(name) => name, + Err(e) => { + tracing::error!("invalid alias in packages folder: {e}"); + return Ok(()); + } + }; + + if !expected_aliases.contains(&name) { + fs::remove_file(path).await?; + } + + Ok(()) + }); + + Ok(()) + } + + let used_paths = graph + .iter() + .filter(|(_, node)| !node.node.pkg_ref.is_wally_package()) + .map(|(id, node)| { + node.node + .container_folder(id) + .version_folder() + .to_path_buf() + }) + .collect::>(); + let used_paths = Arc::new(used_paths); + #[cfg(feature = "wally-compat")] + let used_wally_paths = graph + .iter() + .filter(|(_, node)| node.node.pkg_ref.is_wally_package()) + .map(|(id, node)| { + node.node + .container_folder(id) + .version_folder() + .to_path_buf() + }) + .collect::>(); + #[cfg(feature = "wally-compat")] + let used_wally_paths = Arc::new(used_wally_paths); + let mut tasks = all_packages_dirs() .into_iter() .map(|folder| { @@ -461,10 +555,20 @@ impl Project { #[cfg(feature = "wally-compat")] let used_wally_paths = used_wally_paths.clone(); - let expected_aliases = graph.iter() - .filter(|(id, _)| manifest.target.kind().packages_folder(id.version_id().target()) == folder) - .filter_map(|(_, node)| node.node.direct.as_ref().map(|(alias, _, _)| alias.clone())) + let expected_aliases = graph + .iter() + .filter(|(id, _)| { + manifest + .target + .kind() + .packages_folder(id.version_id().target()) + == folder + }) + .filter_map(|(_, node)| { + node.node.direct.as_ref().map(|(alias, _, _)| alias.clone()) + }) .collect::>(); + let expected_aliases = Arc::new(expected_aliases); async move { let mut index_entries = match fs::read_dir(&packages_index_dir).await { @@ -474,89 +578,8 @@ impl Project { }; // we don't handle NotFound here because the upper level will handle it let mut packages_entries = fs::read_dir(&packages_dir).await?; - let mut tasks = JoinSet::new(); - async fn index_entry( - entry: fs::DirEntry, - packages_index_dir: &Path, - tasks: &mut JoinSet>, - used_paths: &HashSet, - #[cfg(feature = "wally-compat")] used_wally_paths: &HashSet, - ) -> std::io::Result<()> { - let path = entry.path(); - let path_relative = path.strip_prefix(packages_index_dir).unwrap(); - - let is_wally = entry.file_name().to_str().expect("non UTF-8 folder name in packages index").contains("@"); - if is_wally { - #[cfg(feature = "wally-compat")] - if !used_wally_paths.contains(path_relative) { - tasks.spawn(async { - fs::remove_dir_all(path).await - }); - } - - #[cfg(not(feature = "wally-compat"))] - { - tracing::error!("found Wally package in index despite feature being disabled at `{}`", path.display()); - } - - return Ok(()); - } - - let mut tasks = JoinSet::new(); - - let mut entries = fs::read_dir(&path).await?; - while let Some(entry) = entries.next_entry().await? { - let version = entry.file_name(); - let path_relative = path_relative.join(&version); - - if used_paths.contains(&path_relative) { - continue; - } - - let path = entry.path(); - tasks.spawn(async { - fs::remove_dir_all(path).await - }); - } - - while let Some(task) = tasks.join_next().await { - task.unwrap()?; - } - - remove_empty_dir(&path).await - } - - async fn packages_entry( - entry: fs::DirEntry, - tasks: &mut JoinSet>, - expected_aliases: &HashSet, - ) -> std::io::Result<()> { - if !entry.file_type().await?.is_file() { - return Ok(()); - } - - let path = entry.path(); - let name= path.file_stem().unwrap().to_str().expect("non UTF-8 file name in packages folder"); - let name = name.strip_suffix(".bin").unwrap_or(name); - let name = match name.parse::() { - Ok(name) => name, - Err(e) => { - tracing::error!("invalid alias in packages folder: {e}"); - return Ok(()) - }, - }; - - if !expected_aliases.contains(&name) { - tasks.spawn(async { - fs::remove_file(path).await - }); - } - - Ok(()) - } - loop { tokio::select! { Some(entry) = index_entries.next_entry().map(Result::transpose) => { @@ -577,7 +600,7 @@ impl Project { ).await?; } else => break, - }; + } } while let Some(task) = tasks.join_next().await { diff --git a/src/source/fs.rs b/src/source/fs.rs index 3d92b2e..5240f64 100644 --- a/src/source/fs.rs +++ b/src/source/fs.rs @@ -117,6 +117,119 @@ pub(crate) async fn store_in_cas Ok(hash) } +async fn package_fs_cas( + entries: BTreeMap, + destination: &Path, + cas_dir_path: &Path, + link: bool, +) -> std::io::Result<()> { + let mut tasks = entries + .iter() + .map(|(path, entry)| { + let destination = destination.to_path_buf(); + let cas_dir_path = cas_dir_path.to_path_buf(); + let path = path.to_path(destination); + let entry = entry.clone(); + + async move { + match entry { + FsEntry::File(hash) => { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + + let cas_file_path = cas_path(&hash, &cas_dir_path); + + if link { + fs::hard_link(cas_file_path, path).await?; + } else { + fs::copy(cas_file_path, &path).await?; + set_readonly(&path, false).await?; + } + } + FsEntry::Directory => { + fs::create_dir_all(path).await?; + } + } + + Ok::<_, std::io::Error>(()) + } + }) + .collect::>(); + + while let Some(task) = tasks.join_next().await { + task.unwrap()?; + } + + Ok(()) +} + +async fn package_fs_copy( + src: &Path, + target: TargetKind, + destination: &Path, +) -> std::io::Result<()> { + fs::create_dir_all(destination).await?; + + let mut tasks = JoinSet::new(); + let mut read_dir = fs::read_dir(src).await?; + + 'entry: while let Some(entry) = read_dir.next_entry().await? { + let path = entry.path(); + let relative_path = path.strip_prefix(src).unwrap(); + let dest_path = destination.join(relative_path); + let file_name = relative_path + .file_name() + .unwrap() + .to_str() + .ok_or(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "invalid file name", + ))?; + + if entry.file_type().await?.is_dir() { + if IGNORED_DIRS.contains(&file_name) { + continue; + } + + for other_target in TargetKind::VARIANTS { + if target.packages_folder(*other_target) == file_name { + continue 'entry; + } + } + + tasks.spawn(async { + #[cfg(windows)] + let res = fs::symlink_dir(path, dest_path).await; + #[cfg(unix)] + let res = fs::symlink(path, dest_path).await; + + res + }); + continue; + } + + if IGNORED_FILES.contains(&file_name) { + continue; + } + + tasks.spawn(async { + #[cfg(windows)] + let res = fs::symlink_file(path, dest_path).await; + #[cfg(unix)] + let res = fs::symlink(path, dest_path).await; + + res + }); + } + + while let Some(task) = tasks.join_next().await { + task.unwrap()?; + } + + Ok(()) +} + impl PackageFs { /// Write the package to the given destination #[instrument(skip(self), level = "debug")] @@ -128,87 +241,18 @@ impl PackageFs { ) -> std::io::Result<()> { match self { PackageFs::CAS(entries) => { - let mut tasks = entries - .iter() - .map(|(path, entry)| { - let destination = destination.as_ref().to_path_buf(); - let cas_path = cas_path.as_ref().to_path_buf(); - let path = path.to_path(destination); - let entry = entry.clone(); - - async move { - match entry { - FsEntry::File(hash) => { - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; - } - - let (prefix, rest) = hash.split_at(2); - let cas_file_path = cas_path.join(prefix).join(rest); - - if link { - fs::hard_link(cas_file_path, path).await?; - } else { - fs::copy(cas_file_path, &path).await?; - set_readonly(&path, false).await?; - } - } - FsEntry::Directory => { - fs::create_dir_all(path).await?; - } - } - - Ok::<_, std::io::Error>(()) - } - }) - .collect::>(); - - while let Some(task) = tasks.join_next().await { - task.unwrap()?; - } + package_fs_cas( + entries.clone(), + destination.as_ref(), + cas_path.as_ref(), + link, + ) + .await } PackageFs::Copy(src, target) => { - fs::create_dir_all(destination.as_ref()).await?; - - let mut read_dir = fs::read_dir(src).await?; - 'entry: while let Some(entry) = read_dir.next_entry().await? { - let relative_path = - RelativePathBuf::from_path(entry.path().strip_prefix(src).unwrap()) - .unwrap(); - let dest_path = relative_path.to_path(destination.as_ref()); - let file_name = relative_path.file_name().unwrap(); - - if entry.file_type().await?.is_dir() { - if IGNORED_DIRS.contains(&file_name) { - continue; - } - - for other_target in TargetKind::VARIANTS { - if target.packages_folder(*other_target) == file_name { - continue 'entry; - } - } - - #[cfg(windows)] - fs::symlink_dir(entry.path(), dest_path).await?; - #[cfg(unix)] - fs::symlink(entry.path(), dest_path).await?; - continue; - } - - if IGNORED_FILES.contains(&file_name) { - continue; - } - - #[cfg(windows)] - fs::symlink_file(entry.path(), dest_path).await?; - #[cfg(unix)] - fs::symlink(entry.path(), dest_path).await?; - } + package_fs_copy(src, *target, destination.as_ref()).await } } - - Ok(()) } /// Returns the contents of the file with the given hash @@ -216,14 +260,13 @@ impl PackageFs { pub async fn read_file + Debug, H: AsRef + Debug>( &self, file_hash: H, - cas_path: P, + cas_dir_path: P, ) -> Option { if !matches!(self, PackageFs::CAS(_)) { return None; } - let (prefix, rest) = file_hash.as_ref().split_at(2); - let cas_file_path = cas_path.as_ref().join(prefix).join(rest); + let cas_file_path = cas_path(file_hash.as_ref(), cas_dir_path.as_ref()); fs::read_to_string(cas_file_path).await.ok() } }