diff --git a/src/cli/commands/cas/prune.rs b/src/cli/commands/cas/prune.rs
index 7410531..448cd0b 100644
--- a/src/cli/commands/cas/prune.rs
+++ b/src/cli/commands/cas/prune.rs
@@ -1,13 +1,38 @@
+use crate::cli::{
+    reporters::run_with_reporter,
+    style::{INFO_STYLE, SUCCESS_STYLE},
+};
 use anyhow::Context;
+use async_stream::try_stream;
 use clap::Args;
 use fs_err::tokio as fs;
-use pesde::Project;
-use std::{collections::HashSet, path::Path};
+use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
+use pesde::{
+    source::fs::{FsEntry, PackageFs},
+    Project,
+};
+use std::{
+    collections::{HashMap, HashSet},
+    future::Future,
+    path::{Path, PathBuf},
+};
 use tokio::task::JoinSet;
 
 #[derive(Debug, Args)]
 pub struct PruneCommand {}
 
+async fn read_dir_stream(
+    dir: &Path,
+) -> std::io::Result<impl Stream<Item = std::io::Result<fs::DirEntry>>> {
+    let mut read_dir = fs::read_dir(dir).await?;
+
+    Ok(try_stream! {
+        while let Some(entry) = read_dir.next_entry().await? {
+            yield entry;
+        }
+    })
+}
+
 #[allow(unreachable_code)]
 async fn get_nlinks(path: &Path) -> anyhow::Result<u64> {
     #[cfg(unix)]
@@ -67,64 +92,138 @@ async fn get_nlinks(path: &Path) -> anyhow::Result<u64> {
     anyhow::bail!("unsupported platform")
 }
 
-async fn remove_hashes(cas_dir: &Path) -> anyhow::Result<HashSet<String>> {
-    let mut tasks = JoinSet::new();
+#[derive(Debug)]
+struct ExtendJoinSet<T: Send + 'static>(JoinSet<T>);
 
-    let mut cas_entries = fs::read_dir(cas_dir)
-        .await
-        .context("failed to read directory")?;
-
-    while let Some(cas_entry) = cas_entries
-        .next_entry()
-        .await
-        .context("failed to read dir entry")?
-    {
-        let prefix = cas_entry.file_name();
-        let Some(prefix) = prefix.to_str() else {
-            continue;
-        };
-        // we only want hash directories
-        if prefix.len() != 2 {
-            continue;
+impl<T: Send + 'static, F: Future<Output = T> + Send + 'static> Extend<F> for ExtendJoinSet<T> {
+    fn extend<I: IntoIterator<Item = F>>(&mut self, iter: I) {
+        for item in iter {
+            self.0.spawn(item);
         }
+    }
+}
 
-        let prefix = prefix.to_string();
+impl<T: Send + 'static> Default for ExtendJoinSet<T> {
+    fn default() -> Self {
+        Self(JoinSet::new())
+    }
+}
 
-        tasks.spawn(async move {
-            let mut hash_entries = fs::read_dir(cas_entry.path())
+async fn discover_cas_packages(cas_dir: &Path) -> anyhow::Result<HashMap<PathBuf, PackageFs>> {
+    fn read_entry(
+        entry: fs::DirEntry,
+    ) -> BoxFuture<'static, anyhow::Result<HashMap<PathBuf, PackageFs>>> {
+        async move {
+            if entry
+                .metadata()
                 .await
-                .context("failed to read hash directory")?;
-
-            let mut tasks = JoinSet::new();
-
-            while let Some(hash_entry) = hash_entries
-                .next_entry()
-                .await
-                .context("failed to read hash dir entry")?
+                .context("failed to read entry metadata")?
+                .is_dir()
             {
-                let hash = hash_entry.file_name();
-                let hash = hash.to_str().expect("non-UTF-8 hash").to_string();
-                let hash = format!("{prefix}{hash}");
+                let mut tasks = read_dir_stream(&entry.path())
+                    .await
+                    .context("failed to read entry directory")?
+                    .map(|entry| async move {
+                        read_entry(entry.context("failed to read inner cas index dir entry")?).await
+                    })
+                    .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
+                    .await
+                    .0;
 
-                let path = hash_entry.path();
-                tasks.spawn(async move {
-                    let nlinks = get_nlinks(&path)
-                        .await
-                        .context("failed to count file usage")?;
-                    if nlinks != 1 {
-                        return Ok::<_, anyhow::Error>(None);
-                    }
+                let mut res = HashMap::new();
+                while let Some(entry) = tasks.join_next().await {
+                    res.extend(entry.unwrap()?);
+                }
 
-                    fs::remove_file(path)
-                        .await
-                        .context("failed to remove unused file")?;
+                return Ok(res);
+            };
 
-                    Ok::<_, anyhow::Error>(Some(hash))
-                });
+            let contents = fs::read_to_string(entry.path()).await?;
+            let fs = toml::from_str(&contents).context("failed to deserialize PackageFs")?;
+
+            Ok(HashMap::from([(entry.path(), fs)]))
+        }
+        .boxed()
+    }
+
+    let mut tasks = ["index", "wally_index", "git_index"]
+        .into_iter()
+        .map(|index| cas_dir.join(index))
+        .map(|index| async move {
+            let mut tasks = read_dir_stream(&index)
+                .await
+                .context("failed to read index directory")?
+                .map(|entry| async move {
+                    read_entry(entry.context("failed to read cas index dir entry")?).await
+                })
+                .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
+                .await
+                .0;
+
+            let mut res = HashMap::new();
+
+            while let Some(task) = tasks.join_next().await {
+                res.extend(task.unwrap()?);
             }
 
-            let mut removed_hashes = HashSet::new();
+            Ok(res)
+        })
+        .collect::<JoinSet<Result<_, anyhow::Error>>>();
+
+    let mut cas_entries = HashMap::new();
+
+    while let Some(task) = tasks.join_next().await {
+        cas_entries.extend(task.unwrap()?);
+    }
+
+    Ok(cas_entries)
+}
+
+async fn remove_hashes(cas_dir: &Path) -> anyhow::Result<HashSet<String>> {
+    let mut tasks = read_dir_stream(cas_dir)
+        .await?
+        .map(|cas_entry| async move {
+            let cas_entry = cas_entry.context("failed to read cas dir entry")?;
+            let prefix = cas_entry.file_name();
+            let Some(prefix) = prefix.to_str() else {
+                return Ok(None);
+            };
+            // we only want hash directories
+            if prefix.len() != 2 {
+                return Ok(None);
+            }
+
+            let mut tasks = read_dir_stream(&cas_entry.path())
+                .await
+                .context("failed to read hash directory")?
+                .map(|hash_entry| {
+                    let prefix = prefix.to_string();
+                    async move {
+                        let hash_entry = hash_entry.context("failed to read hash dir entry")?;
+                        let hash = hash_entry.file_name();
+                        let hash = hash.to_str().expect("non-UTF-8 hash").to_string();
+                        let hash = format!("{prefix}{hash}");
+
+                        let path = hash_entry.path();
+                        let nlinks = get_nlinks(&path)
+                            .await
+                            .context("failed to count file usage")?;
+                        if nlinks > 1 {
+                            return Ok(None);
+                        }
+
+                        fs::remove_file(path)
+                            .await
+                            .context("failed to remove unused file")?;
+
+                        Ok(Some(hash))
+                    }
+                })
+                .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
+                .await
+                .0;
+
+            let mut removed_hashes = HashSet::new();
             while let Some(removed_hash) = tasks.join_next().await {
                 let Some(hash) = removed_hash.unwrap()? else {
                     continue;
@@ -133,14 +232,20 @@ async fn remove_hashes(cas_dir: &Path) -> anyhow::Result<HashSet<String>> {
                 removed_hashes.insert(hash);
             }
 
-            Ok::<_, anyhow::Error>(removed_hashes)
-        });
-    }
+            Ok(Some(removed_hashes))
+        })
+        .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
+        .await
+        .0;
 
     let mut res = HashSet::new();
 
     while let Some(removed_hashes) = tasks.join_next().await {
-        res.extend(removed_hashes.unwrap()?);
+        let Some(removed_hashes) = removed_hashes.unwrap()? else {
+            continue;
+        };
+
+        res.extend(removed_hashes);
     }
 
     Ok(res)
 }
@@ -154,8 +259,58 @@ impl PruneCommand {
         // /wally_index/hash/name/version
         // /git_index/hash/hash
        // the last thing in the path is the serialized PackageFs
-        let _ = remove_hashes(project.cas_dir()).await?;
-        todo!("remove unused index entries");
+        let (cas_entries, removed_hashes) = run_with_reporter(|_, root_progress, _| async {
+            let root_progress = root_progress;
+
+            root_progress.reset();
+            root_progress.set_message("discover packages");
+            let cas_entries = discover_cas_packages(project.cas_dir()).await?;
+
+            root_progress.reset();
+            root_progress.set_message("remove unused files");
+            let removed_hashes = remove_hashes(project.cas_dir()).await?;
+
+            Ok::<_, anyhow::Error>((cas_entries, removed_hashes))
+        })
+        .await?;
+
+        let mut tasks = JoinSet::new();
+
+        let mut removed_packages = 0usize;
+
+        'entry: for (path, fs) in cas_entries {
+            let PackageFs::CAS(entries) = fs else {
+                continue;
+            };
+
+            for entry in entries.into_values() {
+                let FsEntry::File(hash) = entry else {
+                    continue;
+                };
+
+                if removed_hashes.contains(&hash) {
+                    tasks.spawn(async move {
+                        fs::remove_file(path)
+                            .await
+                            .context("failed to remove unused file")
+                    });
+                    removed_packages += 1;
+                    // if at least one file is removed, the package is not used
+                    continue 'entry;
+                }
+            }
+        }
+
+        while let Some(task) = tasks.join_next().await {
+            task.unwrap()?;
+        }
+
+        println!(
+            "{} removed {} unused packages and {} individual files!",
+            SUCCESS_STYLE.apply_to("done!"),
+            INFO_STYLE.apply_to(removed_packages),
+            INFO_STYLE.apply_to(removed_hashes.len())
+        );
+
+        Ok(())
     }
 }
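Note on the `Extend`-based `JoinSet` collector used throughout the patch: `futures::StreamExt::collect` only requires the target collection to implement `Default + Extend<Item>`, so wrapping `tokio::task::JoinSet` in a newtype means every mapped future is spawned the moment it is collected. The snippet below is a minimal, standalone sketch of that pattern against a toy stream; the generic bounds match the ones assumed in the reconstruction above and are not necessarily the crate's exact declaration.

// Standalone sketch, not pesde's code: collect a stream of futures into a
// JoinSet by implementing Default + Extend on a newtype wrapper.
use futures::{stream, StreamExt};
use std::future::Future;
use tokio::task::JoinSet;

struct ExtendJoinSet<T: Send + 'static>(JoinSet<T>);

impl<T: Send + 'static> Default for ExtendJoinSet<T> {
    fn default() -> Self {
        Self(JoinSet::new())
    }
}

impl<T: Send + 'static, F: Future<Output = T> + Send + 'static> Extend<F> for ExtendJoinSet<T> {
    fn extend<I: IntoIterator<Item = F>>(&mut self, iter: I) {
        for fut in iter {
            // each collected future starts running on the runtime immediately
            self.0.spawn(fut);
        }
    }
}

#[tokio::main] // assumes tokio with the "macros" and "rt-multi-thread" features
async fn main() {
    // map every item to a future, spawn them all, then drain the JoinSet
    let mut set = stream::iter(0u32..4)
        .map(|i| async move { i * 2 })
        .collect::<ExtendJoinSet<u32>>()
        .await
        .0;

    let mut total = 0;
    while let Some(joined) = set.join_next().await {
        total += joined.expect("task panicked");
    }
    assert_eq!(total, 12);
}

Taking `.0` right after `collect(...).await`, as the patch does, leaves the caller with a plain `JoinSet` that can be drained with `join_next` and whose join errors can be handled directly.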
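Note on the `nlinks > 1` check: `get_nlinks` (its platform-specific body lies outside the hunks shown) counts hard links, so a CAS file is kept only while something other than the store itself still links to it. A hedged, Unix-only sketch of that heuristic follows; the helper name is hypothetical and not taken from pesde.

// Illustrative only: a CAS file whose inode has a single hard link is
// referenced by nothing but the store, so pruning it is safe.
use std::os::unix::fs::MetadataExt;
use std::path::Path;

fn is_unreferenced(path: &Path) -> std::io::Result<bool> {
    // nlink() reports how many directory entries point at this inode
    Ok(std::fs::metadata(path)?.nlink() == 1)
}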