feat: cas pruning

Squashed commit of the following:

commit 82b4b858e5
Author: daimond113 <contact@daimond113.com>
Date:   Sat Feb 1 00:46:31 2025 +0100

    feat: remove unused directories when purging cas

    Now purging the CAS will also clean up unused
    folders. Additionally, concurrent removal of
    directories seems to throw a PermissionDenied
    error on Windows, so those errors are ignored
    there. Why that happens still needs investigation.

commit 75d6aa5443
Author: daimond113 <contact@daimond113.com>
Date:   Fri Jan 31 23:24:11 2025 +0100

    feat: finish prune command implementation

    The prune command now discovers packages in the
    CAS, removes individual unused files and then
    packages which use those files, since that means
    they're unused.

commit 333eb3bdd9
Author: daimond113 <contact@daimond113.com>
Date:   Sun Jan 26 23:30:52 2025 +0100

    chore: fix clippy lint

commit a38da43670
Author: daimond113 <contact@daimond113.com>
Date:   Sun Jan 26 23:02:52 2025 +0100

    feat: add cas pruning command

    Removes unused files from the CAS. Still needs to
    remove individual package index entries to be
    complete.
This commit is contained in:
daimond113 2025-02-01 00:51:43 +01:00
parent 5cc64f38ec
commit b30f9ecdeb
No known key found for this signature in database
GPG key ID: 640DC95EC1190354
8 changed files with 442 additions and 16 deletions

52
Cargo.lock generated
View file

@ -2502,7 +2502,7 @@ checksum = "f9c7c7c8ac16c798734b8a24560c1362120597c40d5e1459f09498f8f6c8f2ba"
dependencies = [
"cfg-if",
"libc",
"windows",
"windows 0.52.0",
]
[[package]]
@ -2652,7 +2652,7 @@ dependencies = [
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
"windows-core 0.52.0",
]
[[package]]
@ -3656,6 +3656,7 @@ dependencies = [
"url",
"urlencoding",
"wax",
"windows 0.59.0",
"windows-registry 0.4.0",
]
@ -5669,10 +5670,20 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be"
dependencies = [
"windows-core",
"windows-core 0.52.0",
"windows-targets 0.52.6",
]
[[package]]
name = "windows"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f919aee0a93304be7f62e8e5027811bbba96bcb1de84d6618be56e43f8a32a1"
dependencies = [
"windows-core 0.59.0",
"windows-targets 0.53.0",
]
[[package]]
name = "windows-core"
version = "0.52.0"
@ -5682,6 +5693,41 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-core"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "810ce18ed2112484b0d4e15d022e5f598113e220c53e373fb31e67e21670c1ce"
dependencies = [
"windows-implement",
"windows-interface",
"windows-result 0.3.0",
"windows-strings 0.3.0",
"windows-targets 0.53.0",
]
[[package]]
name = "windows-implement"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83577b051e2f49a058c308f17f273b570a6a758386fc291b5f6a934dd84e48c1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "windows-interface"
version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb26fd936d991781ea39e87c3a27285081e3c0da5ca0fcbc02d368cc6f52ff01"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.90",
]
[[package]]
name = "windows-registry"
version = "0.2.0"

View file

@ -24,6 +24,7 @@ bin = [
"dep:paste",
"dep:serde_json",
"dep:windows-registry",
"dep:windows",
"gix/worktree-mutation",
"fs-err/expose_original_error",
"tokio/rt",
@ -91,6 +92,7 @@ paste = { version = "1.0.15", optional = true }
[target.'cfg(target_os = "windows")'.dependencies]
windows-registry = { version = "0.4.0", optional = true }
windows = { version = "0.59.0", features = ["Win32_Storage", "Win32_Storage_FileSystem", "Win32_Security"], optional = true }
[workspace]
resolver = "2"

View file

@ -0,0 +1,18 @@
use clap::Subcommand;
use pesde::Project;
mod prune;
// Subcommands nested under the CLI's `Cas` subcommand.
// NOTE(review): the `///` comment on the variant is presumably picked up by clap's derive as
// help text, so it is left untouched — confirm before rewording it.
#[derive(Debug, Subcommand)]
pub enum CasCommands {
/// Removes unused files from the CAS
Prune(prune::PruneCommand),
}
impl CasCommands {
    /// Executes the selected CAS subcommand against `project`.
    ///
    /// Whatever the subcommand returns (success or error) is passed through unchanged.
    pub async fn run(self, project: Project) -> anyhow::Result<()> {
        match self {
            Self::Prune(command) => command.run(project).await,
        }
    }
}

View file

@ -0,0 +1,346 @@
use crate::{
cli::{
reporters::run_with_reporter,
style::{INFO_STYLE, SUCCESS_STYLE},
},
util::remove_empty_dir,
};
use anyhow::Context;
use async_stream::try_stream;
use clap::Args;
use fs_err::tokio as fs;
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use pesde::{
source::fs::{FsEntry, PackageFs},
Project,
};
use std::{
collections::{HashMap, HashSet},
future::Future,
path::{Path, PathBuf},
};
use tokio::task::JoinSet;
// Arguments of the prune subcommand; the struct has no fields, so it currently takes none.
#[derive(Debug, Args)]
pub struct PruneCommand {}
/// Opens `dir` and exposes its entries as an asynchronous stream.
///
/// Failure to open the directory is reported through the outer `Result`. A failure while
/// fetching a later entry is surfaced by the stream itself as an `Err` item.
async fn read_dir_stream(
    dir: &Path,
) -> std::io::Result<impl Stream<Item = std::io::Result<fs::DirEntry>>> {
    let mut entries = fs::read_dir(dir).await?;

    Ok(try_stream! {
        loop {
            match entries.next_entry().await? {
                Some(entry) => {
                    yield entry;
                }
                // `None` means the directory has been fully read
                None => break,
            }
        }
    })
}
/// Returns the number of hard links to the file at `path`.
///
/// On Unix this is `nlink()` from the file's metadata. On Windows the file is opened with
/// `CreateFileW` on a blocking task and `nNumberOfLinks` is read through
/// `GetFileInformationByHandle`. Any other platform fails to compile.
///
/// # Errors
/// Returns an error if the metadata cannot be read (Unix), or if opening, querying or
/// closing the file handle fails (Windows).
///
/// # Panics
/// On Windows, the `unwrap()` on the awaited blocking task panics if that task could not be
/// joined (for example because it panicked).
// each supported platform returns from its own cfg block, so the trailing `bail!` is never
// reached there; the lint is silenced for that reason
#[allow(unreachable_code)]
async fn get_nlinks(path: &Path) -> anyhow::Result<u64> {
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
let metadata = fs::metadata(path).await?;
return Ok(metadata.nlink());
}
// this whole block would be unnecessary if Rust stabilized the nightly feature from 2019
// (presumably the std API exposing the link count on Windows metadata — TODO confirm which one)
#[cfg(windows)]
{
use std::os::windows::ffi::OsStrExt;
use windows::{
core::PWSTR,
Win32::{
Foundation::CloseHandle,
Storage::FileSystem::{
CreateFileW, GetFileInformationByHandle, FILE_ATTRIBUTE_NORMAL,
FILE_GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING,
},
},
};
// owned copy so the path can be moved into the blocking closure
let path = path.to_path_buf();
// the Win32 calls are synchronous, so they run on tokio's blocking pool
return tokio::task::spawn_blocking(move || unsafe {
// NOTE(review): the NUL-terminated UTF-16 buffer is a temporary created inside the
// argument expression; this relies on it living until the end of this `let` statement
// so the pointer stays valid for the duration of the call — confirm
let handle = CreateFileW(
PWSTR(
path.as_os_str()
.encode_wide()
.chain(std::iter::once(0))
.collect::<Vec<_>>()
.as_mut_ptr(),
),
FILE_GENERIC_READ.0,
FILE_SHARE_READ,
None,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
None,
)?;
let mut info =
windows::Win32::Storage::FileSystem::BY_HANDLE_FILE_INFORMATION::default();
// the query result is only inspected after the handle is closed, so the handle is
// released even when the query itself failed
let res = GetFileInformationByHandle(handle, &mut info);
CloseHandle(handle)?;
res?;
Ok(info.nNumberOfLinks as u64)
})
.await
.unwrap();
}
#[cfg(not(any(unix, windows)))]
{
compile_error!("unsupported platform");
}
// gives the function a tail expression of the right type; see the `#[allow]` above
anyhow::bail!("unsupported platform")
}
/// Newtype around [`JoinSet`] that implements `Default` and `Extend`, so futures can be
/// `collect`ed straight into it; every future added through `extend` is spawned onto the set.
#[derive(Debug)]
struct ExtendJoinSet<T: Send + 'static>(JoinSet<T>);

impl<T, F> Extend<F> for ExtendJoinSet<T>
where
    T: Send + 'static,
    F: Future<Output = T> + Send + 'static,
{
    /// Spawns each future produced by `iter` onto the wrapped set.
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = F>,
    {
        iter.into_iter().for_each(|future| {
            self.0.spawn(future);
        });
    }
}

impl<T: Send + 'static> Default for ExtendJoinSet<T> {
    /// Starts out with an empty set.
    fn default() -> Self {
        ExtendJoinSet(JoinSet::new())
    }
}
/// Collects every serialized `PackageFs` stored in the index directories of the CAS.
///
/// The `index`, `wally_index` and `git_index` directories under `cas_dir` are walked
/// recursively and concurrently. Every non-directory entry is parsed as TOML into a
/// `PackageFs` and returned keyed by its path. An index directory that does not exist
/// contributes nothing.
///
/// # Errors
/// Fails on the first I/O or deserialization error reported by any of the spawned tasks.
///
/// # Panics
/// Panics if a spawned task cannot be joined (the join result is unwrapped).
async fn discover_cas_packages(cas_dir: &Path) -> anyhow::Result<HashMap<PathBuf, PackageFs>> {
    type Discovered = HashMap<PathBuf, PackageFs>;

    /// Waits for every task in `pending` and merges their maps, stopping at the first error.
    async fn merge_all(
        mut pending: JoinSet<anyhow::Result<Discovered>>,
    ) -> anyhow::Result<Discovered> {
        let mut merged = Discovered::new();
        while let Some(joined) = pending.join_next().await {
            merged.extend(joined.unwrap()?);
        }
        Ok(merged)
    }

    /// Reads one index entry: a file is parsed directly, a directory is descended into.
    /// Boxed because the function calls itself from within its own future.
    fn read_entry(entry: fs::DirEntry) -> BoxFuture<'static, anyhow::Result<Discovered>> {
        async move {
            let is_dir = entry
                .metadata()
                .await
                .context("failed to read entry metadata")?
                .is_dir();

            if !is_dir {
                let contents = fs::read_to_string(entry.path()).await?;
                let package_fs =
                    toml::from_str(&contents).context("failed to deserialize PackageFs")?;
                return Ok(HashMap::from([(entry.path(), package_fs)]));
            }

            // one task per child entry
            let children = read_dir_stream(&entry.path())
                .await
                .context("failed to read entry directory")?
                .map(|child| async move {
                    read_entry(child.context("failed to read inner cas index dir entry")?).await
                })
                .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
                .await
                .0;

            merge_all(children).await
        }
        .boxed()
    }

    let index_tasks = ["index", "wally_index", "git_index"]
        .into_iter()
        .map(|name| cas_dir.join(name))
        .map(|index_dir| async move {
            let entries = match read_dir_stream(&index_dir).await {
                Ok(entries) => entries,
                // an index that was never created simply has no packages
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    return Ok(Discovered::new());
                }
                Err(e) => return Err(e).context("failed to read cas index directory"),
            };

            let entry_tasks = entries
                .map(|entry| async move {
                    read_entry(entry.context("failed to read cas index dir entry")?).await
                })
                .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
                .await
                .0;

            merge_all(entry_tasks).await
        })
        .collect::<JoinSet<Result<_, anyhow::Error>>>();

    merge_all(index_tasks).await
}
/// Deletes every CAS file that is no longer linked from anywhere else and returns the
/// hashes of the deleted files.
///
/// Only entries of `cas_dir` whose name is exactly two characters long are treated as hash
/// directories. Inside them, a file whose link count (see `get_nlinks`) is above 1 is
/// considered still in use and kept. Every other file is removed, followed by an attempt to
/// remove its then possibly empty parent directory. A returned hash is the directory name
/// followed by the file name.
///
/// Entries whose names are not valid UTF-8 are skipped at both levels instead of aborting
/// the prune with a panic.
///
/// A missing `cas_dir` yields an empty set.
///
/// # Errors
/// Fails on the first I/O error reported by any of the spawned tasks.
///
/// # Panics
/// Panics if a spawned task cannot be joined (the join result is unwrapped).
async fn remove_hashes(cas_dir: &Path) -> anyhow::Result<HashSet<String>> {
    let mut res = HashSet::new();

    let tasks = match read_dir_stream(cas_dir).await {
        Ok(tasks) => tasks,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(res),
        Err(e) => return Err(e).context("failed to read cas directory"),
    };

    // one task per top-level entry, each of which spawns one task per file it contains
    let mut tasks = tasks
        .map(|cas_entry| async move {
            let cas_entry = cas_entry.context("failed to read cas dir entry")?;

            let prefix = cas_entry.file_name();
            let Some(prefix) = prefix.to_str() else {
                return Ok(None);
            };
            // we only want hash directories
            if prefix.len() != 2 {
                return Ok(None);
            }

            let mut tasks = read_dir_stream(&cas_entry.path())
                .await
                .context("failed to read hash directory")?
                .map(|hash_entry| {
                    let prefix = prefix.to_string();

                    async move {
                        let hash_entry = hash_entry.context("failed to read hash dir entry")?;

                        let hash = hash_entry.file_name();
                        // skip non-UTF-8 names the same way a non-UTF-8 prefix is skipped
                        // above, rather than panicking on a stray file
                        let Some(hash) = hash.to_str() else {
                            return Ok(None);
                        };
                        let hash = format!("{prefix}{hash}");

                        let path = hash_entry.path();
                        let nlinks = get_nlinks(&path)
                            .await
                            .context("failed to count file usage")?;
                        // more than one link: something besides the CAS still refers to it
                        if nlinks > 1 {
                            return Ok(None);
                        }

                        fs::remove_file(&path)
                            .await
                            .context("failed to remove unused file")?;

                        if let Some(parent) = path.parent() {
                            remove_empty_dir(parent)
                                .await
                                .context("failed to remove empty hash directory")?;
                        }

                        Ok(Some(hash))
                    }
                })
                .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
                .await
                .0;

            let mut removed_hashes = HashSet::new();
            while let Some(removed_hash) = tasks.join_next().await {
                let Some(hash) = removed_hash.unwrap()? else {
                    continue;
                };

                removed_hashes.insert(hash);
            }

            Ok(Some(removed_hashes))
        })
        .collect::<ExtendJoinSet<Result<_, anyhow::Error>>>()
        .await
        .0;

    while let Some(removed_hashes) = tasks.join_next().await {
        let Some(removed_hashes) = removed_hashes.unwrap()? else {
            continue;
        };

        res.extend(removed_hashes);
    }

    Ok(res)
}
impl PruneCommand {
    /// Prunes the CAS of `project`.
    ///
    /// The package index entries are discovered first, then unused CAS files are removed.
    /// Every package index entry that references at least one removed file is then deleted
    /// as well, together with any directories between it and the CAS root that became
    /// empty. A summary is printed to stdout when done.
    ///
    /// # Errors
    /// Fails if discovery, file removal or index entry removal reports an error.
    ///
    /// # Panics
    /// Panics if a spawned removal task cannot be joined (the join result is unwrapped).
    pub async fn run(self, project: Project) -> anyhow::Result<()> {
        // CAS structure:
        // /2 first chars of hash/rest of hash
        // /index/hash/name/version/target
        // /wally_index/hash/name/version
        // /git_index/hash/hash
        // the last thing in the path is the serialized PackageFs
        let (cas_entries, removed_hashes) = run_with_reporter(|_, root_progress, _| async {
            // NOTE(review): rebinding by value presumably makes the non-`move` async block
            // take ownership of the progress handle instead of borrowing it — keep this line
            let root_progress = root_progress;

            root_progress.reset();
            root_progress.set_message("discover packages");
            let cas_entries = discover_cas_packages(project.cas_dir()).await?;

            root_progress.reset();
            root_progress.set_message("remove unused files");
            let removed_hashes = remove_hashes(project.cas_dir()).await?;

            Ok::<_, anyhow::Error>((cas_entries, removed_hashes))
        })
        .await?;

        let mut tasks = JoinSet::new();
        let mut removed_packages = 0usize;

        'entry: for (path, package_fs) in cas_entries {
            let PackageFs::CAS(entries) = package_fs else {
                continue;
            };

            for entry in entries.into_values() {
                let FsEntry::File(hash) = entry else {
                    continue;
                };

                if removed_hashes.contains(&hash) {
                    let cas_dir = project.cas_dir().to_path_buf();
                    tasks.spawn(async move {
                        fs::remove_file(&path)
                            .await
                            .context("failed to remove unused file")?;

                        // remove empty directories up to the cas dir
                        let mut path = &*path;
                        while let Some(parent) = path.parent() {
                            if parent == cas_dir {
                                break;
                            }

                            remove_empty_dir(parent).await?;
                            path = parent;
                        }

                        Ok::<_, anyhow::Error>(())
                    });
                    removed_packages += 1;
                    // if at least one file is removed, the package is not used
                    continue 'entry;
                }
            }
        }

        while let Some(task) = tasks.join_next().await {
            task.unwrap()?;
        }

        println!(
            "{} removed {} unused packages and {} individual files!",
            SUCCESS_STYLE.apply_to("done!"),
            INFO_STYLE.apply_to(removed_packages),
            INFO_STYLE.apply_to(removed_hashes.len())
        );

        Ok(())
    }
}

View file

@ -2,6 +2,7 @@ use pesde::Project;
mod add;
mod auth;
mod cas;
mod config;
mod deprecate;
mod execute;
@ -30,6 +31,10 @@ pub enum Subcommand {
#[command(subcommand)]
Config(config::ConfigCommands),
/// CAS-related commands
#[command(subcommand)]
Cas(cas::CasCommands),
/// Initializes a manifest file in the current directory
Init(init::InitCommand),
@ -83,6 +88,7 @@ impl Subcommand {
match self {
Subcommand::Auth(auth) => auth.run(project, reqwest).await,
Subcommand::Config(config) => config.run().await,
Subcommand::Cas(cas) => cas.run(project).await,
Subcommand::Init(init) => init.run(project).await,
Subcommand::Run(run) => run.run(project).await,
Subcommand::Install(install) => install.run(project, reqwest).await,

View file

@ -297,7 +297,7 @@ pub fn display_err(result: anyhow::Result<()>, prefix: &str) {
if !cause.is_empty() {
eprintln!("{}:", ERROR_STYLE.apply_to("caused by"));
for err in cause {
eprintln!("\t- {}", ERROR_STYLE.apply_to(err));
eprintln!("\t- {err}");
}
}

View file

@ -1,6 +1,6 @@
use crate::{
all_packages_dirs, graph::DependencyGraphWithTarget, manifest::Alias, Project,
PACKAGES_CONTAINER_NAME, SCRIPTS_LINK_FOLDER,
all_packages_dirs, graph::DependencyGraphWithTarget, manifest::Alias, util::remove_empty_dir,
Project, PACKAGES_CONTAINER_NAME, SCRIPTS_LINK_FOLDER,
};
use fs_err::tokio as fs;
use futures::FutureExt;
@ -11,15 +11,6 @@ use std::{
};
use tokio::task::JoinSet;
async fn remove_empty_dir(path: &Path) -> std::io::Result<()> {
match fs::remove_dir(path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::DirectoryNotEmpty => Ok(()),
Err(e) => Err(e),
}
}
fn index_entry(
entry: fs::DirEntry,
packages_index_dir: &Path,

View file

@ -1,9 +1,13 @@
use crate::AuthConfig;
use fs_err::tokio as fs;
use gix::bstr::BStr;
use semver::Version;
use serde::{Deserialize, Deserializer, Serializer};
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, HashSet};
use std::{
collections::{BTreeMap, HashSet},
path::Path,
};
pub fn authenticate_conn(
conn: &mut gix::remote::Connection<
@ -95,3 +99,16 @@ pub fn no_build_metadata(version: &Version) -> Version {
version.build = semver::BuildMetadata::EMPTY;
version
}
/// Removes the directory at `path` if it is empty, treating "nothing to do" as success.
///
/// A directory that does not exist or still has contents is not an error. Any other
/// failure is returned to the caller, with one exception on Windows (see below).
pub async fn remove_empty_dir(path: &Path) -> std::io::Result<()> {
    use std::io::ErrorKind;

    let Err(err) = fs::remove_dir(path).await else {
        return Ok(());
    };

    match err.kind() {
        ErrorKind::NotFound | ErrorKind::DirectoryNotEmpty => Ok(()),
        // concurrent removal on Windows seems to fail with PermissionDenied
        // TODO: investigate why this happens and whether we can avoid it without ignoring all PermissionDenied errors
        #[cfg(windows)]
        ErrorKind::PermissionDenied => Ok(()),
        _ => Err(err),
    }
}