diff --git a/Cargo.lock b/Cargo.lock index 93c5a95..8ec495c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -385,9 +385,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.91" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "arc-swap" @@ -3596,6 +3596,7 @@ dependencies = [ "actix-multipart", "actix-web", "async-compression", + "async-stream", "chrono", "constant_time_eq", "convert_case 0.6.0", diff --git a/docs/src/content/docs/reference/cli.mdx b/docs/src/content/docs/reference/cli.mdx index 7748cc8..ac516c5 100644 --- a/docs/src/content/docs/reference/cli.mdx +++ b/docs/src/content/docs/reference/cli.mdx @@ -96,7 +96,6 @@ pesde run foo -- --arg1 --arg2 Installs dependencies for the current project. -- `-t, --threads`: The number of threads to use for downloading dependencies. - `--locked`: Whether to error if the lockfile is out of date. - `--prod`: Whether to skip installing dev dependencies. @@ -171,8 +170,6 @@ pesde add https://git.acme.local/package.git#aeff6 Updates the dependencies of the current project. -- `-t, --threads`: The number of threads to use for downloading dependencies. - ## `pesde x` Runs a one-off binary package. diff --git a/registry/Cargo.toml b/registry/Cargo.toml index d0f81c4..f1fa0b2 100644 --- a/registry/Cargo.toml +++ b/registry/Cargo.toml @@ -20,6 +20,7 @@ futures = "0.3.31" tokio = "1.41.0" tempfile = "3.13.0" fs-err = { version = "3.0.0", features = ["tokio"] } +async-stream = "0.3.6" git2 = "0.19.0" gix = { version = "0.67.0", default-features = false, features = [ diff --git a/registry/src/endpoints/package_version.rs b/registry/src/endpoints/package_version.rs index 685a749..825ac12 100644 --- a/registry/src/endpoints/package_version.rs +++ b/registry/src/endpoints/package_version.rs @@ -7,7 +7,7 @@ use pesde::{ manifest::target::TargetKind, names::PackageName, source::{ - git_index::GitBasedSource, + git_index::{read_file, root_tree, GitBasedSource}, pesde::{DocEntryKind, IndexFile}, }, }; @@ -73,8 +73,10 @@ pub async fn get_package_version( let entries: IndexFile = { let source = app_state.source.lock().await; + let repo = gix::open(source.path(&app_state.project))?; + let tree = root_tree(&repo)?; - match source.read_file([scope, name_part], &app_state.project, None)? { + match read_file(&tree, [scope, name_part])? { Some(versions) => toml::de::from_str(&versions)?, None => return Ok(HttpResponse::NotFound().finish()), } diff --git a/registry/src/endpoints/package_versions.rs b/registry/src/endpoints/package_versions.rs index ef39f93..4226807 100644 --- a/registry/src/endpoints/package_versions.rs +++ b/registry/src/endpoints/package_versions.rs @@ -2,13 +2,15 @@ use std::collections::{BTreeMap, BTreeSet}; use actix_web::{web, HttpResponse, Responder}; +use crate::{error::Error, package::PackageResponse, AppState}; use pesde::{ names::PackageName, - source::{git_index::GitBasedSource, pesde::IndexFile}, + source::{ + git_index::{read_file, root_tree, GitBasedSource}, + pesde::IndexFile, + }, }; -use crate::{error::Error, package::PackageResponse, AppState}; - pub async fn get_package_versions( app_state: web::Data, path: web::Path, @@ -17,12 +19,16 @@ pub async fn get_package_versions( let (scope, name_part) = name.as_str(); - let source = app_state.source.lock().await; - let versions: IndexFile = - match source.read_file([scope, name_part], &app_state.project, None)? { + let versions: IndexFile = { + let source = app_state.source.lock().await; + let repo = gix::open(source.path(&app_state.project))?; + let tree = root_tree(&repo)?; + + match read_file(&tree, [scope, name_part])? { Some(versions) => toml::de::from_str(&versions)?, None => return Ok(HttpResponse::NotFound().finish()), - }; + } + }; let mut responses = BTreeMap::new(); diff --git a/registry/src/endpoints/publish_version.rs b/registry/src/endpoints/publish_version.rs index 5c9e446..ddc3b29 100644 --- a/registry/src/endpoints/publish_version.rs +++ b/registry/src/endpoints/publish_version.rs @@ -16,7 +16,7 @@ use git2::{Remote, Repository, Signature}; use pesde::{ manifest::Manifest, source::{ - git_index::GitBasedSource, + git_index::{read_file, root_tree, GitBasedSource}, pesde::{DocEntry, DocEntryKind, IndexFile, IndexFileEntry, ScopeInfo, SCOPE_INFO_FILE}, specifiers::DependencySpecifiers, version_id::VersionId, @@ -73,7 +73,7 @@ pub async fn publish_package( ) -> Result { let source = app_state.source.lock().await; source.refresh(&app_state.project).await.map_err(Box::new)?; - let config = source.config(&app_state.project)?; + let config = source.config(&app_state.project).await?; let bytes = body .next() @@ -316,11 +316,13 @@ pub async fn publish_package( } let repo = source.repo_git2(&app_state.project)?; + let gix_repo = gix::open(repo.path())?; + let gix_tree = root_tree(&gix_repo)?; let (scope, name) = manifest.name.as_str(); let mut oids = vec![]; - match source.read_file([scope, SCOPE_INFO_FILE], &app_state.project, None)? { + match read_file(&gix_tree, [scope, SCOPE_INFO_FILE])? { Some(info) => { let info: ScopeInfo = toml::de::from_str(&info)?; if !info.owners.contains(&user_id.0) { @@ -338,11 +340,8 @@ pub async fn publish_package( } }; - let mut entries: IndexFile = toml::de::from_str( - &source - .read_file([scope, name], &app_state.project, None)? - .unwrap_or_default(), - )?; + let mut entries: IndexFile = + toml::de::from_str(&read_file(&gix_tree, [scope, name])?.unwrap_or_default())?; let new_entry = IndexFileEntry { target: manifest.target.clone(), diff --git a/registry/src/endpoints/search.rs b/registry/src/endpoints/search.rs index 3fb6f07..cb59eeb 100644 --- a/registry/src/endpoints/search.rs +++ b/registry/src/endpoints/search.rs @@ -4,13 +4,15 @@ use actix_web::{web, HttpResponse, Responder}; use serde::Deserialize; use tantivy::{collector::Count, query::AllQuery, schema::Value, DateTime, Order}; +use crate::{error::Error, package::PackageResponse, AppState}; use pesde::{ names::PackageName, - source::{git_index::GitBasedSource, pesde::IndexFile}, + source::{ + git_index::{read_file, root_tree, GitBasedSource}, + pesde::IndexFile, + }, }; -use crate::{error::Error, package::PackageResponse, AppState}; - #[derive(Deserialize)] pub struct Request { #[serde(default)] @@ -49,6 +51,8 @@ pub async fn search_packages( .unwrap(); let source = app_state.source.lock().await; + let repo = gix::open(source.path(&app_state.project))?; + let tree = root_tree(&repo)?; let top_docs = top_docs .into_iter() @@ -64,13 +68,8 @@ pub async fn search_packages( .unwrap(); let (scope, name) = id.as_str(); - let versions: IndexFile = toml::de::from_str( - &source - .read_file([scope, name], &app_state.project, None) - .unwrap() - .unwrap(), - ) - .unwrap(); + let versions: IndexFile = + toml::de::from_str(&read_file(&tree, [scope, name]).unwrap().unwrap()).unwrap(); let (latest_version, entry) = versions .iter() diff --git a/registry/src/error.rs b/registry/src/error.rs index a5e7dcd..abd37ec 100644 --- a/registry/src/error.rs +++ b/registry/src/error.rs @@ -1,6 +1,6 @@ use actix_web::{body::BoxBody, HttpResponse, ResponseError}; use log::error; -use pesde::source::git_index::errors::{ReadFile, RefreshError}; +use pesde::source::git_index::errors::{ReadFile, RefreshError, TreeError}; use serde::Serialize; use thiserror::Error; @@ -38,6 +38,12 @@ pub enum Error { #[error("failed to serialize struct")] SerializeJson(#[from] serde_json::Error), + + #[error("failed to open git repo")] + OpenRepo(#[from] gix::open::Error), + + #[error("failed to get root tree")] + RootTree(#[from] TreeError), } #[derive(Debug, Serialize)] diff --git a/registry/src/main.rs b/registry/src/main.rs index a9bebac..bc60cd9 100644 --- a/registry/src/main.rs +++ b/registry/src/main.rs @@ -106,7 +106,7 @@ async fn run() -> std::io::Result<()> { .await .expect("failed to refresh source"); - let (search_reader, search_writer, query_parser) = make_search(&project, &source); + let (search_reader, search_writer, query_parser) = make_search(&project, &source).await; let app_data = web::Data::new(AppState { storage: { @@ -115,8 +115,12 @@ async fn run() -> std::io::Result<()> { storage }, auth: { - let auth = - get_auth_from_env(source.config(&project).expect("failed to get index config")); + let auth = get_auth_from_env( + source + .config(&project) + .await + .expect("failed to get index config"), + ); info!("auth: {auth}"); auth }, diff --git a/registry/src/search.rs b/registry/src/search.rs index 008013a..0af22ee 100644 --- a/registry/src/search.rs +++ b/registry/src/search.rs @@ -1,7 +1,12 @@ use crate::AppState; +use async_stream::stream; +use futures::{Stream, StreamExt}; use pesde::{ names::PackageName, - source::pesde::{IndexFileEntry, PesdePackageSource}, + source::{ + git_index::{root_tree, GitBasedSource}, + pesde::{IndexFile, IndexFileEntry, PesdePackageSource, SCOPE_INFO_FILE}, + }, Project, }; use tantivy::{ @@ -11,8 +16,58 @@ use tantivy::{ tokenizer::TextAnalyzer, DateTime, IndexReader, IndexWriter, Term, }; +use tokio::pin; -pub fn make_search( +pub async fn all_packages( + source: &PesdePackageSource, + project: &Project, +) -> impl Stream { + let path = source.path(project); + + stream! { + let repo = gix::open(&path).expect("failed to open index"); + let tree = root_tree(&repo).expect("failed to get root tree"); + + for entry in tree.iter() { + let entry = entry.expect("failed to read entry"); + let object = entry.object().expect("failed to get object"); + + // directories will be trees, and files will be blobs + if !matches!(object.kind, gix::object::Kind::Tree) { + continue; + } + + let package_scope = entry.filename().to_string(); + + for inner_entry in object.into_tree().iter() { + let inner_entry = inner_entry.expect("failed to read inner entry"); + let object = inner_entry.object().expect("failed to get object"); + + if !matches!(object.kind, gix::object::Kind::Blob) { + continue; + } + + let package_name = inner_entry.filename().to_string(); + + if package_name == SCOPE_INFO_FILE { + continue; + } + + let blob = object.into_blob(); + let string = String::from_utf8(blob.data.clone()).expect("failed to parse utf8"); + + let file: IndexFile = toml::from_str(&string).expect("failed to parse index file"); + + // if this panics, it's an issue with the index. + let name = format!("{package_scope}/{package_name}").parse().unwrap(); + + yield (name, file); + } + } + } +} + +pub async fn make_search( project: &Project, source: &PesdePackageSource, ) -> (IndexReader, IndexWriter, QueryParser) { @@ -45,7 +100,10 @@ pub fn make_search( .unwrap(); let mut search_writer = search_index.writer(50_000_000).unwrap(); - for (pkg_name, mut file) in source.all_packages(project).unwrap() { + let stream = all_packages(source, project).await; + pin!(stream); + + while let Some((pkg_name, mut file)) = stream.next().await { let Some((_, latest_entry)) = file.pop_last() else { log::warn!("no versions found for {pkg_name}"); continue; diff --git a/src/cli/commands/auth/login.rs b/src/cli/commands/auth/login.rs index c569ea2..b38f2eb 100644 --- a/src/cli/commands/auth/login.rs +++ b/src/cli/commands/auth/login.rs @@ -63,6 +63,7 @@ impl LoginCommand { let config = source .config(project) + .await .context("failed to read index config")?; let Some(client_id) = config.github_oauth_client_id else { anyhow::bail!("index not configured for Github oauth."); diff --git a/src/cli/commands/install.rs b/src/cli/commands/install.rs index 7dd001b..5cea9eb 100644 --- a/src/cli/commands/install.rs +++ b/src/cli/commands/install.rs @@ -1,18 +1,22 @@ use crate::cli::{ - bin_dir, download_graph, files::make_executable, repos::update_scripts, - run_on_workspace_members, up_to_date_lockfile, + bin_dir, files::make_executable, progress_bar, repos::update_scripts, run_on_workspace_members, + up_to_date_lockfile, }; use anyhow::Context; use clap::Args; use colored::{ColoredString, Colorize}; use fs_err::tokio as fs; +use futures::future::try_join_all; use indicatif::MultiProgress; use pesde::{ lockfile::Lockfile, manifest::{target::TargetKind, DependencyType}, Project, MANIFEST_FILE_NAME, }; -use std::collections::{BTreeSet, HashSet}; +use std::{ + collections::{BTreeSet, HashMap, HashSet}, + sync::Arc, +}; #[derive(Debug, Args, Copy, Clone)] pub struct InstallCommand { @@ -131,6 +135,9 @@ impl InstallCommand { } }; + let project_2 = project.clone(); + let update_scripts_handle = tokio::spawn(async move { update_scripts(&project_2).await }); + println!( "\n{}\n", format!("[now installing {} {}]", manifest.name, manifest.target) @@ -141,23 +148,32 @@ impl InstallCommand { println!("{} โŒ removing current package folders", job(1)); { - let mut deleted_folders = HashSet::new(); + let mut deleted_folders = HashMap::new(); for target_kind in TargetKind::VARIANTS { let folder = manifest.target.kind().packages_folder(target_kind); + let package_dir = project.package_dir(); - if deleted_folders.insert(folder.to_string()) { - log::debug!("deleting the {folder} folder"); + deleted_folders + .entry(folder.to_string()) + .or_insert_with(|| async move { + log::debug!("deleting the {folder} folder"); - if let Some(e) = fs::remove_dir_all(project.package_dir().join(&folder)) - .await - .err() - .filter(|e| e.kind() != std::io::ErrorKind::NotFound) - { - return Err(e).context(format!("failed to remove the {folder} folder")); - }; - } + if let Some(e) = fs::remove_dir_all(package_dir.join(&folder)) + .await + .err() + .filter(|e| e.kind() != std::io::ErrorKind::NotFound) + { + return Err(e).context(format!("failed to remove the {folder} folder")); + }; + + Ok(()) + }); } + + try_join_all(deleted_folders.into_values()) + .await + .context("failed to remove package folders")?; } let old_graph = lockfile.map(|lockfile| { @@ -183,20 +199,28 @@ impl InstallCommand { .await .context("failed to build dependency graph")?; - update_scripts(&project).await?; + update_scripts_handle.await??; - let downloaded_graph = download_graph( - &project, - &mut refreshed_sources, - &graph, - &multi, - &reqwest, - self.prod, - true, - format!("{} ๐Ÿ“ฅ downloading dependencies", job(3)), - format!("{} ๐Ÿ“ฅ downloaded dependencies", job(3)), - ) - .await?; + let downloaded_graph = { + let (rx, downloaded_graph) = project + .download_graph(&graph, &mut refreshed_sources, &reqwest, self.prod, true) + .await + .context("failed to download dependencies")?; + + progress_bar( + graph.values().map(|versions| versions.len() as u64).sum(), + rx, + &multi, + format!("{} ๐Ÿ“ฅ downloading dependencies", job(3)), + format!("{} ๐Ÿ“ฅ downloaded dependencies", job(3)), + ) + .await?; + + Arc::into_inner(downloaded_graph) + .unwrap() + .into_inner() + .unwrap() + }; let filtered_graph = if self.prod { downloaded_graph @@ -263,12 +287,19 @@ impl InstallCommand { #[cfg(feature = "patches")] { - println!("{} ๐Ÿฉน applying patches", job(5)); - - project + let rx = project .apply_patches(&filtered_graph) .await .context("failed to apply patches")?; + + progress_bar( + manifest.patches.values().map(|v| v.len() as u64).sum(), + rx, + &multi, + format!("{} ๐Ÿฉน applying patches", job(5)), + format!("{} ๐Ÿฉน applied patches", job(5)), + ) + .await?; } println!("{} ๐Ÿงน finishing up", job(JOBS)); diff --git a/src/cli/commands/outdated.rs b/src/cli/commands/outdated.rs index dae004d..8ca531a 100644 --- a/src/cli/commands/outdated.rs +++ b/src/cli/commands/outdated.rs @@ -2,10 +2,14 @@ use std::collections::HashSet; use anyhow::Context; use clap::Args; +use futures::future::try_join_all; use semver::VersionReq; +use crate::cli::up_to_date_lockfile; use pesde::{ + refresh_sources, source::{ + refs::PackageRefs, specifiers::DependencySpecifiers, traits::{PackageRef, PackageSource}, }, @@ -21,62 +25,97 @@ pub struct OutdatedCommand { impl OutdatedCommand { pub async fn run(self, project: Project) -> anyhow::Result<()> { - let graph = project.deser_lockfile().await?.graph; + let graph = match up_to_date_lockfile(&project).await? { + Some(file) => file.graph, + None => { + anyhow::bail!( + "lockfile is out of sync, run `{} install` to update it", + env!("CARGO_BIN_NAME") + ); + } + }; let manifest = project .deser_manifest() .await .context("failed to read manifest")?; + let manifest_target_kind = manifest.target.kind(); let mut refreshed_sources = HashSet::new(); - for (name, versions) in graph { - for (current_version_id, node) in versions { - let Some((alias, mut specifier)) = node.node.direct else { - continue; - }; + refresh_sources( + &project, + graph + .iter() + .flat_map(|(_, versions)| versions.iter()) + .map(|(_, node)| node.node.pkg_ref.source()), + &mut refreshed_sources, + ) + .await?; - if matches!( - specifier, - DependencySpecifiers::Git(_) | DependencySpecifiers::Workspace(_) - ) { - continue; - } + try_join_all( + graph + .into_iter() + .flat_map(|(_, versions)| versions.into_iter()) + .map(|(current_version_id, node)| { + let project = project.clone(); + async move { + let Some((alias, mut specifier)) = node.node.direct else { + return Ok::<(), anyhow::Error>(()); + }; - let source = node.node.pkg_ref.source(); - - if refreshed_sources.insert(source.clone()) { - source.refresh(&project).await?; - } - - if !self.strict { - match specifier { - DependencySpecifiers::Pesde(ref mut spec) => { - spec.version = VersionReq::STAR; + if matches!( + specifier, + DependencySpecifiers::Git(_) | DependencySpecifiers::Workspace(_) + ) { + return Ok(()); } - #[cfg(feature = "wally-compat")] - DependencySpecifiers::Wally(ref mut spec) => { - spec.version = VersionReq::STAR; + + let source = node.node.pkg_ref.source(); + + if !self.strict { + match specifier { + DependencySpecifiers::Pesde(ref mut spec) => { + spec.version = VersionReq::STAR; + } + #[cfg(feature = "wally-compat")] + DependencySpecifiers::Wally(ref mut spec) => { + spec.version = VersionReq::STAR; + } + DependencySpecifiers::Git(_) => {} + DependencySpecifiers::Workspace(_) => {} + }; } - DependencySpecifiers::Git(_) => {} - DependencySpecifiers::Workspace(_) => {} - }; - } - let version_id = source - .resolve(&specifier, &project, manifest.target.kind()) - .await - .context("failed to resolve package versions")? - .1 - .pop_last() - .map(|(v_id, _)| v_id) - .context(format!("no versions of {specifier} found"))?; + let version_id = source + .resolve(&specifier, &project, manifest_target_kind) + .await + .context("failed to resolve package versions")? + .1 + .pop_last() + .map(|(v_id, _)| v_id) + .context(format!("no versions of {specifier} found"))?; - if version_id != current_version_id { - println!("{name} ({alias}) {current_version_id} -> {version_id}"); - } - } - } + if version_id != current_version_id { + println!( + "{} {} ({alias}) {} -> {}", + match node.node.pkg_ref { + PackageRefs::Pesde(pkg_ref) => pkg_ref.name.to_string(), + #[cfg(feature = "wally-compat")] + PackageRefs::Wally(pkg_ref) => pkg_ref.name.to_string(), + _ => unreachable!(), + }, + current_version_id.target(), + current_version_id.version(), + version_id.version() + ); + } + + Ok(()) + } + }), + ) + .await?; Ok(()) } diff --git a/src/cli/commands/publish.rs b/src/cli/commands/publish.rs index 233e148..4d84d79 100644 --- a/src/cli/commands/publish.rs +++ b/src/cli/commands/publish.rs @@ -495,6 +495,7 @@ impl PublishCommand { .context("failed to refresh source")?; let config = source .config(project) + .await .context("failed to get source config")?; if archive.len() > config.max_archive_size { diff --git a/src/cli/commands/run.rs b/src/cli/commands/run.rs index bda06e9..0b0b1ed 100644 --- a/src/cli/commands/run.rs +++ b/src/cli/commands/run.rs @@ -24,9 +24,13 @@ pub struct RunCommand { impl RunCommand { pub async fn run(self, project: Project) -> anyhow::Result<()> { + let project_2 = project.clone(); + let update_scripts_handle = tokio::spawn(async move { update_scripts(&project_2).await }); + let run = |path: PathBuf| { Handle::current() - .block_on(update_scripts(&project)) + .block_on(update_scripts_handle) + .unwrap() .expect("failed to update scripts"); let mut caller = tempfile::NamedTempFile::new().expect("failed to create tempfile"); @@ -98,13 +102,15 @@ impl RunCommand { version_id.version(), ); - run(bin_path.to_path(&container_folder)) + run(bin_path.to_path(&container_folder)); + return Ok(()); } } if let Ok(manifest) = project.deser_manifest().await { if let Some(script_path) = manifest.scripts.get(&package_or_script) { - run(script_path.to_path(project.package_dir())) + run(script_path.to_path(project.package_dir())); + return Ok(()); } }; diff --git a/src/cli/commands/update.rs b/src/cli/commands/update.rs index 8f2674b..ae8a19e 100644 --- a/src/cli/commands/update.rs +++ b/src/cli/commands/update.rs @@ -1,10 +1,10 @@ -use crate::cli::{download_graph, repos::update_scripts, run_on_workspace_members}; +use crate::cli::{progress_bar, repos::update_scripts, run_on_workspace_members}; use anyhow::Context; use clap::Args; use colored::Colorize; use indicatif::MultiProgress; use pesde::{lockfile::Lockfile, Project}; -use std::collections::HashSet; +use std::{collections::HashSet, sync::Arc}; #[derive(Debug, Args, Copy, Clone)] pub struct UpdateCommand {} @@ -44,18 +44,26 @@ impl UpdateCommand { target: manifest.target.kind(), overrides: manifest.overrides, - graph: download_graph( - &project, - &mut refreshed_sources, - &graph, - &multi, - &reqwest, - false, - false, - "๐Ÿ“ฅ downloading dependencies".to_string(), - "๐Ÿ“ฅ downloaded dependencies".to_string(), - ) - .await?, + graph: { + let (rx, downloaded_graph) = project + .download_graph(&graph, &mut refreshed_sources, &reqwest, false, false) + .await + .context("failed to download dependencies")?; + + progress_bar( + graph.values().map(|versions| versions.len() as u64).sum(), + rx, + &multi, + "๐Ÿ“ฅ downloading dependencies".to_string(), + "๐Ÿ“ฅ downloaded dependencies".to_string(), + ) + .await?; + + Arc::into_inner(downloaded_graph) + .unwrap() + .into_inner() + .unwrap() + }, workspace: run_on_workspace_members(&project, |project| { let multi = multi.clone(); @@ -67,6 +75,12 @@ impl UpdateCommand { .await .context("failed to write lockfile")?; + println!( + "\n{}\nrun `{} install` in order to install the new dependencies", + "โœ… done".green(), + env!("CARGO_BIN_NAME") + ); + Ok(()) } } diff --git a/src/cli/mod.rs b/src/cli/mod.rs index e705f10..3f357b5 100644 --- a/src/cli/mod.rs +++ b/src/cli/mod.rs @@ -4,10 +4,10 @@ use fs_err::tokio as fs; use futures::StreamExt; use indicatif::MultiProgress; use pesde::{ - lockfile::{DependencyGraph, DownloadedGraph, Lockfile}, + lockfile::{DownloadedGraph, Lockfile}, manifest::target::TargetKind, names::{PackageName, PackageNames}, - source::{version_id::VersionId, workspace::specifier::VersionTypeOrReq, PackageSources}, + source::{version_id::VersionId, workspace::specifier::VersionTypeOrReq}, Project, }; use relative_path::RelativePathBuf; @@ -16,7 +16,6 @@ use std::{ future::Future, path::PathBuf, str::FromStr, - sync::Arc, time::Duration, }; use tokio::pin; @@ -191,20 +190,15 @@ pub fn parse_gix_url(s: &str) -> Result { s.try_into() } -#[allow(clippy::too_many_arguments)] -pub async fn download_graph( - project: &Project, - refreshed_sources: &mut HashSet, - graph: &DependencyGraph, +pub async fn progress_bar>( + len: u64, + mut rx: tokio::sync::mpsc::Receiver>, multi: &MultiProgress, - reqwest: &reqwest::Client, - prod: bool, - write: bool, progress_msg: String, finish_msg: String, -) -> anyhow::Result { +) -> anyhow::Result<()> { let bar = multi.add( - indicatif::ProgressBar::new(graph.values().map(|versions| versions.len() as u64).sum()) + indicatif::ProgressBar::new(len) .with_style( indicatif::ProgressStyle::default_bar() .template("{msg} {bar:40.208/166} {pos}/{len} {percent}% {elapsed_precise}")?, @@ -213,11 +207,6 @@ pub async fn download_graph( ); bar.enable_steady_tick(Duration::from_millis(100)); - let (mut rx, downloaded_graph) = project - .download_graph(graph, refreshed_sources, reqwest, prod, write) - .await - .context("failed to download dependencies")?; - while let Some(result) = rx.recv().await { bar.inc(1); @@ -229,10 +218,7 @@ pub async fn download_graph( bar.finish_with_message(finish_msg); - Ok(Arc::into_inner(downloaded_graph) - .unwrap() - .into_inner() - .unwrap()) + Ok(()) } pub fn shift_project_dir(project: &Project, pkg_dir: PathBuf) -> Project { diff --git a/src/download.rs b/src/download.rs index c9eb0ce..474f6cc 100644 --- a/src/download.rs +++ b/src/download.rs @@ -1,6 +1,7 @@ use crate::{ lockfile::{DependencyGraph, DownloadedDependencyGraphNode, DownloadedGraph}, manifest::DependencyType, + refresh_sources, source::{ traits::{PackageRef, PackageSource}, PackageSources, @@ -31,35 +32,29 @@ impl Project { write: bool, ) -> Result { let manifest = self.deser_manifest().await?; + let manifest_target_kind = manifest.target.kind(); let downloaded_graph: MultithreadedGraph = Arc::new(Mutex::new(Default::default())); - let (tx, rx) = - tokio::sync::mpsc::channel(graph.iter().map(|(_, versions)| versions.len()).sum()); + let (tx, rx) = tokio::sync::mpsc::channel( + graph + .iter() + .map(|(_, versions)| versions.len()) + .sum::() + .max(1), + ); + + refresh_sources( + self, + graph + .iter() + .flat_map(|(_, versions)| versions.iter()) + .map(|(_, node)| node.pkg_ref.source()), + refreshed_sources, + ) + .await?; for (name, versions) in graph { for (version_id, node) in versions { - let source = node.pkg_ref.source(); - - if refreshed_sources.insert(source.clone()) { - source.refresh(self).await.map_err(Box::new)?; - } - - let container_folder = node.container_folder( - &self - .package_dir() - .join( - manifest - .target - .kind() - .packages_folder(&node.pkg_ref.target_kind()), - ) - .join(PACKAGES_CONTAINER_NAME), - name, - version_id.version(), - ); - - fs::create_dir_all(&container_folder).await?; - let tx = tx.clone(); let name = name.clone(); @@ -70,7 +65,29 @@ impl Project { let reqwest = reqwest.clone(); let downloaded_graph = downloaded_graph.clone(); + let package_dir = self.package_dir().to_path_buf(); + tokio::spawn(async move { + let source = node.pkg_ref.source(); + + let container_folder = node.container_folder( + &package_dir + .join(manifest_target_kind.packages_folder(&node.pkg_ref.target_kind())) + .join(PACKAGES_CONTAINER_NAME), + &name, + version_id.version(), + ); + + match fs::create_dir_all(&container_folder).await { + Ok(_) => {} + Err(e) => { + tx.send(Err(errors::DownloadGraphError::Io(e))) + .await + .unwrap(); + return; + } + } + let project = project.clone(); log::debug!("downloading {name}@{version_id}"); diff --git a/src/lib.rs b/src/lib.rs index 870d94d..7c17047 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,13 +3,17 @@ //! pesde has its own registry, however it can also use Wally, and Git repositories as package sources. //! It has been designed with multiple targets in mind, namely Roblox, Lune, and Luau. -use crate::{lockfile::Lockfile, manifest::Manifest}; +use crate::{ + lockfile::Lockfile, + manifest::Manifest, + source::{traits::PackageSource, PackageSources}, +}; use async_stream::stream; use fs_err::tokio as fs; -use futures::Stream; +use futures::{future::try_join_all, Stream}; use gix::sec::identity::Account; use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, path::{Path, PathBuf}, }; @@ -213,6 +217,26 @@ impl Project { } } +/// Refreshes the sources asynchronously +pub async fn refresh_sources>( + project: &Project, + sources: I, + refreshed_sources: &mut HashSet, +) -> Result<(), Box> { + try_join_all(sources.map(|source| { + let needs_refresh = refreshed_sources.insert(source.clone()); + async move { + if needs_refresh { + source.refresh(project).await.map_err(Box::new) + } else { + Ok(()) + } + } + })) + .await + .map(|_| ()) +} + /// Errors that can occur when using the pesde library pub mod errors { use std::path::PathBuf; diff --git a/src/patches.rs b/src/patches.rs index cf2c4a1..0f8ff52 100644 --- a/src/patches.rs +++ b/src/patches.rs @@ -75,17 +75,26 @@ impl Project { pub async fn apply_patches( &self, graph: &DownloadedGraph, - ) -> Result<(), errors::ApplyPatchesError> { + ) -> Result< + tokio::sync::mpsc::Receiver>, + errors::ApplyPatchesError, + > { let manifest = self.deser_manifest().await?; + let (tx, rx) = tokio::sync::mpsc::channel( + manifest + .patches + .values() + .map(|v| v.len()) + .sum::() + .max(1), + ); for (name, versions) in manifest.patches { for (version_id, patch_path) in versions { + let tx = tx.clone(); + + let name = name.clone(); let patch_path = patch_path.to_path(self.package_dir()); - let patch = Diff::from_buffer( - &fs::read(&patch_path) - .await - .map_err(errors::ApplyPatchesError::PatchRead)?, - )?; let Some(node) = graph .get(&name) @@ -94,6 +103,7 @@ impl Project { log::warn!( "patch for {name}@{version_id} not applied because it is not in the graph" ); + tx.send(Ok(())).await.unwrap(); continue; }; @@ -111,46 +121,103 @@ impl Project { version_id.version(), ); - log::debug!("applying patch to {name}@{version_id}"); + tokio::spawn(async move { + log::debug!("applying patch to {name}@{version_id}"); - { - let repo = setup_patches_repo(&container_folder)?; - for delta in patch.deltas() { - if !matches!(delta.status(), git2::Delta::Modified) { - continue; + let patch = match fs::read(&patch_path).await { + Ok(patch) => patch, + Err(e) => { + tx.send(Err(errors::ApplyPatchesError::PatchRead(e))) + .await + .unwrap(); + return; } + }; - let file = delta.new_file(); - let Some(relative_path) = file.path() else { - continue; + let patch = match Diff::from_buffer(&patch) { + Ok(patch) => patch, + Err(e) => { + tx.send(Err(errors::ApplyPatchesError::Git(e))) + .await + .unwrap(); + return; + } + }; + + { + let repo = match setup_patches_repo(&container_folder) { + Ok(repo) => repo, + Err(e) => { + tx.send(Err(errors::ApplyPatchesError::Git(e))) + .await + .unwrap(); + return; + } }; - let relative_path = RelativePathBuf::from_path(relative_path).unwrap(); - let path = relative_path.to_path(&container_folder); + let modified_files = patch + .deltas() + .filter(|delta| matches!(delta.status(), git2::Delta::Modified)) + .filter_map(|delta| delta.new_file().path()) + .map(|path| { + RelativePathBuf::from_path(path) + .unwrap() + .to_path(&container_folder) + }) + .filter(|path| path.is_file()) + .collect::>(); - if !path.is_file() { - continue; + for path in modified_files { + // there is no way (as far as I know) to check if it's hardlinked + // so, we always unlink it + let content = match fs::read(&path).await { + Ok(content) => content, + Err(e) => { + tx.send(Err(errors::ApplyPatchesError::File(e))) + .await + .unwrap(); + return; + } + }; + + if let Err(e) = fs::remove_file(&path).await { + tx.send(Err(errors::ApplyPatchesError::File(e))) + .await + .unwrap(); + return; + } + + if let Err(e) = fs::write(path, content).await { + tx.send(Err(errors::ApplyPatchesError::File(e))) + .await + .unwrap(); + return; + } } - // there is no way (as far as I know) to check if it's hardlinked - // so, we always unlink it - let content = fs::read(&path).await.unwrap(); - fs::remove_file(&path).await.unwrap(); - fs::write(path, content).await.unwrap(); + if let Err(e) = repo.apply(&patch, ApplyLocation::Both, None) { + tx.send(Err(errors::ApplyPatchesError::Git(e))) + .await + .unwrap(); + return; + } } - repo.apply(&patch, ApplyLocation::Both, None)?; - } + log::debug!("patch applied to {name}@{version_id}, removing .git directory"); - log::debug!("patch applied to {name}@{version_id}, removing .git directory"); + if let Err(e) = fs::remove_dir_all(container_folder.join(".git")).await { + tx.send(Err(errors::ApplyPatchesError::DotGitRemove(e))) + .await + .unwrap(); + return; + } - fs::remove_dir_all(container_folder.join(".git")) - .await - .map_err(errors::ApplyPatchesError::DotGitRemove)?; + tx.send(Ok(())).await.unwrap(); + }); } } - Ok(()) + Ok(rx) } } @@ -177,5 +244,9 @@ pub mod errors { /// Error removing the .git directory #[error("error removing .git directory")] DotGitRemove(#[source] std::io::Error), + + /// Error interacting with a patched file + #[error("error interacting with a patched file")] + File(#[source] std::io::Error), } } diff --git a/src/source/fs.rs b/src/source/fs.rs index 9a1c1a8..7807c0e 100644 --- a/src/source/fs.rs +++ b/src/source/fs.rs @@ -3,6 +3,7 @@ use crate::{ source::{IGNORED_DIRS, IGNORED_FILES}, }; use fs_err::tokio as fs; +use futures::future::try_join_all; use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -13,7 +14,7 @@ use std::{ }; use tempfile::Builder; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt, BufWriter}, + io::{AsyncReadExt, AsyncWriteExt}, pin, }; @@ -75,7 +76,7 @@ pub(crate) async fn store_in_cas< let temp_path = Builder::new() .make_in(&tmp_dir, |_| Ok(()))? .into_temp_path(); - let mut file_writer = BufWriter::new(fs::File::create(temp_path.to_path_buf()).await?); + let mut file_writer = fs::File::create(temp_path.to_path_buf()).await?; loop { let bytes_future = contents.read(&mut buf); @@ -99,7 +100,7 @@ pub(crate) async fn store_in_cas< match temp_path.persist_noclobber(&cas_path) { Ok(_) => { - make_readonly(&file_writer.into_inner()).await?; + make_readonly(&file_writer).await?; } Err(e) if e.error.kind() == std::io::ErrorKind::AlreadyExists => {} Err(e) => return Err(e.error), @@ -118,38 +119,46 @@ impl PackageFS { ) -> std::io::Result<()> { match self { PackageFS::CAS(entries) => { - for (path, entry) in entries { - let path = path.to_path(destination.as_ref()); + try_join_all(entries.iter().map(|(path, entry)| { + let destination = destination.as_ref().to_path_buf(); + let cas_path = cas_path.as_ref().to_path_buf(); - match entry { - FSEntry::File(hash) => { - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; - } + async move { + let path = path.to_path(destination); - let (prefix, rest) = hash.split_at(2); - let cas_file_path = cas_path.as_ref().join(prefix).join(rest); + match entry { + FSEntry::File(hash) => { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } - if link { - fs::hard_link(cas_file_path, path).await?; - } else { - let mut f = fs::File::create(&path).await?; - f.write_all(&fs::read(cas_file_path).await?).await?; + let (prefix, rest) = hash.split_at(2); + let cas_file_path = cas_path.join(prefix).join(rest); - #[cfg(unix)] - { - let mut permissions = f.metadata().await?.permissions(); - use std::os::unix::fs::PermissionsExt; - permissions.set_mode(permissions.mode() | 0o644); - f.set_permissions(permissions).await?; + if link { + fs::hard_link(cas_file_path, path).await?; + } else { + fs::copy(cas_file_path, &path).await?; + + #[cfg(unix)] + { + let f = fs::File::open(&path).await?; + let mut permissions = f.metadata().await?.permissions(); + use std::os::unix::fs::PermissionsExt; + permissions.set_mode(permissions.mode() | 0o644); + f.set_permissions(permissions).await?; + } } } + FSEntry::Directory => { + fs::create_dir_all(path).await?; + } } - FSEntry::Directory => { - fs::create_dir_all(path).await?; - } + + Ok::<_, std::io::Error>(()) } - } + })) + .await?; } PackageFS::Copy(src, target) => { fs::create_dir_all(destination.as_ref()).await?; diff --git a/src/source/git/mod.rs b/src/source/git/mod.rs index bcdbf3c..3b8a15b 100644 --- a/src/source/git/mod.rs +++ b/src/source/git/mod.rs @@ -1,7 +1,6 @@ -use std::{collections::BTreeMap, fmt::Debug, hash::Hash, path::PathBuf}; - -use gix::{bstr::BStr, traverse::tree::Recorder, Url}; +use gix::{bstr::BStr, traverse::tree::Recorder, ObjectId, Url}; use relative_path::RelativePathBuf; +use std::{collections::BTreeMap, fmt::Debug, hash::Hash, path::PathBuf, sync::Arc}; use crate::{ manifest::{ @@ -10,9 +9,9 @@ use crate::{ }, names::PackageNames, source::{ - fs::{FSEntry, PackageFS}, + fs::{store_in_cas, FSEntry, PackageFS}, git::{pkg_ref::GitPackageRef, specifier::GitDependencySpecifier}, - git_index::GitBasedSource, + git_index::{read_file, GitBasedSource}, specifiers::DependencySpecifiers, traits::PackageRef, PackageSource, ResolveResult, VersionId, IGNORED_DIRS, IGNORED_FILES, @@ -21,6 +20,8 @@ use crate::{ Project, DEFAULT_INDEX_NAME, LOCKFILE_FILE_NAME, MANIFEST_FILE_NAME, }; use fs_err::tokio as fs; +use futures::future::try_join_all; +use tokio::{sync::Mutex, task::spawn_blocking}; /// The Git package reference pub mod pkg_ref; @@ -126,8 +127,7 @@ impl PackageSource for GitPackageSource { root_tree.clone() }; - let manifest = match self - .read_file([MANIFEST_FILE_NAME], project, Some(tree.clone())) + let manifest = match read_file(&tree, [MANIFEST_FILE_NAME]) .map_err(|e| errors::ResolveError::ReadManifest(Box::new(self.repo_url.clone()), e))? { Some(m) => match toml::from_str::(&m) { @@ -196,12 +196,7 @@ impl PackageSource for GitPackageSource { } DependencySpecifiers::Git(_) => {} DependencySpecifiers::Workspace(specifier) => { - let lockfile = self - .read_file( - [LOCKFILE_FILE_NAME], - project, - Some(root_tree.clone()), - ) + let lockfile = read_file(&root_tree, [LOCKFILE_FILE_NAME]) .map_err(|e| { errors::ResolveError::ReadLockfile( Box::new(self.repo_url.clone()), @@ -260,15 +255,13 @@ impl PackageSource for GitPackageSource { #[cfg(feature = "wally-compat")] None => { - match self - .read_file( - [crate::source::wally::compat_util::WALLY_MANIFEST_FILE_NAME], - project, - Some(tree.clone()), - ) - .map_err(|e| { - errors::ResolveError::ReadManifest(Box::new(self.repo_url.clone()), e) - })? { + match read_file( + &tree, + [crate::source::wally::compat_util::WALLY_MANIFEST_FILE_NAME], + ) + .map_err(|e| { + errors::ResolveError::ReadManifest(Box::new(self.repo_url.clone()), e) + })? { Some(m) => { match toml::from_str::(&m) { Ok(manifest) => { @@ -396,84 +389,128 @@ impl PackageSource for GitPackageSource { Err(e) => return Err(errors::DownloadError::Io(e)), } - let mut entries = BTreeMap::new(); - let mut manifest = None; - let repo = gix::open(self.path(project)) - .map_err(|e| errors::DownloadError::OpenRepo(Box::new(self.repo_url.clone()), e))?; + .map_err(|e| errors::DownloadError::OpenRepo(Box::new(self.repo_url.clone()), e))? + .into_sync(); + let repo_url = self.repo_url.clone(); + let tree_id = pkg_ref.tree_id.clone(); - let recorder = { - let rev = repo - .rev_parse_single(BStr::new(&pkg_ref.tree_id)) - .map_err(|e| { - errors::DownloadError::ParseRev( - pkg_ref.tree_id.clone(), + let (repo, records) = spawn_blocking(move || { + let repo = repo.to_thread_local(); + + let mut recorder = Recorder::default(); + + { + let object_id = match tree_id.parse::() { + Ok(oid) => oid, + Err(e) => { + return Err(errors::DownloadError::ParseTreeId(Box::new(repo_url), e)) + } + }; + let object = match repo.find_object(object_id) { + Ok(object) => object, + Err(e) => { + return Err(errors::DownloadError::ParseOidToObject( + object_id, + Box::new(repo_url), + e, + )) + } + }; + + let tree = match object.peel_to_tree() { + Ok(tree) => tree, + Err(e) => { + return Err(errors::DownloadError::ParseObjectToTree( + Box::new(repo_url), + e, + )) + } + }; + + if let Err(e) = tree.traverse().breadthfirst(&mut recorder) { + return Err(errors::DownloadError::TraverseTree(Box::new(repo_url), e)); + } + } + + Ok::<_, errors::DownloadError>((repo.into_sync(), recorder.records)) + }) + .await + .unwrap()?; + + let repo = repo.to_thread_local(); + + let records = records + .into_iter() + .map(|entry| { + let object = repo.find_object(entry.oid).map_err(|e| { + errors::DownloadError::ParseOidToObject( + entry.oid, Box::new(self.repo_url.clone()), e, ) })?; - let tree = rev - .object() - .map_err(|e| { - errors::DownloadError::ParseEntryToObject(Box::new(self.repo_url.clone()), e) - })? - .peel_to_tree() - .map_err(|e| { - errors::DownloadError::ParseObjectToTree(Box::new(self.repo_url.clone()), e) - })?; + Ok::<_, errors::DownloadError>(( + RelativePathBuf::from(entry.filepath.to_string()), + if matches!(object.kind, gix::object::Kind::Tree) { + None + } else { + Some(object.data.clone()) + }, + )) + }) + .collect::, _>>()?; - let mut recorder = Recorder::default(); - tree.traverse().breadthfirst(&mut recorder).map_err(|e| { - errors::DownloadError::TraverseTree(Box::new(self.repo_url.clone()), e) - })?; + let manifest = Arc::new(Mutex::new(None::>)); + let entries = try_join_all( + records + .into_iter() + .filter(|(path, contents)| { + let name = path.file_name().unwrap_or(""); + if contents.is_none() { + return !IGNORED_DIRS.contains(&name); + } - recorder - }; + if IGNORED_FILES.contains(&name) { + return false; + } - for entry in recorder.records { - let path = RelativePathBuf::from(entry.filepath.to_string()); - let name = path.file_name().unwrap_or(""); - let object = repo.find_object(entry.oid).map_err(|e| { - errors::DownloadError::ParseEntryToObject(Box::new(self.repo_url.clone()), e) - })?; + if pkg_ref.use_new_structure() && name == "default.project.json" { + log::debug!( + "removing default.project.json from {}#{} at {path} - using new structure", + pkg_ref.repo, + pkg_ref.tree_id + ); + return false; + } - if matches!(object.kind, gix::object::Kind::Tree) { - if IGNORED_DIRS.contains(&name) { - continue; - } + true + }) + .map(|(path, contents)| { + let manifest = manifest.clone(); + async move { + let Some(contents) = contents else { + return Ok::<_, errors::DownloadError>((path, FSEntry::Directory)); + }; - entries.insert(path, FSEntry::Directory); + let hash = + store_in_cas(project.cas_dir(), contents.as_slice(), |_| async { Ok(()) }) + .await?; - continue; - } + if path == MANIFEST_FILE_NAME { + manifest.lock().await.replace(contents); + } - if IGNORED_FILES.contains(&name) { - continue; - } + Ok((path, FSEntry::File(hash))) + } + }), + ) + .await? + .into_iter() + .collect::>(); - if pkg_ref.use_new_structure() && name == "default.project.json" { - log::debug!( - "removing default.project.json from {}#{} at {path} - using new structure", - pkg_ref.repo, - pkg_ref.tree_id - ); - continue; - } - - let data = object.into_blob().data.clone(); - // let hash = - // store_reader_in_cas(project.cas_dir(), data.as_slice(), |_| async { Ok(()) }) - // .await?; - - if path == MANIFEST_FILE_NAME { - manifest = Some(data); - } - - // entries.insert(path, FSEntry::File(hash)); - } - - let manifest = match manifest { + let manifest = match Arc::into_inner(manifest).unwrap().into_inner() { Some(data) => match String::from_utf8(data.to_vec()) { Ok(s) => match toml::from_str::(&s) { Ok(m) => Some(m), @@ -528,6 +565,7 @@ impl PackageSource for GitPackageSource { /// Errors that can occur when interacting with the Git package source pub mod errors { use crate::manifest::target::TargetKind; + use gix::ObjectId; use relative_path::RelativePathBuf; use thiserror::Error; @@ -646,14 +684,6 @@ pub mod errors { #[error("error opening Git repository for url {0}")] OpenRepo(Box, #[source] gix::open::Error), - /// An error occurred parsing rev - #[error("error parsing rev {0} for repository {1}")] - ParseRev( - String, - Box, - #[source] gix::revision::spec::parse::single::Error, - ), - /// An error occurred while traversing the tree #[error("error traversing tree for repository {0}")] TraverseTree( @@ -661,22 +691,18 @@ pub mod errors { #[source] gix::traverse::tree::breadthfirst::Error, ), - /// An error occurred parsing an entry to object - #[error("error parsing an entry to object for repository {0}")] - ParseEntryToObject(Box, #[source] gix::object::find::existing::Error), + /// An error occurred parsing an object id to object + #[error("error parsing object id {0} to object for repository {1}")] + ParseOidToObject( + ObjectId, + Box, + #[source] gix::object::find::existing::Error, + ), /// An error occurred parsing object to tree #[error("error parsing object to tree for repository {0}")] ParseObjectToTree(Box, #[source] gix::object::peel::to_kind::Error), - /// An error occurred reading a tree entry - #[error("error reading tree entry for repository {0} at {1}")] - ReadTreeEntry( - Box, - RelativePathBuf, - #[source] gix::objs::decode::Error, - ), - /// An error occurred parsing the pesde manifest to UTF-8 #[error("error parsing the manifest for repository {0} to UTF-8")] ParseManifest(#[source] std::string::FromUtf8Error), @@ -684,5 +710,9 @@ pub mod errors { /// An error occurred while serializing the index file #[error("error serializing the index file for repository {0}")] SerializeIndex(Box, #[source] toml::ser::Error), + + /// An error occurred while parsing tree_id to ObjectId + #[error("error parsing tree_id to ObjectId for repository {0}")] + ParseTreeId(Box, #[source] gix::hash::decode::Error), } } diff --git a/src/source/git_index.rs b/src/source/git_index.rs index 903669a..f03e385 100644 --- a/src/source/git_index.rs +++ b/src/source/git_index.rs @@ -12,100 +12,6 @@ pub trait GitBasedSource { /// The URL of the repository fn repo_url(&self) -> &gix::Url; - // TODO: make this function async - /// Gets the tree of the repository - fn tree<'a>(&'a self, repo: &'a gix::Repository) -> Result, errors::TreeError> { - // this is a bare repo, so this is the actual path - let path = repo.path().to_path_buf(); - - let remote = match repo.find_default_remote(Direction::Fetch) { - Some(Ok(remote)) => remote, - Some(Err(e)) => return Err(errors::TreeError::GetDefaultRemote(path, Box::new(e))), - None => { - return Err(errors::TreeError::NoDefaultRemote(path)); - } - }; - - let refspec = match remote.refspecs(Direction::Fetch).first() { - Some(head) => head, - None => return Err(errors::TreeError::NoRefSpecs(path)), - }; - - let spec_ref = refspec.to_ref(); - let local_ref = match spec_ref.local() { - Some(local) => local - .to_string() - .replace('*', repo.branch_names().first().unwrap_or(&"main")), - None => return Err(errors::TreeError::NoLocalRefSpec(path)), - }; - - let reference = match repo.find_reference(&local_ref) { - Ok(reference) => reference, - Err(e) => return Err(errors::TreeError::NoReference(local_ref.to_string(), e)), - }; - - let reference_name = reference.name().as_bstr().to_string(); - let id = match reference.into_fully_peeled_id() { - Ok(id) => id, - Err(e) => return Err(errors::TreeError::CannotPeel(reference_name, e)), - }; - - let id_str = id.to_string(); - let object = match id.object() { - Ok(object) => object, - Err(e) => return Err(errors::TreeError::CannotConvertToObject(id_str, e)), - }; - - match object.peel_to_tree() { - Ok(tree) => Ok(tree), - Err(e) => Err(errors::TreeError::CannotPeelToTree(id_str, e)), - } - } - - /// Reads a file from the repository - fn read_file + Clone, P: ToString + PartialEq>( - &self, - file_path: I, - project: &Project, - tree: Option, - ) -> Result, errors::ReadFile> { - let path = self.path(project); - - let repo = match gix::open(&path) { - Ok(repo) => repo, - Err(e) => return Err(errors::ReadFile::Open(path, Box::new(e))), - }; - - let tree = match tree.map_or_else(|| self.tree(&repo), Ok) { - Ok(tree) => tree, - Err(e) => return Err(errors::ReadFile::Tree(path, Box::new(e))), - }; - - let file_path_str = file_path - .clone() - .into_iter() - .map(|s| s.to_string()) - .collect::>() - .join(std::path::MAIN_SEPARATOR_STR); - - let entry = match tree.lookup_entry(file_path) { - Ok(Some(entry)) => entry, - Ok(None) => return Ok(None), - Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)), - }; - - let object = match entry.object() { - Ok(object) => object, - Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)), - }; - - let blob = object.into_blob(); - let string = String::from_utf8(blob.data.clone()) - .map_err(|e| errors::ReadFile::Utf8(file_path_str, e))?; - - Ok(Some(string)) - } - /// Refreshes the repository async fn refresh(&self, project: &Project) -> Result<(), errors::RefreshError> { let path = self.path(project); @@ -183,6 +89,85 @@ pub trait GitBasedSource { } } +/// Reads a file from a tree +pub fn read_file + Clone, P: ToString + PartialEq>( + tree: &gix::Tree, + file_path: I, +) -> Result, errors::ReadFile> { + let file_path_str = file_path + .clone() + .into_iter() + .map(|s| s.to_string()) + .collect::>() + .join(std::path::MAIN_SEPARATOR_STR); + + let entry = match tree.lookup_entry(file_path) { + Ok(Some(entry)) => entry, + Ok(None) => return Ok(None), + Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)), + }; + + let object = match entry.object() { + Ok(object) => object, + Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)), + }; + + let blob = object.into_blob(); + let string = String::from_utf8(blob.data.clone()) + .map_err(|e| errors::ReadFile::Utf8(file_path_str, e))?; + + Ok(Some(string)) +} + +/// Gets the root tree of a repository +pub fn root_tree(repo: &gix::Repository) -> Result { + // this is a bare repo, so this is the actual path + let path = repo.path().to_path_buf(); + + let remote = match repo.find_default_remote(Direction::Fetch) { + Some(Ok(remote)) => remote, + Some(Err(e)) => return Err(errors::TreeError::GetDefaultRemote(path, Box::new(e))), + None => { + return Err(errors::TreeError::NoDefaultRemote(path)); + } + }; + + let refspec = match remote.refspecs(Direction::Fetch).first() { + Some(head) => head, + None => return Err(errors::TreeError::NoRefSpecs(path)), + }; + + let spec_ref = refspec.to_ref(); + let local_ref = match spec_ref.local() { + Some(local) => local + .to_string() + .replace('*', repo.branch_names().first().unwrap_or(&"main")), + None => return Err(errors::TreeError::NoLocalRefSpec(path)), + }; + + let reference = match repo.find_reference(&local_ref) { + Ok(reference) => reference, + Err(e) => return Err(errors::TreeError::NoReference(local_ref.to_string(), e)), + }; + + let reference_name = reference.name().as_bstr().to_string(); + let id = match reference.into_fully_peeled_id() { + Ok(id) => id, + Err(e) => return Err(errors::TreeError::CannotPeel(reference_name, e)), + }; + + let id_str = id.to_string(); + let object = match id.object() { + Ok(object) => object, + Err(e) => return Err(errors::TreeError::CannotConvertToObject(id_str, e)), + }; + + match object.peel_to_tree() { + Ok(tree) => Ok(tree), + Err(e) => Err(errors::TreeError::CannotPeelToTree(id_str, e)), + } +} + /// Errors that can occur when interacting with a git-based package source pub mod errors { use std::path::PathBuf; diff --git a/src/source/pesde/mod.rs b/src/source/pesde/mod.rs index 22f7371..512ae31 100644 --- a/src/source/pesde/mod.rs +++ b/src/source/pesde/mod.rs @@ -18,10 +18,10 @@ use crate::{ target::{Target, TargetKind}, DependencyType, }, - names::{PackageName, PackageNames}, + names::PackageNames, source::{ fs::{store_in_cas, FSEntry, PackageFS}, - git_index::GitBasedSource, + git_index::{read_file, root_tree, GitBasedSource}, DependencySpecifiers, PackageSource, ResolveResult, VersionId, IGNORED_DIRS, IGNORED_FILES, }, util::hash, @@ -29,8 +29,7 @@ use crate::{ }; use fs_err::tokio as fs; use futures::StreamExt; - -// TODO: make more of these functions async +use tokio::task::spawn_blocking; /// The pesde package reference pub mod pkg_ref; @@ -74,104 +73,22 @@ impl PesdePackageSource { } /// Reads the config file - pub fn config(&self, project: &Project) -> Result { - let file = self - .read_file(["config.toml"], project, None) - .map_err(Box::new)?; - - let string = match file { - Some(s) => s, - None => { - return Err(errors::ConfigError::Missing(Box::new( - self.repo_url.clone(), - ))) - } - }; - - toml::from_str(&string).map_err(Into::into) - } - - /// Reads all packages from the index - pub fn all_packages( - &self, - project: &Project, - ) -> Result, errors::AllPackagesError> { + pub async fn config(&self, project: &Project) -> Result { + let repo_url = self.repo_url.clone(); let path = self.path(project); - let repo = match gix::open(&path) { - Ok(repo) => repo, - Err(e) => return Err(errors::AllPackagesError::Open(path, Box::new(e))), - }; + spawn_blocking(move || { + let repo = gix::open(&path).map_err(Box::new)?; + let tree = root_tree(&repo).map_err(Box::new)?; + let file = read_file(&tree, ["config.toml"]).map_err(Box::new)?; - let tree = match self.tree(&repo) { - Ok(tree) => tree, - Err(e) => return Err(errors::AllPackagesError::Tree(path, Box::new(e))), - }; - - let mut packages = BTreeMap::::new(); - - for entry in tree.iter() { - let entry = match entry { - Ok(entry) => entry, - Err(e) => return Err(errors::AllPackagesError::Decode(path, e)), - }; - - let object = match entry.object() { - Ok(object) => object, - Err(e) => return Err(errors::AllPackagesError::Convert(path, e)), - }; - - // directories will be trees, and files will be blobs - if !matches!(object.kind, gix::object::Kind::Tree) { - continue; + match file { + Some(s) => toml::from_str(&s).map_err(Into::into), + None => Err(errors::ConfigError::Missing(Box::new(repo_url))), } - - let package_scope = entry.filename().to_string(); - - for inner_entry in object.into_tree().iter() { - let inner_entry = match inner_entry { - Ok(entry) => entry, - Err(e) => return Err(errors::AllPackagesError::Decode(path, e)), - }; - - let object = match inner_entry.object() { - Ok(object) => object, - Err(e) => return Err(errors::AllPackagesError::Convert(path, e)), - }; - - if !matches!(object.kind, gix::object::Kind::Blob) { - continue; - } - - let package_name = inner_entry.filename().to_string(); - - if package_name == SCOPE_INFO_FILE { - continue; - } - - let blob = object.into_blob(); - let string = String::from_utf8(blob.data.clone()) - .map_err(|e| errors::AllPackagesError::Utf8(package_name.to_string(), e))?; - - let file: IndexFile = match toml::from_str(&string) { - Ok(file) => file, - Err(e) => { - return Err(errors::AllPackagesError::Deserialize( - package_name, - path, - Box::new(e), - )) - } - }; - - // if this panics, it's an issue with the index. - let name = format!("{package_scope}/{package_name}").parse().unwrap(); - - packages.insert(name, file); - } - } - - Ok(packages) + }) + .await + .unwrap() } /// The git2 repository for the index @@ -201,7 +118,9 @@ impl PackageSource for PesdePackageSource { package_target: TargetKind, ) -> Result, Self::ResolveError> { let (scope, name) = specifier.name.as_str(); - let string = match self.read_file([scope, name], project, None) { + let repo = gix::open(self.path(project)).map_err(Box::new)?; + let tree = root_tree(&repo).map_err(Box::new)?; + let string = match read_file(&tree, [scope, name]) { Ok(Some(s)) => s, Ok(None) => return Err(Self::ResolveError::NotFound(specifier.name.to_string())), Err(e) => { @@ -249,7 +168,7 @@ impl PackageSource for PesdePackageSource { project: &Project, reqwest: &reqwest::Client, ) -> Result<(PackageFS, Target), Self::DownloadError> { - let config = self.config(project).map_err(Box::new)?; + let config = self.config(project).await.map_err(Box::new)?; let index_file = project .cas_dir .join("index") @@ -292,9 +211,9 @@ impl PackageSource for PesdePackageSource { let mut entries = BTreeMap::new(); - while let Some(entry) = archive - .entries() - .map_err(errors::DownloadError::Unpack)? + let mut archive_entries = archive.entries().map_err(errors::DownloadError::Unpack)?; + + while let Some(entry) = archive_entries .next() .await .transpose() @@ -474,8 +393,6 @@ pub type IndexFile = BTreeMap; /// Errors that can occur when interacting with the pesde package source pub mod errors { - use std::path::PathBuf; - use thiserror::Error; use crate::source::git_index::errors::{ReadFile, TreeError}; @@ -484,9 +401,13 @@ pub mod errors { #[derive(Debug, Error)] #[non_exhaustive] pub enum ResolveError { - /// Error interacting with the filesystem - #[error("error interacting with the filesystem")] - Io(#[from] std::io::Error), + /// Error opening repository + #[error("error opening repository")] + Open(#[from] Box), + + /// Error getting tree + #[error("error getting tree")] + Tree(#[from] Box), /// Package not found in index #[error("package {0} not found")] @@ -499,16 +420,20 @@ pub mod errors { /// Error parsing file for package #[error("error parsing file for {0}")] Parse(String, #[source] toml::de::Error), - - /// Error parsing file for package as utf8 - #[error("error parsing file for {0} to utf8")] - Utf8(String, #[source] std::string::FromUtf8Error), } /// Errors that can occur when reading the config file for a pesde package source #[derive(Debug, Error)] #[non_exhaustive] pub enum ConfigError { + /// Error opening repository + #[error("error opening repository")] + Open(#[from] Box), + + /// Error getting tree + #[error("error getting tree")] + Tree(#[from] Box), + /// Error reading file #[error("error reading config file")] ReadFile(#[from] Box), @@ -522,35 +447,6 @@ pub mod errors { Missing(Box), } - /// Errors that can occur when reading all packages from a pesde package source - #[derive(Debug, Error)] - #[non_exhaustive] - pub enum AllPackagesError { - /// Error opening the repository - #[error("error opening repository at {0}")] - Open(PathBuf, #[source] Box), - - /// Error reading tree from repository - #[error("error getting tree from repository at {0}")] - Tree(PathBuf, #[source] Box), - - /// Error decoding entry in repository - #[error("error decoding entry in repository at {0}")] - Decode(PathBuf, #[source] gix::objs::decode::Error), - - /// Error converting entry in repository - #[error("error converting entry in repository at {0}")] - Convert(PathBuf, #[source] gix::object::find::existing::Error), - - /// Error deserializing file in repository - #[error("error deserializing file {0} in repository at {1}")] - Deserialize(String, PathBuf, #[source] Box), - - /// Error parsing file in repository as utf8 - #[error("error parsing file for {0} as utf8")] - Utf8(String, #[source] std::string::FromUtf8Error), - } - /// Errors that can occur when downloading a package from a pesde package source #[derive(Debug, Error)] #[non_exhaustive] diff --git a/src/source/wally/mod.rs b/src/source/wally/mod.rs index cff5295..88ba96c 100644 --- a/src/source/wally/mod.rs +++ b/src/source/wally/mod.rs @@ -3,7 +3,7 @@ use crate::{ names::PackageNames, source::{ fs::{store_in_cas, FSEntry, PackageFS}, - git_index::GitBasedSource, + git_index::{read_file, root_tree, GitBasedSource}, traits::PackageSource, version_id::VersionId, wally::{compat_util::get_target, manifest::WallyManifest, pkg_ref::WallyPackageRef}, @@ -13,13 +13,14 @@ use crate::{ Project, }; use fs_err::tokio as fs; +use futures::future::try_join_all; use gix::Url; use relative_path::RelativePathBuf; use reqwest::header::AUTHORIZATION; use serde::Deserialize; use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; use tempfile::tempdir; -use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tokio::{io::AsyncWriteExt, sync::Mutex, task::spawn_blocking}; use tokio_util::compat::FuturesAsyncReadCompatExt; pub(crate) mod compat_util; @@ -59,21 +60,22 @@ impl WallyPackageSource { } /// Reads the config file - pub fn config(&self, project: &Project) -> Result { - let file = self - .read_file(["config.json"], project, None) - .map_err(Box::new)?; + pub async fn config(&self, project: &Project) -> Result { + let repo_url = self.repo_url.clone(); + let path = self.path(project); - let string = match file { - Some(s) => s, - None => { - return Err(errors::ConfigError::Missing(Box::new( - self.repo_url.clone(), - ))) + spawn_blocking(move || { + let repo = gix::open(&path).map_err(Box::new)?; + let tree = root_tree(&repo).map_err(Box::new)?; + let file = read_file(&tree, ["config.json"]).map_err(Box::new)?; + + match file { + Some(s) => serde_json::from_str(&s).map_err(Into::into), + None => Err(errors::ConfigError::Missing(Box::new(repo_url))), } - }; - - serde_json::from_str(&string).map_err(Into::into) + }) + .await + .unwrap() } } @@ -94,8 +96,10 @@ impl PackageSource for WallyPackageSource { project: &Project, _package_target: TargetKind, ) -> Result, Self::ResolveError> { + let repo = gix::open(self.path(project)).map_err(Box::new)?; + let tree = root_tree(&repo).map_err(Box::new)?; let (scope, name) = specifier.name.as_str(); - let string = match self.read_file([scope, name], project, None) { + let string = match read_file(&tree, [scope, name]) { Ok(Some(s)) => s, Ok(None) => return Err(Self::ResolveError::NotFound(specifier.name.to_string())), Err(e) => { @@ -142,7 +146,7 @@ impl PackageSource for WallyPackageSource { project: &Project, reqwest: &reqwest::Client, ) -> Result<(PackageFS, Target), Self::DownloadError> { - let config = self.config(project).map_err(Box::new)?; + let config = self.config(project).await.map_err(Box::new)?; let index_file = project .cas_dir .join("wally_index") @@ -170,18 +174,18 @@ impl PackageSource for WallyPackageSource { let (scope, name) = pkg_ref.name.as_str(); - let url = format!( - "{}/v1/package-contents/{scope}/{name}/{}", - config.api.as_str().trim_end_matches('/'), - pkg_ref.version - ); - - let mut request = reqwest.get(&url).header( - "Wally-Version", - std::env::var("PESDE_WALLY_VERSION") - .as_deref() - .unwrap_or("0.3.2"), - ); + let mut request = reqwest + .get(format!( + "{}/v1/package-contents/{scope}/{name}/{}", + config.api.as_str().trim_end_matches('/'), + pkg_ref.version + )) + .header( + "Wally-Version", + std::env::var("PESDE_WALLY_VERSION") + .as_deref() + .unwrap_or("0.3.2"), + ); if let Some(token) = project.auth_config.tokens().get(&self.repo_url) { log::debug!("using token for {}", self.repo_url); @@ -191,50 +195,69 @@ impl PackageSource for WallyPackageSource { let response = request.send().await?.error_for_status()?; let mut bytes = response.bytes().await?; - let mut entries = BTreeMap::new(); - let mut archive = async_zip::tokio::read::seek::ZipFileReader::with_tokio( + let archive = async_zip::tokio::read::seek::ZipFileReader::with_tokio( std::io::Cursor::new(&mut bytes), ) .await?; - for index in 0..archive.file().entries().len() { - let entry = archive.file().entries().get(index).unwrap(); - - let relative_path = RelativePathBuf::from_path(entry.filename().as_str()?).unwrap(); - let path = relative_path.to_path(tempdir.path()); - let name = relative_path.file_name().unwrap_or(""); - - let entry_is_dir = entry.dir()?; - let entry_reader = archive.reader_without_entry(index).await?; - - if entry_is_dir { - if IGNORED_DIRS.contains(&name) { - continue; - } - - entries.insert(relative_path, FSEntry::Directory); - fs::create_dir_all(&path).await?; - - continue; - } - - if IGNORED_FILES.contains(&name) { - continue; - } - - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; - } - - let writer = Arc::new(Mutex::new(fs::File::create(&path).await?)); - let hash = store_in_cas(project.cas_dir(), entry_reader.compat(), |bytes| { - let writer = writer.clone(); - async move { writer.lock().await.write_all(&bytes).await } + let entries = (0..archive.file().entries().len()) + .map(|index| { + let entry = archive.file().entries().get(index).unwrap(); + let relative_path = RelativePathBuf::from_path(entry.filename().as_str()?).unwrap(); + Ok::<_, errors::DownloadError>((index, entry.dir()?, relative_path)) }) - .await?; + .collect::, _>>()?; - entries.insert(relative_path, FSEntry::File(hash)); - } + let archive = Arc::new(Mutex::new(archive)); + + let entries = try_join_all( + entries + .into_iter() + .filter(|(_, is_dir, relative_path)| { + let name = relative_path.file_name().unwrap_or(""); + + if *is_dir { + return !IGNORED_DIRS.contains(&name); + } + + !IGNORED_FILES.contains(&name) + }) + .map(|(index, is_dir, relative_path)| { + let tempdir_path = tempdir.path().to_path_buf(); + let archive = archive.clone(); + + async move { + let path = relative_path.to_path(tempdir_path); + + if is_dir { + fs::create_dir_all(&path).await?; + return Ok::<_, errors::DownloadError>(( + relative_path, + FSEntry::Directory, + )); + } + + let mut archive = archive.lock().await; + let entry_reader = archive.reader_without_entry(index).await?; + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + + let writer = Arc::new(Mutex::new(fs::File::create(&path).await?)); + let hash = + store_in_cas(project.cas_dir(), entry_reader.compat(), |bytes| { + let writer = writer.clone(); + async move { writer.lock().await.write_all(&bytes).await } + }) + .await?; + + Ok((relative_path, FSEntry::File(hash))) + } + }), + ) + .await? + .into_iter() + .collect::>(); let fs = PackageFS::CAS(entries); @@ -268,9 +291,13 @@ pub mod errors { #[derive(Debug, Error)] #[non_exhaustive] pub enum ResolveError { - /// Error interacting with the filesystem - #[error("error interacting with the filesystem")] - Io(#[from] std::io::Error), + /// Error opening repository + #[error("error opening repository")] + Open(#[from] Box), + + /// Error getting tree + #[error("error getting tree")] + Tree(#[from] Box), /// Package not found in index #[error("package {0} not found")] @@ -284,10 +311,6 @@ pub mod errors { #[error("error parsing file for {0}")] Parse(String, #[source] serde_json::Error), - /// Error parsing file for package as utf8 - #[error("error parsing file for {0} to utf8")] - Utf8(String, #[source] std::string::FromUtf8Error), - /// Error parsing all dependencies #[error("error parsing all dependencies for {0}")] AllDependencies( @@ -300,6 +323,14 @@ pub mod errors { #[derive(Debug, Error)] #[non_exhaustive] pub enum ConfigError { + /// Error opening repository + #[error("error opening repository")] + Open(#[from] Box), + + /// Error getting tree + #[error("error getting tree")] + Tree(#[from] Box), + /// Error reading file #[error("error reading config file")] ReadFile(#[from] Box), @@ -341,10 +372,6 @@ pub mod errors { #[error("error interacting with the filesystem")] Io(#[from] std::io::Error), - /// Error stripping prefix from path - #[error("error stripping prefix from path")] - StripPrefix(#[from] std::path::StripPrefixError), - /// Error serializing index file #[error("error serializing index file")] SerializeIndex(#[from] toml::ser::Error),