feat: continue change to async

This commit is contained in:
daimond113 2024-11-10 16:43:25 +01:00
parent e2fe1c50b8
commit d490c0a6f3
No known key found for this signature in database
GPG key ID: 3A8ECE51328B513C
25 changed files with 862 additions and 652 deletions

5
Cargo.lock generated
View file

@ -385,9 +385,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.91"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c042108f3ed77fd83760a5fd79b53be043192bb3b9dba91d8c574c0ada7850c8"
checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775"
[[package]]
name = "arc-swap"
@ -3596,6 +3596,7 @@ dependencies = [
"actix-multipart",
"actix-web",
"async-compression",
"async-stream",
"chrono",
"constant_time_eq",
"convert_case 0.6.0",

View file

@ -96,7 +96,6 @@ pesde run foo -- --arg1 --arg2
Installs dependencies for the current project.
- `-t, --threads`: The number of threads to use for downloading dependencies.
- `--locked`: Whether to error if the lockfile is out of date.
- `--prod`: Whether to skip installing dev dependencies.
@ -171,8 +170,6 @@ pesde add https://git.acme.local/package.git#aeff6
Updates the dependencies of the current project.
- `-t, --threads`: The number of threads to use for downloading dependencies.
## `pesde x`
Runs a one-off binary package.

View file

@ -20,6 +20,7 @@ futures = "0.3.31"
tokio = "1.41.0"
tempfile = "3.13.0"
fs-err = { version = "3.0.0", features = ["tokio"] }
async-stream = "0.3.6"
git2 = "0.19.0"
gix = { version = "0.67.0", default-features = false, features = [

View file

@ -7,7 +7,7 @@ use pesde::{
manifest::target::TargetKind,
names::PackageName,
source::{
git_index::GitBasedSource,
git_index::{read_file, root_tree, GitBasedSource},
pesde::{DocEntryKind, IndexFile},
},
};
@ -73,8 +73,10 @@ pub async fn get_package_version(
let entries: IndexFile = {
let source = app_state.source.lock().await;
let repo = gix::open(source.path(&app_state.project))?;
let tree = root_tree(&repo)?;
match source.read_file([scope, name_part], &app_state.project, None)? {
match read_file(&tree, [scope, name_part])? {
Some(versions) => toml::de::from_str(&versions)?,
None => return Ok(HttpResponse::NotFound().finish()),
}

View file

@ -2,13 +2,15 @@ use std::collections::{BTreeMap, BTreeSet};
use actix_web::{web, HttpResponse, Responder};
use crate::{error::Error, package::PackageResponse, AppState};
use pesde::{
names::PackageName,
source::{git_index::GitBasedSource, pesde::IndexFile},
source::{
git_index::{read_file, root_tree, GitBasedSource},
pesde::IndexFile,
},
};
use crate::{error::Error, package::PackageResponse, AppState};
pub async fn get_package_versions(
app_state: web::Data<AppState>,
path: web::Path<PackageName>,
@ -17,11 +19,15 @@ pub async fn get_package_versions(
let (scope, name_part) = name.as_str();
let versions: IndexFile = {
let source = app_state.source.lock().await;
let versions: IndexFile =
match source.read_file([scope, name_part], &app_state.project, None)? {
let repo = gix::open(source.path(&app_state.project))?;
let tree = root_tree(&repo)?;
match read_file(&tree, [scope, name_part])? {
Some(versions) => toml::de::from_str(&versions)?,
None => return Ok(HttpResponse::NotFound().finish()),
}
};
let mut responses = BTreeMap::new();

View file

@ -16,7 +16,7 @@ use git2::{Remote, Repository, Signature};
use pesde::{
manifest::Manifest,
source::{
git_index::GitBasedSource,
git_index::{read_file, root_tree, GitBasedSource},
pesde::{DocEntry, DocEntryKind, IndexFile, IndexFileEntry, ScopeInfo, SCOPE_INFO_FILE},
specifiers::DependencySpecifiers,
version_id::VersionId,
@ -73,7 +73,7 @@ pub async fn publish_package(
) -> Result<impl Responder, Error> {
let source = app_state.source.lock().await;
source.refresh(&app_state.project).await.map_err(Box::new)?;
let config = source.config(&app_state.project)?;
let config = source.config(&app_state.project).await?;
let bytes = body
.next()
@ -316,11 +316,13 @@ pub async fn publish_package(
}
let repo = source.repo_git2(&app_state.project)?;
let gix_repo = gix::open(repo.path())?;
let gix_tree = root_tree(&gix_repo)?;
let (scope, name) = manifest.name.as_str();
let mut oids = vec![];
match source.read_file([scope, SCOPE_INFO_FILE], &app_state.project, None)? {
match read_file(&gix_tree, [scope, SCOPE_INFO_FILE])? {
Some(info) => {
let info: ScopeInfo = toml::de::from_str(&info)?;
if !info.owners.contains(&user_id.0) {
@ -338,11 +340,8 @@ pub async fn publish_package(
}
};
let mut entries: IndexFile = toml::de::from_str(
&source
.read_file([scope, name], &app_state.project, None)?
.unwrap_or_default(),
)?;
let mut entries: IndexFile =
toml::de::from_str(&read_file(&gix_tree, [scope, name])?.unwrap_or_default())?;
let new_entry = IndexFileEntry {
target: manifest.target.clone(),

View file

@ -4,13 +4,15 @@ use actix_web::{web, HttpResponse, Responder};
use serde::Deserialize;
use tantivy::{collector::Count, query::AllQuery, schema::Value, DateTime, Order};
use crate::{error::Error, package::PackageResponse, AppState};
use pesde::{
names::PackageName,
source::{git_index::GitBasedSource, pesde::IndexFile},
source::{
git_index::{read_file, root_tree, GitBasedSource},
pesde::IndexFile,
},
};
use crate::{error::Error, package::PackageResponse, AppState};
#[derive(Deserialize)]
pub struct Request {
#[serde(default)]
@ -49,6 +51,8 @@ pub async fn search_packages(
.unwrap();
let source = app_state.source.lock().await;
let repo = gix::open(source.path(&app_state.project))?;
let tree = root_tree(&repo)?;
let top_docs = top_docs
.into_iter()
@ -64,13 +68,8 @@ pub async fn search_packages(
.unwrap();
let (scope, name) = id.as_str();
let versions: IndexFile = toml::de::from_str(
&source
.read_file([scope, name], &app_state.project, None)
.unwrap()
.unwrap(),
)
.unwrap();
let versions: IndexFile =
toml::de::from_str(&read_file(&tree, [scope, name]).unwrap().unwrap()).unwrap();
let (latest_version, entry) = versions
.iter()

View file

@ -1,6 +1,6 @@
use actix_web::{body::BoxBody, HttpResponse, ResponseError};
use log::error;
use pesde::source::git_index::errors::{ReadFile, RefreshError};
use pesde::source::git_index::errors::{ReadFile, RefreshError, TreeError};
use serde::Serialize;
use thiserror::Error;
@ -38,6 +38,12 @@ pub enum Error {
#[error("failed to serialize struct")]
SerializeJson(#[from] serde_json::Error),
#[error("failed to open git repo")]
OpenRepo(#[from] gix::open::Error),
#[error("failed to get root tree")]
RootTree(#[from] TreeError),
}
#[derive(Debug, Serialize)]

View file

@ -106,7 +106,7 @@ async fn run() -> std::io::Result<()> {
.await
.expect("failed to refresh source");
let (search_reader, search_writer, query_parser) = make_search(&project, &source);
let (search_reader, search_writer, query_parser) = make_search(&project, &source).await;
let app_data = web::Data::new(AppState {
storage: {
@ -115,8 +115,12 @@ async fn run() -> std::io::Result<()> {
storage
},
auth: {
let auth =
get_auth_from_env(source.config(&project).expect("failed to get index config"));
let auth = get_auth_from_env(
source
.config(&project)
.await
.expect("failed to get index config"),
);
info!("auth: {auth}");
auth
},

View file

@ -1,7 +1,12 @@
use crate::AppState;
use async_stream::stream;
use futures::{Stream, StreamExt};
use pesde::{
names::PackageName,
source::pesde::{IndexFileEntry, PesdePackageSource},
source::{
git_index::{root_tree, GitBasedSource},
pesde::{IndexFile, IndexFileEntry, PesdePackageSource, SCOPE_INFO_FILE},
},
Project,
};
use tantivy::{
@ -11,8 +16,58 @@ use tantivy::{
tokenizer::TextAnalyzer,
DateTime, IndexReader, IndexWriter, Term,
};
use tokio::pin;
pub fn make_search(
pub async fn all_packages(
source: &PesdePackageSource,
project: &Project,
) -> impl Stream<Item = (PackageName, IndexFile)> {
let path = source.path(project);
stream! {
let repo = gix::open(&path).expect("failed to open index");
let tree = root_tree(&repo).expect("failed to get root tree");
for entry in tree.iter() {
let entry = entry.expect("failed to read entry");
let object = entry.object().expect("failed to get object");
// directories will be trees, and files will be blobs
if !matches!(object.kind, gix::object::Kind::Tree) {
continue;
}
let package_scope = entry.filename().to_string();
for inner_entry in object.into_tree().iter() {
let inner_entry = inner_entry.expect("failed to read inner entry");
let object = inner_entry.object().expect("failed to get object");
if !matches!(object.kind, gix::object::Kind::Blob) {
continue;
}
let package_name = inner_entry.filename().to_string();
if package_name == SCOPE_INFO_FILE {
continue;
}
let blob = object.into_blob();
let string = String::from_utf8(blob.data.clone()).expect("failed to parse utf8");
let file: IndexFile = toml::from_str(&string).expect("failed to parse index file");
// if this panics, it's an issue with the index.
let name = format!("{package_scope}/{package_name}").parse().unwrap();
yield (name, file);
}
}
}
}
pub async fn make_search(
project: &Project,
source: &PesdePackageSource,
) -> (IndexReader, IndexWriter, QueryParser) {
@ -45,7 +100,10 @@ pub fn make_search(
.unwrap();
let mut search_writer = search_index.writer(50_000_000).unwrap();
for (pkg_name, mut file) in source.all_packages(project).unwrap() {
let stream = all_packages(source, project).await;
pin!(stream);
while let Some((pkg_name, mut file)) = stream.next().await {
let Some((_, latest_entry)) = file.pop_last() else {
log::warn!("no versions found for {pkg_name}");
continue;

View file

@ -63,6 +63,7 @@ impl LoginCommand {
let config = source
.config(project)
.await
.context("failed to read index config")?;
let Some(client_id) = config.github_oauth_client_id else {
anyhow::bail!("index not configured for Github oauth.");

View file

@ -1,18 +1,22 @@
use crate::cli::{
bin_dir, download_graph, files::make_executable, repos::update_scripts,
run_on_workspace_members, up_to_date_lockfile,
bin_dir, files::make_executable, progress_bar, repos::update_scripts, run_on_workspace_members,
up_to_date_lockfile,
};
use anyhow::Context;
use clap::Args;
use colored::{ColoredString, Colorize};
use fs_err::tokio as fs;
use futures::future::try_join_all;
use indicatif::MultiProgress;
use pesde::{
lockfile::Lockfile,
manifest::{target::TargetKind, DependencyType},
Project, MANIFEST_FILE_NAME,
};
use std::collections::{BTreeSet, HashSet};
use std::{
collections::{BTreeSet, HashMap, HashSet},
sync::Arc,
};
#[derive(Debug, Args, Copy, Clone)]
pub struct InstallCommand {
@ -131,6 +135,9 @@ impl InstallCommand {
}
};
let project_2 = project.clone();
let update_scripts_handle = tokio::spawn(async move { update_scripts(&project_2).await });
println!(
"\n{}\n",
format!("[now installing {} {}]", manifest.name, manifest.target)
@ -141,23 +148,32 @@ impl InstallCommand {
println!("{} ❌ removing current package folders", job(1));
{
let mut deleted_folders = HashSet::new();
let mut deleted_folders = HashMap::new();
for target_kind in TargetKind::VARIANTS {
let folder = manifest.target.kind().packages_folder(target_kind);
let package_dir = project.package_dir();
if deleted_folders.insert(folder.to_string()) {
deleted_folders
.entry(folder.to_string())
.or_insert_with(|| async move {
log::debug!("deleting the {folder} folder");
if let Some(e) = fs::remove_dir_all(project.package_dir().join(&folder))
if let Some(e) = fs::remove_dir_all(package_dir.join(&folder))
.await
.err()
.filter(|e| e.kind() != std::io::ErrorKind::NotFound)
{
return Err(e).context(format!("failed to remove the {folder} folder"));
};
Ok(())
});
}
}
try_join_all(deleted_folders.into_values())
.await
.context("failed to remove package folders")?;
}
let old_graph = lockfile.map(|lockfile| {
@ -183,21 +199,29 @@ impl InstallCommand {
.await
.context("failed to build dependency graph")?;
update_scripts(&project).await?;
update_scripts_handle.await??;
let downloaded_graph = download_graph(
&project,
&mut refreshed_sources,
&graph,
let downloaded_graph = {
let (rx, downloaded_graph) = project
.download_graph(&graph, &mut refreshed_sources, &reqwest, self.prod, true)
.await
.context("failed to download dependencies")?;
progress_bar(
graph.values().map(|versions| versions.len() as u64).sum(),
rx,
&multi,
&reqwest,
self.prod,
true,
format!("{} 📥 downloading dependencies", job(3)),
format!("{} 📥 downloaded dependencies", job(3)),
)
.await?;
Arc::into_inner(downloaded_graph)
.unwrap()
.into_inner()
.unwrap()
};
let filtered_graph = if self.prod {
downloaded_graph
.clone()
@ -263,12 +287,19 @@ impl InstallCommand {
#[cfg(feature = "patches")]
{
println!("{} 🩹 applying patches", job(5));
project
let rx = project
.apply_patches(&filtered_graph)
.await
.context("failed to apply patches")?;
progress_bar(
manifest.patches.values().map(|v| v.len() as u64).sum(),
rx,
&multi,
format!("{} 🩹 applying patches", job(5)),
format!("{} 🩹 applied patches", job(5)),
)
.await?;
}
println!("{} 🧹 finishing up", job(JOBS));

View file

@ -2,10 +2,14 @@ use std::collections::HashSet;
use anyhow::Context;
use clap::Args;
use futures::future::try_join_all;
use semver::VersionReq;
use crate::cli::up_to_date_lockfile;
use pesde::{
refresh_sources,
source::{
refs::PackageRefs,
specifiers::DependencySpecifiers,
traits::{PackageRef, PackageSource},
},
@ -21,34 +25,54 @@ pub struct OutdatedCommand {
impl OutdatedCommand {
pub async fn run(self, project: Project) -> anyhow::Result<()> {
let graph = project.deser_lockfile().await?.graph;
let graph = match up_to_date_lockfile(&project).await? {
Some(file) => file.graph,
None => {
anyhow::bail!(
"lockfile is out of sync, run `{} install` to update it",
env!("CARGO_BIN_NAME")
);
}
};
let manifest = project
.deser_manifest()
.await
.context("failed to read manifest")?;
let manifest_target_kind = manifest.target.kind();
let mut refreshed_sources = HashSet::new();
for (name, versions) in graph {
for (current_version_id, node) in versions {
refresh_sources(
&project,
graph
.iter()
.flat_map(|(_, versions)| versions.iter())
.map(|(_, node)| node.node.pkg_ref.source()),
&mut refreshed_sources,
)
.await?;
try_join_all(
graph
.into_iter()
.flat_map(|(_, versions)| versions.into_iter())
.map(|(current_version_id, node)| {
let project = project.clone();
async move {
let Some((alias, mut specifier)) = node.node.direct else {
continue;
return Ok::<(), anyhow::Error>(());
};
if matches!(
specifier,
DependencySpecifiers::Git(_) | DependencySpecifiers::Workspace(_)
) {
continue;
return Ok(());
}
let source = node.node.pkg_ref.source();
if refreshed_sources.insert(source.clone()) {
source.refresh(&project).await?;
}
if !self.strict {
match specifier {
DependencySpecifiers::Pesde(ref mut spec) => {
@ -64,7 +88,7 @@ impl OutdatedCommand {
}
let version_id = source
.resolve(&specifier, &project, manifest.target.kind())
.resolve(&specifier, &project, manifest_target_kind)
.await
.context("failed to resolve package versions")?
.1
@ -73,11 +97,26 @@ impl OutdatedCommand {
.context(format!("no versions of {specifier} found"))?;
if version_id != current_version_id {
println!("{name} ({alias}) {current_version_id} -> {version_id}");
}
}
println!(
"{} {} ({alias}) {} -> {}",
match node.node.pkg_ref {
PackageRefs::Pesde(pkg_ref) => pkg_ref.name.to_string(),
#[cfg(feature = "wally-compat")]
PackageRefs::Wally(pkg_ref) => pkg_ref.name.to_string(),
_ => unreachable!(),
},
current_version_id.target(),
current_version_id.version(),
version_id.version()
);
}
Ok(())
}
}),
)
.await?;
Ok(())
}
}

View file

@ -495,6 +495,7 @@ impl PublishCommand {
.context("failed to refresh source")?;
let config = source
.config(project)
.await
.context("failed to get source config")?;
if archive.len() > config.max_archive_size {

View file

@ -24,9 +24,13 @@ pub struct RunCommand {
impl RunCommand {
pub async fn run(self, project: Project) -> anyhow::Result<()> {
let project_2 = project.clone();
let update_scripts_handle = tokio::spawn(async move { update_scripts(&project_2).await });
let run = |path: PathBuf| {
Handle::current()
.block_on(update_scripts(&project))
.block_on(update_scripts_handle)
.unwrap()
.expect("failed to update scripts");
let mut caller = tempfile::NamedTempFile::new().expect("failed to create tempfile");
@ -98,13 +102,15 @@ impl RunCommand {
version_id.version(),
);
run(bin_path.to_path(&container_folder))
run(bin_path.to_path(&container_folder));
return Ok(());
}
}
if let Ok(manifest) = project.deser_manifest().await {
if let Some(script_path) = manifest.scripts.get(&package_or_script) {
run(script_path.to_path(project.package_dir()))
run(script_path.to_path(project.package_dir()));
return Ok(());
}
};

View file

@ -1,10 +1,10 @@
use crate::cli::{download_graph, repos::update_scripts, run_on_workspace_members};
use crate::cli::{progress_bar, repos::update_scripts, run_on_workspace_members};
use anyhow::Context;
use clap::Args;
use colored::Colorize;
use indicatif::MultiProgress;
use pesde::{lockfile::Lockfile, Project};
use std::collections::HashSet;
use std::{collections::HashSet, sync::Arc};
#[derive(Debug, Args, Copy, Clone)]
pub struct UpdateCommand {}
@ -44,18 +44,26 @@ impl UpdateCommand {
target: manifest.target.kind(),
overrides: manifest.overrides,
graph: download_graph(
&project,
&mut refreshed_sources,
&graph,
graph: {
let (rx, downloaded_graph) = project
.download_graph(&graph, &mut refreshed_sources, &reqwest, false, false)
.await
.context("failed to download dependencies")?;
progress_bar(
graph.values().map(|versions| versions.len() as u64).sum(),
rx,
&multi,
&reqwest,
false,
false,
"📥 downloading dependencies".to_string(),
"📥 downloaded dependencies".to_string(),
)
.await?,
.await?;
Arc::into_inner(downloaded_graph)
.unwrap()
.into_inner()
.unwrap()
},
workspace: run_on_workspace_members(&project, |project| {
let multi = multi.clone();
@ -67,6 +75,12 @@ impl UpdateCommand {
.await
.context("failed to write lockfile")?;
println!(
"\n{}\nrun `{} install` in order to install the new dependencies",
"✅ done".green(),
env!("CARGO_BIN_NAME")
);
Ok(())
}
}

View file

@ -4,10 +4,10 @@ use fs_err::tokio as fs;
use futures::StreamExt;
use indicatif::MultiProgress;
use pesde::{
lockfile::{DependencyGraph, DownloadedGraph, Lockfile},
lockfile::{DownloadedGraph, Lockfile},
manifest::target::TargetKind,
names::{PackageName, PackageNames},
source::{version_id::VersionId, workspace::specifier::VersionTypeOrReq, PackageSources},
source::{version_id::VersionId, workspace::specifier::VersionTypeOrReq},
Project,
};
use relative_path::RelativePathBuf;
@ -16,7 +16,6 @@ use std::{
future::Future,
path::PathBuf,
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::pin;
@ -191,20 +190,15 @@ pub fn parse_gix_url(s: &str) -> Result<gix::Url, gix::url::parse::Error> {
s.try_into()
}
#[allow(clippy::too_many_arguments)]
pub async fn download_graph(
project: &Project,
refreshed_sources: &mut HashSet<PackageSources>,
graph: &DependencyGraph,
pub async fn progress_bar<E: std::error::Error + Into<anyhow::Error>>(
len: u64,
mut rx: tokio::sync::mpsc::Receiver<Result<(), E>>,
multi: &MultiProgress,
reqwest: &reqwest::Client,
prod: bool,
write: bool,
progress_msg: String,
finish_msg: String,
) -> anyhow::Result<DownloadedGraph> {
) -> anyhow::Result<()> {
let bar = multi.add(
indicatif::ProgressBar::new(graph.values().map(|versions| versions.len() as u64).sum())
indicatif::ProgressBar::new(len)
.with_style(
indicatif::ProgressStyle::default_bar()
.template("{msg} {bar:40.208/166} {pos}/{len} {percent}% {elapsed_precise}")?,
@ -213,11 +207,6 @@ pub async fn download_graph(
);
bar.enable_steady_tick(Duration::from_millis(100));
let (mut rx, downloaded_graph) = project
.download_graph(graph, refreshed_sources, reqwest, prod, write)
.await
.context("failed to download dependencies")?;
while let Some(result) = rx.recv().await {
bar.inc(1);
@ -229,10 +218,7 @@ pub async fn download_graph(
bar.finish_with_message(finish_msg);
Ok(Arc::into_inner(downloaded_graph)
.unwrap()
.into_inner()
.unwrap())
Ok(())
}
pub fn shift_project_dir(project: &Project, pkg_dir: PathBuf) -> Project {

View file

@ -1,6 +1,7 @@
use crate::{
lockfile::{DependencyGraph, DownloadedDependencyGraphNode, DownloadedGraph},
manifest::DependencyType,
refresh_sources,
source::{
traits::{PackageRef, PackageSource},
PackageSources,
@ -31,35 +32,29 @@ impl Project {
write: bool,
) -> Result<MultithreadDownloadJob, errors::DownloadGraphError> {
let manifest = self.deser_manifest().await?;
let manifest_target_kind = manifest.target.kind();
let downloaded_graph: MultithreadedGraph = Arc::new(Mutex::new(Default::default()));
let (tx, rx) =
tokio::sync::mpsc::channel(graph.iter().map(|(_, versions)| versions.len()).sum());
let (tx, rx) = tokio::sync::mpsc::channel(
graph
.iter()
.map(|(_, versions)| versions.len())
.sum::<usize>()
.max(1),
);
refresh_sources(
self,
graph
.iter()
.flat_map(|(_, versions)| versions.iter())
.map(|(_, node)| node.pkg_ref.source()),
refreshed_sources,
)
.await?;
for (name, versions) in graph {
for (version_id, node) in versions {
let source = node.pkg_ref.source();
if refreshed_sources.insert(source.clone()) {
source.refresh(self).await.map_err(Box::new)?;
}
let container_folder = node.container_folder(
&self
.package_dir()
.join(
manifest
.target
.kind()
.packages_folder(&node.pkg_ref.target_kind()),
)
.join(PACKAGES_CONTAINER_NAME),
name,
version_id.version(),
);
fs::create_dir_all(&container_folder).await?;
let tx = tx.clone();
let name = name.clone();
@ -70,7 +65,29 @@ impl Project {
let reqwest = reqwest.clone();
let downloaded_graph = downloaded_graph.clone();
let package_dir = self.package_dir().to_path_buf();
tokio::spawn(async move {
let source = node.pkg_ref.source();
let container_folder = node.container_folder(
&package_dir
.join(manifest_target_kind.packages_folder(&node.pkg_ref.target_kind()))
.join(PACKAGES_CONTAINER_NAME),
&name,
version_id.version(),
);
match fs::create_dir_all(&container_folder).await {
Ok(_) => {}
Err(e) => {
tx.send(Err(errors::DownloadGraphError::Io(e)))
.await
.unwrap();
return;
}
}
let project = project.clone();
log::debug!("downloading {name}@{version_id}");

View file

@ -3,13 +3,17 @@
//! pesde has its own registry, however it can also use Wally, and Git repositories as package sources.
//! It has been designed with multiple targets in mind, namely Roblox, Lune, and Luau.
use crate::{lockfile::Lockfile, manifest::Manifest};
use crate::{
lockfile::Lockfile,
manifest::Manifest,
source::{traits::PackageSource, PackageSources},
};
use async_stream::stream;
use fs_err::tokio as fs;
use futures::Stream;
use futures::{future::try_join_all, Stream};
use gix::sec::identity::Account;
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
path::{Path, PathBuf},
};
@ -213,6 +217,26 @@ impl Project {
}
}
/// Refreshes the sources asynchronously
pub async fn refresh_sources<I: Iterator<Item = PackageSources>>(
project: &Project,
sources: I,
refreshed_sources: &mut HashSet<PackageSources>,
) -> Result<(), Box<source::errors::RefreshError>> {
try_join_all(sources.map(|source| {
let needs_refresh = refreshed_sources.insert(source.clone());
async move {
if needs_refresh {
source.refresh(project).await.map_err(Box::new)
} else {
Ok(())
}
}
}))
.await
.map(|_| ())
}
/// Errors that can occur when using the pesde library
pub mod errors {
use std::path::PathBuf;

View file

@ -75,17 +75,26 @@ impl Project {
pub async fn apply_patches(
&self,
graph: &DownloadedGraph,
) -> Result<(), errors::ApplyPatchesError> {
) -> Result<
tokio::sync::mpsc::Receiver<Result<(), errors::ApplyPatchesError>>,
errors::ApplyPatchesError,
> {
let manifest = self.deser_manifest().await?;
let (tx, rx) = tokio::sync::mpsc::channel(
manifest
.patches
.values()
.map(|v| v.len())
.sum::<usize>()
.max(1),
);
for (name, versions) in manifest.patches {
for (version_id, patch_path) in versions {
let tx = tx.clone();
let name = name.clone();
let patch_path = patch_path.to_path(self.package_dir());
let patch = Diff::from_buffer(
&fs::read(&patch_path)
.await
.map_err(errors::ApplyPatchesError::PatchRead)?,
)?;
let Some(node) = graph
.get(&name)
@ -94,6 +103,7 @@ impl Project {
log::warn!(
"patch for {name}@{version_id} not applied because it is not in the graph"
);
tx.send(Ok(())).await.unwrap();
continue;
};
@ -111,46 +121,103 @@ impl Project {
version_id.version(),
);
tokio::spawn(async move {
log::debug!("applying patch to {name}@{version_id}");
{
let repo = setup_patches_repo(&container_folder)?;
for delta in patch.deltas() {
if !matches!(delta.status(), git2::Delta::Modified) {
continue;
let patch = match fs::read(&patch_path).await {
Ok(patch) => patch,
Err(e) => {
tx.send(Err(errors::ApplyPatchesError::PatchRead(e)))
.await
.unwrap();
return;
}
let file = delta.new_file();
let Some(relative_path) = file.path() else {
continue;
};
let relative_path = RelativePathBuf::from_path(relative_path).unwrap();
let path = relative_path.to_path(&container_folder);
if !path.is_file() {
continue;
let patch = match Diff::from_buffer(&patch) {
Ok(patch) => patch,
Err(e) => {
tx.send(Err(errors::ApplyPatchesError::Git(e)))
.await
.unwrap();
return;
}
};
{
let repo = match setup_patches_repo(&container_folder) {
Ok(repo) => repo,
Err(e) => {
tx.send(Err(errors::ApplyPatchesError::Git(e)))
.await
.unwrap();
return;
}
};
let modified_files = patch
.deltas()
.filter(|delta| matches!(delta.status(), git2::Delta::Modified))
.filter_map(|delta| delta.new_file().path())
.map(|path| {
RelativePathBuf::from_path(path)
.unwrap()
.to_path(&container_folder)
})
.filter(|path| path.is_file())
.collect::<Vec<_>>();
for path in modified_files {
// there is no way (as far as I know) to check if it's hardlinked
// so, we always unlink it
let content = fs::read(&path).await.unwrap();
fs::remove_file(&path).await.unwrap();
fs::write(path, content).await.unwrap();
let content = match fs::read(&path).await {
Ok(content) => content,
Err(e) => {
tx.send(Err(errors::ApplyPatchesError::File(e)))
.await
.unwrap();
return;
}
};
if let Err(e) = fs::remove_file(&path).await {
tx.send(Err(errors::ApplyPatchesError::File(e)))
.await
.unwrap();
return;
}
repo.apply(&patch, ApplyLocation::Both, None)?;
if let Err(e) = fs::write(path, content).await {
tx.send(Err(errors::ApplyPatchesError::File(e)))
.await
.unwrap();
return;
}
}
if let Err(e) = repo.apply(&patch, ApplyLocation::Both, None) {
tx.send(Err(errors::ApplyPatchesError::Git(e)))
.await
.unwrap();
return;
}
}
log::debug!("patch applied to {name}@{version_id}, removing .git directory");
fs::remove_dir_all(container_folder.join(".git"))
if let Err(e) = fs::remove_dir_all(container_folder.join(".git")).await {
tx.send(Err(errors::ApplyPatchesError::DotGitRemove(e)))
.await
.map_err(errors::ApplyPatchesError::DotGitRemove)?;
.unwrap();
return;
}
tx.send(Ok(())).await.unwrap();
});
}
}
Ok(())
Ok(rx)
}
}
@ -177,5 +244,9 @@ pub mod errors {
/// Error removing the .git directory
#[error("error removing .git directory")]
DotGitRemove(#[source] std::io::Error),
/// Error interacting with a patched file
#[error("error interacting with a patched file")]
File(#[source] std::io::Error),
}
}

View file

@ -3,6 +3,7 @@ use crate::{
source::{IGNORED_DIRS, IGNORED_FILES},
};
use fs_err::tokio as fs;
use futures::future::try_join_all;
use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
@ -13,7 +14,7 @@ use std::{
};
use tempfile::Builder;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
io::{AsyncReadExt, AsyncWriteExt},
pin,
};
@ -75,7 +76,7 @@ pub(crate) async fn store_in_cas<
let temp_path = Builder::new()
.make_in(&tmp_dir, |_| Ok(()))?
.into_temp_path();
let mut file_writer = BufWriter::new(fs::File::create(temp_path.to_path_buf()).await?);
let mut file_writer = fs::File::create(temp_path.to_path_buf()).await?;
loop {
let bytes_future = contents.read(&mut buf);
@ -99,7 +100,7 @@ pub(crate) async fn store_in_cas<
match temp_path.persist_noclobber(&cas_path) {
Ok(_) => {
make_readonly(&file_writer.into_inner()).await?;
make_readonly(&file_writer).await?;
}
Err(e) if e.error.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(e) => return Err(e.error),
@ -118,8 +119,12 @@ impl PackageFS {
) -> std::io::Result<()> {
match self {
PackageFS::CAS(entries) => {
for (path, entry) in entries {
let path = path.to_path(destination.as_ref());
try_join_all(entries.iter().map(|(path, entry)| {
let destination = destination.as_ref().to_path_buf();
let cas_path = cas_path.as_ref().to_path_buf();
async move {
let path = path.to_path(destination);
match entry {
FSEntry::File(hash) => {
@ -128,16 +133,16 @@ impl PackageFS {
}
let (prefix, rest) = hash.split_at(2);
let cas_file_path = cas_path.as_ref().join(prefix).join(rest);
let cas_file_path = cas_path.join(prefix).join(rest);
if link {
fs::hard_link(cas_file_path, path).await?;
} else {
let mut f = fs::File::create(&path).await?;
f.write_all(&fs::read(cas_file_path).await?).await?;
fs::copy(cas_file_path, &path).await?;
#[cfg(unix)]
{
let f = fs::File::open(&path).await?;
let mut permissions = f.metadata().await?.permissions();
use std::os::unix::fs::PermissionsExt;
permissions.set_mode(permissions.mode() | 0o644);
@ -149,7 +154,11 @@ impl PackageFS {
fs::create_dir_all(path).await?;
}
}
Ok::<_, std::io::Error>(())
}
}))
.await?;
}
PackageFS::Copy(src, target) => {
fs::create_dir_all(destination.as_ref()).await?;

View file

@ -1,7 +1,6 @@
use std::{collections::BTreeMap, fmt::Debug, hash::Hash, path::PathBuf};
use gix::{bstr::BStr, traverse::tree::Recorder, Url};
use gix::{bstr::BStr, traverse::tree::Recorder, ObjectId, Url};
use relative_path::RelativePathBuf;
use std::{collections::BTreeMap, fmt::Debug, hash::Hash, path::PathBuf, sync::Arc};
use crate::{
manifest::{
@ -10,9 +9,9 @@ use crate::{
},
names::PackageNames,
source::{
fs::{FSEntry, PackageFS},
fs::{store_in_cas, FSEntry, PackageFS},
git::{pkg_ref::GitPackageRef, specifier::GitDependencySpecifier},
git_index::GitBasedSource,
git_index::{read_file, GitBasedSource},
specifiers::DependencySpecifiers,
traits::PackageRef,
PackageSource, ResolveResult, VersionId, IGNORED_DIRS, IGNORED_FILES,
@ -21,6 +20,8 @@ use crate::{
Project, DEFAULT_INDEX_NAME, LOCKFILE_FILE_NAME, MANIFEST_FILE_NAME,
};
use fs_err::tokio as fs;
use futures::future::try_join_all;
use tokio::{sync::Mutex, task::spawn_blocking};
/// The Git package reference
pub mod pkg_ref;
@ -126,8 +127,7 @@ impl PackageSource for GitPackageSource {
root_tree.clone()
};
let manifest = match self
.read_file([MANIFEST_FILE_NAME], project, Some(tree.clone()))
let manifest = match read_file(&tree, [MANIFEST_FILE_NAME])
.map_err(|e| errors::ResolveError::ReadManifest(Box::new(self.repo_url.clone()), e))?
{
Some(m) => match toml::from_str::<Manifest>(&m) {
@ -196,12 +196,7 @@ impl PackageSource for GitPackageSource {
}
DependencySpecifiers::Git(_) => {}
DependencySpecifiers::Workspace(specifier) => {
let lockfile = self
.read_file(
[LOCKFILE_FILE_NAME],
project,
Some(root_tree.clone()),
)
let lockfile = read_file(&root_tree, [LOCKFILE_FILE_NAME])
.map_err(|e| {
errors::ResolveError::ReadLockfile(
Box::new(self.repo_url.clone()),
@ -260,11 +255,9 @@ impl PackageSource for GitPackageSource {
#[cfg(feature = "wally-compat")]
None => {
match self
.read_file(
match read_file(
&tree,
[crate::source::wally::compat_util::WALLY_MANIFEST_FILE_NAME],
project,
Some(tree.clone()),
)
.map_err(|e| {
errors::ResolveError::ReadManifest(Box::new(self.repo_url.clone()), e)
@ -396,60 +389,91 @@ impl PackageSource for GitPackageSource {
Err(e) => return Err(errors::DownloadError::Io(e)),
}
let mut entries = BTreeMap::new();
let mut manifest = None;
let repo = gix::open(self.path(project))
.map_err(|e| errors::DownloadError::OpenRepo(Box::new(self.repo_url.clone()), e))?;
.map_err(|e| errors::DownloadError::OpenRepo(Box::new(self.repo_url.clone()), e))?
.into_sync();
let repo_url = self.repo_url.clone();
let tree_id = pkg_ref.tree_id.clone();
let recorder = {
let rev = repo
.rev_parse_single(BStr::new(&pkg_ref.tree_id))
.map_err(|e| {
errors::DownloadError::ParseRev(
pkg_ref.tree_id.clone(),
let (repo, records) = spawn_blocking(move || {
let repo = repo.to_thread_local();
let mut recorder = Recorder::default();
{
let object_id = match tree_id.parse::<ObjectId>() {
Ok(oid) => oid,
Err(e) => {
return Err(errors::DownloadError::ParseTreeId(Box::new(repo_url), e))
}
};
let object = match repo.find_object(object_id) {
Ok(object) => object,
Err(e) => {
return Err(errors::DownloadError::ParseOidToObject(
object_id,
Box::new(repo_url),
e,
))
}
};
let tree = match object.peel_to_tree() {
Ok(tree) => tree,
Err(e) => {
return Err(errors::DownloadError::ParseObjectToTree(
Box::new(repo_url),
e,
))
}
};
if let Err(e) = tree.traverse().breadthfirst(&mut recorder) {
return Err(errors::DownloadError::TraverseTree(Box::new(repo_url), e));
}
}
Ok::<_, errors::DownloadError>((repo.into_sync(), recorder.records))
})
.await
.unwrap()?;
let repo = repo.to_thread_local();
let records = records
.into_iter()
.map(|entry| {
let object = repo.find_object(entry.oid).map_err(|e| {
errors::DownloadError::ParseOidToObject(
entry.oid,
Box::new(self.repo_url.clone()),
e,
)
})?;
let tree = rev
.object()
.map_err(|e| {
errors::DownloadError::ParseEntryToObject(Box::new(self.repo_url.clone()), e)
})?
.peel_to_tree()
.map_err(|e| {
errors::DownloadError::ParseObjectToTree(Box::new(self.repo_url.clone()), e)
})?;
let mut recorder = Recorder::default();
tree.traverse().breadthfirst(&mut recorder).map_err(|e| {
errors::DownloadError::TraverseTree(Box::new(self.repo_url.clone()), e)
})?;
recorder
};
for entry in recorder.records {
let path = RelativePathBuf::from(entry.filepath.to_string());
let name = path.file_name().unwrap_or("");
let object = repo.find_object(entry.oid).map_err(|e| {
errors::DownloadError::ParseEntryToObject(Box::new(self.repo_url.clone()), e)
})?;
Ok::<_, errors::DownloadError>((
RelativePathBuf::from(entry.filepath.to_string()),
if matches!(object.kind, gix::object::Kind::Tree) {
if IGNORED_DIRS.contains(&name) {
continue;
}
None
} else {
Some(object.data.clone())
},
))
})
.collect::<Result<Vec<_>, _>>()?;
entries.insert(path, FSEntry::Directory);
continue;
let manifest = Arc::new(Mutex::new(None::<Vec<u8>>));
let entries = try_join_all(
records
.into_iter()
.filter(|(path, contents)| {
let name = path.file_name().unwrap_or("");
if contents.is_none() {
return !IGNORED_DIRS.contains(&name);
}
if IGNORED_FILES.contains(&name) {
continue;
return false;
}
if pkg_ref.use_new_structure() && name == "default.project.json" {
@ -458,22 +482,35 @@ impl PackageSource for GitPackageSource {
pkg_ref.repo,
pkg_ref.tree_id
);
continue;
return false;
}
let data = object.into_blob().data.clone();
// let hash =
// store_reader_in_cas(project.cas_dir(), data.as_slice(), |_| async { Ok(()) })
// .await?;
true
})
.map(|(path, contents)| {
let manifest = manifest.clone();
async move {
let Some(contents) = contents else {
return Ok::<_, errors::DownloadError>((path, FSEntry::Directory));
};
let hash =
store_in_cas(project.cas_dir(), contents.as_slice(), |_| async { Ok(()) })
.await?;
if path == MANIFEST_FILE_NAME {
manifest = Some(data);
manifest.lock().await.replace(contents);
}
// entries.insert(path, FSEntry::File(hash));
Ok((path, FSEntry::File(hash)))
}
}),
)
.await?
.into_iter()
.collect::<BTreeMap<_, _>>();
let manifest = match manifest {
let manifest = match Arc::into_inner(manifest).unwrap().into_inner() {
Some(data) => match String::from_utf8(data.to_vec()) {
Ok(s) => match toml::from_str::<Manifest>(&s) {
Ok(m) => Some(m),
@ -528,6 +565,7 @@ impl PackageSource for GitPackageSource {
/// Errors that can occur when interacting with the Git package source
pub mod errors {
use crate::manifest::target::TargetKind;
use gix::ObjectId;
use relative_path::RelativePathBuf;
use thiserror::Error;
@ -646,14 +684,6 @@ pub mod errors {
#[error("error opening Git repository for url {0}")]
OpenRepo(Box<gix::Url>, #[source] gix::open::Error),
/// An error occurred parsing rev
#[error("error parsing rev {0} for repository {1}")]
ParseRev(
String,
Box<gix::Url>,
#[source] gix::revision::spec::parse::single::Error,
),
/// An error occurred while traversing the tree
#[error("error traversing tree for repository {0}")]
TraverseTree(
@ -661,22 +691,18 @@ pub mod errors {
#[source] gix::traverse::tree::breadthfirst::Error,
),
/// An error occurred parsing an entry to object
#[error("error parsing an entry to object for repository {0}")]
ParseEntryToObject(Box<gix::Url>, #[source] gix::object::find::existing::Error),
/// An error occurred parsing an object id to object
#[error("error parsing object id {0} to object for repository {1}")]
ParseOidToObject(
ObjectId,
Box<gix::Url>,
#[source] gix::object::find::existing::Error,
),
/// An error occurred parsing object to tree
#[error("error parsing object to tree for repository {0}")]
ParseObjectToTree(Box<gix::Url>, #[source] gix::object::peel::to_kind::Error),
/// An error occurred reading a tree entry
#[error("error reading tree entry for repository {0} at {1}")]
ReadTreeEntry(
Box<gix::Url>,
RelativePathBuf,
#[source] gix::objs::decode::Error,
),
/// An error occurred parsing the pesde manifest to UTF-8
#[error("error parsing the manifest for repository {0} to UTF-8")]
ParseManifest(#[source] std::string::FromUtf8Error),
@ -684,5 +710,9 @@ pub mod errors {
/// An error occurred while serializing the index file
#[error("error serializing the index file for repository {0}")]
SerializeIndex(Box<gix::Url>, #[source] toml::ser::Error),
/// An error occurred while parsing tree_id to ObjectId
#[error("error parsing tree_id to ObjectId for repository {0}")]
ParseTreeId(Box<gix::Url>, #[source] gix::hash::decode::Error),
}
}

View file

@ -12,100 +12,6 @@ pub trait GitBasedSource {
/// The URL of the repository
fn repo_url(&self) -> &gix::Url;
// TODO: make this function async
/// Gets the tree of the repository
fn tree<'a>(&'a self, repo: &'a gix::Repository) -> Result<gix::Tree<'a>, errors::TreeError> {
// this is a bare repo, so this is the actual path
let path = repo.path().to_path_buf();
let remote = match repo.find_default_remote(Direction::Fetch) {
Some(Ok(remote)) => remote,
Some(Err(e)) => return Err(errors::TreeError::GetDefaultRemote(path, Box::new(e))),
None => {
return Err(errors::TreeError::NoDefaultRemote(path));
}
};
let refspec = match remote.refspecs(Direction::Fetch).first() {
Some(head) => head,
None => return Err(errors::TreeError::NoRefSpecs(path)),
};
let spec_ref = refspec.to_ref();
let local_ref = match spec_ref.local() {
Some(local) => local
.to_string()
.replace('*', repo.branch_names().first().unwrap_or(&"main")),
None => return Err(errors::TreeError::NoLocalRefSpec(path)),
};
let reference = match repo.find_reference(&local_ref) {
Ok(reference) => reference,
Err(e) => return Err(errors::TreeError::NoReference(local_ref.to_string(), e)),
};
let reference_name = reference.name().as_bstr().to_string();
let id = match reference.into_fully_peeled_id() {
Ok(id) => id,
Err(e) => return Err(errors::TreeError::CannotPeel(reference_name, e)),
};
let id_str = id.to_string();
let object = match id.object() {
Ok(object) => object,
Err(e) => return Err(errors::TreeError::CannotConvertToObject(id_str, e)),
};
match object.peel_to_tree() {
Ok(tree) => Ok(tree),
Err(e) => Err(errors::TreeError::CannotPeelToTree(id_str, e)),
}
}
/// Reads a file from the repository
fn read_file<I: IntoIterator<Item = P> + Clone, P: ToString + PartialEq<gix::bstr::BStr>>(
&self,
file_path: I,
project: &Project,
tree: Option<gix::Tree>,
) -> Result<Option<String>, errors::ReadFile> {
let path = self.path(project);
let repo = match gix::open(&path) {
Ok(repo) => repo,
Err(e) => return Err(errors::ReadFile::Open(path, Box::new(e))),
};
let tree = match tree.map_or_else(|| self.tree(&repo), Ok) {
Ok(tree) => tree,
Err(e) => return Err(errors::ReadFile::Tree(path, Box::new(e))),
};
let file_path_str = file_path
.clone()
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join(std::path::MAIN_SEPARATOR_STR);
let entry = match tree.lookup_entry(file_path) {
Ok(Some(entry)) => entry,
Ok(None) => return Ok(None),
Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)),
};
let object = match entry.object() {
Ok(object) => object,
Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)),
};
let blob = object.into_blob();
let string = String::from_utf8(blob.data.clone())
.map_err(|e| errors::ReadFile::Utf8(file_path_str, e))?;
Ok(Some(string))
}
/// Refreshes the repository
async fn refresh(&self, project: &Project) -> Result<(), errors::RefreshError> {
let path = self.path(project);
@ -183,6 +89,85 @@ pub trait GitBasedSource {
}
}
/// Reads a file from a tree
pub fn read_file<I: IntoIterator<Item = P> + Clone, P: ToString + PartialEq<gix::bstr::BStr>>(
tree: &gix::Tree,
file_path: I,
) -> Result<Option<String>, errors::ReadFile> {
let file_path_str = file_path
.clone()
.into_iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join(std::path::MAIN_SEPARATOR_STR);
let entry = match tree.lookup_entry(file_path) {
Ok(Some(entry)) => entry,
Ok(None) => return Ok(None),
Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)),
};
let object = match entry.object() {
Ok(object) => object,
Err(e) => return Err(errors::ReadFile::Lookup(file_path_str, e)),
};
let blob = object.into_blob();
let string = String::from_utf8(blob.data.clone())
.map_err(|e| errors::ReadFile::Utf8(file_path_str, e))?;
Ok(Some(string))
}
/// Gets the root tree of a repository
pub fn root_tree(repo: &gix::Repository) -> Result<gix::Tree, errors::TreeError> {
// this is a bare repo, so this is the actual path
let path = repo.path().to_path_buf();
let remote = match repo.find_default_remote(Direction::Fetch) {
Some(Ok(remote)) => remote,
Some(Err(e)) => return Err(errors::TreeError::GetDefaultRemote(path, Box::new(e))),
None => {
return Err(errors::TreeError::NoDefaultRemote(path));
}
};
let refspec = match remote.refspecs(Direction::Fetch).first() {
Some(head) => head,
None => return Err(errors::TreeError::NoRefSpecs(path)),
};
let spec_ref = refspec.to_ref();
let local_ref = match spec_ref.local() {
Some(local) => local
.to_string()
.replace('*', repo.branch_names().first().unwrap_or(&"main")),
None => return Err(errors::TreeError::NoLocalRefSpec(path)),
};
let reference = match repo.find_reference(&local_ref) {
Ok(reference) => reference,
Err(e) => return Err(errors::TreeError::NoReference(local_ref.to_string(), e)),
};
let reference_name = reference.name().as_bstr().to_string();
let id = match reference.into_fully_peeled_id() {
Ok(id) => id,
Err(e) => return Err(errors::TreeError::CannotPeel(reference_name, e)),
};
let id_str = id.to_string();
let object = match id.object() {
Ok(object) => object,
Err(e) => return Err(errors::TreeError::CannotConvertToObject(id_str, e)),
};
match object.peel_to_tree() {
Ok(tree) => Ok(tree),
Err(e) => Err(errors::TreeError::CannotPeelToTree(id_str, e)),
}
}
/// Errors that can occur when interacting with a git-based package source
pub mod errors {
use std::path::PathBuf;

View file

@ -18,10 +18,10 @@ use crate::{
target::{Target, TargetKind},
DependencyType,
},
names::{PackageName, PackageNames},
names::PackageNames,
source::{
fs::{store_in_cas, FSEntry, PackageFS},
git_index::GitBasedSource,
git_index::{read_file, root_tree, GitBasedSource},
DependencySpecifiers, PackageSource, ResolveResult, VersionId, IGNORED_DIRS, IGNORED_FILES,
},
util::hash,
@ -29,8 +29,7 @@ use crate::{
};
use fs_err::tokio as fs;
use futures::StreamExt;
// TODO: make more of these functions async
use tokio::task::spawn_blocking;
/// The pesde package reference
pub mod pkg_ref;
@ -74,104 +73,22 @@ impl PesdePackageSource {
}
/// Reads the config file
pub fn config(&self, project: &Project) -> Result<IndexConfig, errors::ConfigError> {
let file = self
.read_file(["config.toml"], project, None)
.map_err(Box::new)?;
let string = match file {
Some(s) => s,
None => {
return Err(errors::ConfigError::Missing(Box::new(
self.repo_url.clone(),
)))
}
};
toml::from_str(&string).map_err(Into::into)
}
/// Reads all packages from the index
pub fn all_packages(
&self,
project: &Project,
) -> Result<BTreeMap<PackageName, IndexFile>, errors::AllPackagesError> {
pub async fn config(&self, project: &Project) -> Result<IndexConfig, errors::ConfigError> {
let repo_url = self.repo_url.clone();
let path = self.path(project);
let repo = match gix::open(&path) {
Ok(repo) => repo,
Err(e) => return Err(errors::AllPackagesError::Open(path, Box::new(e))),
};
spawn_blocking(move || {
let repo = gix::open(&path).map_err(Box::new)?;
let tree = root_tree(&repo).map_err(Box::new)?;
let file = read_file(&tree, ["config.toml"]).map_err(Box::new)?;
let tree = match self.tree(&repo) {
Ok(tree) => tree,
Err(e) => return Err(errors::AllPackagesError::Tree(path, Box::new(e))),
};
let mut packages = BTreeMap::<PackageName, IndexFile>::new();
for entry in tree.iter() {
let entry = match entry {
Ok(entry) => entry,
Err(e) => return Err(errors::AllPackagesError::Decode(path, e)),
};
let object = match entry.object() {
Ok(object) => object,
Err(e) => return Err(errors::AllPackagesError::Convert(path, e)),
};
// directories will be trees, and files will be blobs
if !matches!(object.kind, gix::object::Kind::Tree) {
continue;
match file {
Some(s) => toml::from_str(&s).map_err(Into::into),
None => Err(errors::ConfigError::Missing(Box::new(repo_url))),
}
let package_scope = entry.filename().to_string();
for inner_entry in object.into_tree().iter() {
let inner_entry = match inner_entry {
Ok(entry) => entry,
Err(e) => return Err(errors::AllPackagesError::Decode(path, e)),
};
let object = match inner_entry.object() {
Ok(object) => object,
Err(e) => return Err(errors::AllPackagesError::Convert(path, e)),
};
if !matches!(object.kind, gix::object::Kind::Blob) {
continue;
}
let package_name = inner_entry.filename().to_string();
if package_name == SCOPE_INFO_FILE {
continue;
}
let blob = object.into_blob();
let string = String::from_utf8(blob.data.clone())
.map_err(|e| errors::AllPackagesError::Utf8(package_name.to_string(), e))?;
let file: IndexFile = match toml::from_str(&string) {
Ok(file) => file,
Err(e) => {
return Err(errors::AllPackagesError::Deserialize(
package_name,
path,
Box::new(e),
))
}
};
// if this panics, it's an issue with the index.
let name = format!("{package_scope}/{package_name}").parse().unwrap();
packages.insert(name, file);
}
}
Ok(packages)
})
.await
.unwrap()
}
/// The git2 repository for the index
@ -201,7 +118,9 @@ impl PackageSource for PesdePackageSource {
package_target: TargetKind,
) -> Result<ResolveResult<Self::Ref>, Self::ResolveError> {
let (scope, name) = specifier.name.as_str();
let string = match self.read_file([scope, name], project, None) {
let repo = gix::open(self.path(project)).map_err(Box::new)?;
let tree = root_tree(&repo).map_err(Box::new)?;
let string = match read_file(&tree, [scope, name]) {
Ok(Some(s)) => s,
Ok(None) => return Err(Self::ResolveError::NotFound(specifier.name.to_string())),
Err(e) => {
@ -249,7 +168,7 @@ impl PackageSource for PesdePackageSource {
project: &Project,
reqwest: &reqwest::Client,
) -> Result<(PackageFS, Target), Self::DownloadError> {
let config = self.config(project).map_err(Box::new)?;
let config = self.config(project).await.map_err(Box::new)?;
let index_file = project
.cas_dir
.join("index")
@ -292,9 +211,9 @@ impl PackageSource for PesdePackageSource {
let mut entries = BTreeMap::new();
while let Some(entry) = archive
.entries()
.map_err(errors::DownloadError::Unpack)?
let mut archive_entries = archive.entries().map_err(errors::DownloadError::Unpack)?;
while let Some(entry) = archive_entries
.next()
.await
.transpose()
@ -474,8 +393,6 @@ pub type IndexFile = BTreeMap<VersionId, IndexFileEntry>;
/// Errors that can occur when interacting with the pesde package source
pub mod errors {
use std::path::PathBuf;
use thiserror::Error;
use crate::source::git_index::errors::{ReadFile, TreeError};
@ -484,9 +401,13 @@ pub mod errors {
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ResolveError {
/// Error interacting with the filesystem
#[error("error interacting with the filesystem")]
Io(#[from] std::io::Error),
/// Error opening repository
#[error("error opening repository")]
Open(#[from] Box<gix::open::Error>),
/// Error getting tree
#[error("error getting tree")]
Tree(#[from] Box<TreeError>),
/// Package not found in index
#[error("package {0} not found")]
@ -499,16 +420,20 @@ pub mod errors {
/// Error parsing file for package
#[error("error parsing file for {0}")]
Parse(String, #[source] toml::de::Error),
/// Error parsing file for package as utf8
#[error("error parsing file for {0} to utf8")]
Utf8(String, #[source] std::string::FromUtf8Error),
}
/// Errors that can occur when reading the config file for a pesde package source
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ConfigError {
/// Error opening repository
#[error("error opening repository")]
Open(#[from] Box<gix::open::Error>),
/// Error getting tree
#[error("error getting tree")]
Tree(#[from] Box<TreeError>),
/// Error reading file
#[error("error reading config file")]
ReadFile(#[from] Box<ReadFile>),
@ -522,35 +447,6 @@ pub mod errors {
Missing(Box<gix::Url>),
}
/// Errors that can occur when reading all packages from a pesde package source
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum AllPackagesError {
/// Error opening the repository
#[error("error opening repository at {0}")]
Open(PathBuf, #[source] Box<gix::open::Error>),
/// Error reading tree from repository
#[error("error getting tree from repository at {0}")]
Tree(PathBuf, #[source] Box<TreeError>),
/// Error decoding entry in repository
#[error("error decoding entry in repository at {0}")]
Decode(PathBuf, #[source] gix::objs::decode::Error),
/// Error converting entry in repository
#[error("error converting entry in repository at {0}")]
Convert(PathBuf, #[source] gix::object::find::existing::Error),
/// Error deserializing file in repository
#[error("error deserializing file {0} in repository at {1}")]
Deserialize(String, PathBuf, #[source] Box<toml::de::Error>),
/// Error parsing file in repository as utf8
#[error("error parsing file for {0} as utf8")]
Utf8(String, #[source] std::string::FromUtf8Error),
}
/// Errors that can occur when downloading a package from a pesde package source
#[derive(Debug, Error)]
#[non_exhaustive]

View file

@ -3,7 +3,7 @@ use crate::{
names::PackageNames,
source::{
fs::{store_in_cas, FSEntry, PackageFS},
git_index::GitBasedSource,
git_index::{read_file, root_tree, GitBasedSource},
traits::PackageSource,
version_id::VersionId,
wally::{compat_util::get_target, manifest::WallyManifest, pkg_ref::WallyPackageRef},
@ -13,13 +13,14 @@ use crate::{
Project,
};
use fs_err::tokio as fs;
use futures::future::try_join_all;
use gix::Url;
use relative_path::RelativePathBuf;
use reqwest::header::AUTHORIZATION;
use serde::Deserialize;
use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
use tempfile::tempdir;
use tokio::{io::AsyncWriteExt, sync::Mutex};
use tokio::{io::AsyncWriteExt, sync::Mutex, task::spawn_blocking};
use tokio_util::compat::FuturesAsyncReadCompatExt;
pub(crate) mod compat_util;
@ -59,21 +60,22 @@ impl WallyPackageSource {
}
/// Reads the config file
pub fn config(&self, project: &Project) -> Result<WallyIndexConfig, errors::ConfigError> {
let file = self
.read_file(["config.json"], project, None)
.map_err(Box::new)?;
pub async fn config(&self, project: &Project) -> Result<WallyIndexConfig, errors::ConfigError> {
let repo_url = self.repo_url.clone();
let path = self.path(project);
let string = match file {
Some(s) => s,
None => {
return Err(errors::ConfigError::Missing(Box::new(
self.repo_url.clone(),
)))
spawn_blocking(move || {
let repo = gix::open(&path).map_err(Box::new)?;
let tree = root_tree(&repo).map_err(Box::new)?;
let file = read_file(&tree, ["config.json"]).map_err(Box::new)?;
match file {
Some(s) => serde_json::from_str(&s).map_err(Into::into),
None => Err(errors::ConfigError::Missing(Box::new(repo_url))),
}
};
serde_json::from_str(&string).map_err(Into::into)
})
.await
.unwrap()
}
}
@ -94,8 +96,10 @@ impl PackageSource for WallyPackageSource {
project: &Project,
_package_target: TargetKind,
) -> Result<crate::source::ResolveResult<Self::Ref>, Self::ResolveError> {
let repo = gix::open(self.path(project)).map_err(Box::new)?;
let tree = root_tree(&repo).map_err(Box::new)?;
let (scope, name) = specifier.name.as_str();
let string = match self.read_file([scope, name], project, None) {
let string = match read_file(&tree, [scope, name]) {
Ok(Some(s)) => s,
Ok(None) => return Err(Self::ResolveError::NotFound(specifier.name.to_string())),
Err(e) => {
@ -142,7 +146,7 @@ impl PackageSource for WallyPackageSource {
project: &Project,
reqwest: &reqwest::Client,
) -> Result<(PackageFS, Target), Self::DownloadError> {
let config = self.config(project).map_err(Box::new)?;
let config = self.config(project).await.map_err(Box::new)?;
let index_file = project
.cas_dir
.join("wally_index")
@ -170,13 +174,13 @@ impl PackageSource for WallyPackageSource {
let (scope, name) = pkg_ref.name.as_str();
let url = format!(
let mut request = reqwest
.get(format!(
"{}/v1/package-contents/{scope}/{name}/{}",
config.api.as_str().trim_end_matches('/'),
pkg_ref.version
);
let mut request = reqwest.get(&url).header(
))
.header(
"Wally-Version",
std::env::var("PESDE_WALLY_VERSION")
.as_deref()
@ -191,50 +195,69 @@ impl PackageSource for WallyPackageSource {
let response = request.send().await?.error_for_status()?;
let mut bytes = response.bytes().await?;
let mut entries = BTreeMap::new();
let mut archive = async_zip::tokio::read::seek::ZipFileReader::with_tokio(
let archive = async_zip::tokio::read::seek::ZipFileReader::with_tokio(
std::io::Cursor::new(&mut bytes),
)
.await?;
for index in 0..archive.file().entries().len() {
let entries = (0..archive.file().entries().len())
.map(|index| {
let entry = archive.file().entries().get(index).unwrap();
let relative_path = RelativePathBuf::from_path(entry.filename().as_str()?).unwrap();
let path = relative_path.to_path(tempdir.path());
Ok::<_, errors::DownloadError>((index, entry.dir()?, relative_path))
})
.collect::<Result<Vec<_>, _>>()?;
let archive = Arc::new(Mutex::new(archive));
let entries = try_join_all(
entries
.into_iter()
.filter(|(_, is_dir, relative_path)| {
let name = relative_path.file_name().unwrap_or("");
let entry_is_dir = entry.dir()?;
let entry_reader = archive.reader_without_entry(index).await?;
if entry_is_dir {
if IGNORED_DIRS.contains(&name) {
continue;
if *is_dir {
return !IGNORED_DIRS.contains(&name);
}
entries.insert(relative_path, FSEntry::Directory);
!IGNORED_FILES.contains(&name)
})
.map(|(index, is_dir, relative_path)| {
let tempdir_path = tempdir.path().to_path_buf();
let archive = archive.clone();
async move {
let path = relative_path.to_path(tempdir_path);
if is_dir {
fs::create_dir_all(&path).await?;
continue;
}
if IGNORED_FILES.contains(&name) {
continue;
return Ok::<_, errors::DownloadError>((
relative_path,
FSEntry::Directory,
));
}
let mut archive = archive.lock().await;
let entry_reader = archive.reader_without_entry(index).await?;
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let writer = Arc::new(Mutex::new(fs::File::create(&path).await?));
let hash = store_in_cas(project.cas_dir(), entry_reader.compat(), |bytes| {
let hash =
store_in_cas(project.cas_dir(), entry_reader.compat(), |bytes| {
let writer = writer.clone();
async move { writer.lock().await.write_all(&bytes).await }
})
.await?;
entries.insert(relative_path, FSEntry::File(hash));
Ok((relative_path, FSEntry::File(hash)))
}
}),
)
.await?
.into_iter()
.collect::<BTreeMap<_, _>>();
let fs = PackageFS::CAS(entries);
@ -268,9 +291,13 @@ pub mod errors {
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ResolveError {
/// Error interacting with the filesystem
#[error("error interacting with the filesystem")]
Io(#[from] std::io::Error),
/// Error opening repository
#[error("error opening repository")]
Open(#[from] Box<gix::open::Error>),
/// Error getting tree
#[error("error getting tree")]
Tree(#[from] Box<crate::source::git_index::errors::TreeError>),
/// Package not found in index
#[error("package {0} not found")]
@ -284,10 +311,6 @@ pub mod errors {
#[error("error parsing file for {0}")]
Parse(String, #[source] serde_json::Error),
/// Error parsing file for package as utf8
#[error("error parsing file for {0} to utf8")]
Utf8(String, #[source] std::string::FromUtf8Error),
/// Error parsing all dependencies
#[error("error parsing all dependencies for {0}")]
AllDependencies(
@ -300,6 +323,14 @@ pub mod errors {
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ConfigError {
/// Error opening repository
#[error("error opening repository")]
Open(#[from] Box<gix::open::Error>),
/// Error getting tree
#[error("error getting tree")]
Tree(#[from] Box<crate::source::git_index::errors::TreeError>),
/// Error reading file
#[error("error reading config file")]
ReadFile(#[from] Box<ReadFile>),
@ -341,10 +372,6 @@ pub mod errors {
#[error("error interacting with the filesystem")]
Io(#[from] std::io::Error),
/// Error stripping prefix from path
#[error("error stripping prefix from path")]
StripPrefix(#[from] std::path::StripPrefixError),
/// Error serializing index file
#[error("error serializing index file")]
SerializeIndex(#[from] toml::ser::Error),