From 5d62549817f85a8d968816dfe3106642fda01d14 Mon Sep 17 00:00:00 2001 From: daimond113 <72147841+daimond113@users.noreply.github.com> Date: Wed, 1 Jan 2025 18:46:00 +0100 Subject: [PATCH] feat: switch to JoinSet over join_all --- registry/src/endpoints/publish_version.rs | 66 ++++++++++++++--------- src/cli/install.rs | 10 ++-- src/source/fs.rs | 58 +++++++++++--------- 3 files changed, 78 insertions(+), 56 deletions(-) diff --git a/registry/src/endpoints/publish_version.rs b/registry/src/endpoints/publish_version.rs index b36ad96..724f2c1 100644 --- a/registry/src/endpoints/publish_version.rs +++ b/registry/src/endpoints/publish_version.rs @@ -10,7 +10,6 @@ use actix_web::{web, web::Bytes, HttpResponse, Responder}; use async_compression::Level; use convert_case::{Case, Casing}; use fs_err::tokio as fs; -use futures::{future::join_all, join}; use git2::{Remote, Repository, Signature}; use pesde::{ manifest::Manifest, @@ -31,7 +30,10 @@ use std::{ collections::{BTreeSet, HashMap}, io::{Cursor, Write}, }; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + task::JoinSet, +}; fn signature<'a>() -> Signature<'a> { Signature::now( @@ -487,29 +489,43 @@ pub async fn publish_package( let version_id = VersionId::new(manifest.version.clone(), manifest.target.kind()); - let (a, b, c) = join!( - app_state - .storage - .store_package(&manifest.name, &version_id, bytes.to_vec()), - join_all( - docs_pages - .into_iter() - .map(|(hash, content)| app_state.storage.store_doc(hash, content)), - ), - async { - if let Some(readme) = readme { - app_state - .storage - .store_readme(&manifest.name, &version_id, readme) - .await - } else { - Ok(()) - } - } - ); - a?; - b.into_iter().collect::>()?; - c?; + let mut tasks = docs_pages + .into_iter() + .map(|(hash, content)| { + let app_state = app_state.clone(); + async move { app_state.storage.store_doc(hash, content).await } + }) + .collect::>(); + + { + let app_state = app_state.clone(); + let name = manifest.name.clone(); + let version_id = version_id.clone(); + + tasks.spawn(async move { + app_state + .storage + .store_package(&name, &version_id, bytes.to_vec()) + .await + }); + } + + if let Some(readme) = readme { + let app_state = app_state.clone(); + let name = manifest.name.clone(); + let version_id = version_id.clone(); + + tasks.spawn(async move { + app_state + .storage + .store_readme(&name, &version_id, readme) + .await + }); + } + + while let Some(res) = tasks.join_next().await { + res.unwrap()?; + } Ok(HttpResponse::Ok().body(format!( "published {}@{} {}", diff --git a/src/cli/install.rs b/src/cli/install.rs index f727d21..fcd9e22 100644 --- a/src/cli/install.rs +++ b/src/cli/install.rs @@ -13,7 +13,6 @@ use crate::cli::{ use anyhow::Context; use colored::Colorize; use fs_err::tokio as fs; -use futures::future::try_join_all; use pesde::{ download_and_link::{filter_graph, DownloadAndLinkHooks, DownloadAndLinkOptions}, graph::{ConvertableGraph, DependencyGraph, DownloadedGraph}, @@ -209,7 +208,7 @@ pub async fn install( for target_kind in TargetKind::VARIANTS { let folder = manifest.target.kind().packages_folder(target_kind); - let package_dir = project.package_dir(); + let package_dir = project.package_dir().to_path_buf(); deleted_folders .entry(folder.to_string()) @@ -229,9 +228,10 @@ pub async fn install( }); } - try_join_all(deleted_folders.into_values()) - .await - .context("failed to remove package folders")?; + let mut tasks = deleted_folders.into_values().collect::>(); + while let Some(task) = tasks.join_next().await { + task.unwrap()?; + } } root_progress.reset(); diff --git a/src/source/fs.rs b/src/source/fs.rs index 5cabbab..fa643bd 100644 --- a/src/source/fs.rs +++ b/src/source/fs.rs @@ -3,7 +3,6 @@ use crate::{ source::{IGNORED_DIRS, IGNORED_FILES}, }; use fs_err::tokio as fs; -use futures::future::try_join_all; use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -16,6 +15,7 @@ use tempfile::Builder; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, pin, + task::JoinSet, }; use tracing::instrument; @@ -128,38 +128,44 @@ impl PackageFs { ) -> std::io::Result<()> { match self { PackageFs::CAS(entries) => { - try_join_all(entries.iter().map(|(path, entry)| { - let destination = destination.as_ref().to_path_buf(); - let cas_path = cas_path.as_ref().to_path_buf(); - - async move { + let mut tasks = entries + .iter() + .map(|(path, entry)| { + let destination = destination.as_ref().to_path_buf(); + let cas_path = cas_path.as_ref().to_path_buf(); let path = path.to_path(destination); + let entry = entry.clone(); - match entry { - FsEntry::File(hash) => { - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; + async move { + match entry { + FsEntry::File(hash) => { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + + let (prefix, rest) = hash.split_at(2); + let cas_file_path = cas_path.join(prefix).join(rest); + + if link { + fs::hard_link(cas_file_path, path).await?; + } else { + fs::copy(cas_file_path, &path).await?; + set_readonly(&path, false).await?; + } } - - let (prefix, rest) = hash.split_at(2); - let cas_file_path = cas_path.join(prefix).join(rest); - - if link { - fs::hard_link(cas_file_path, path).await?; - } else { - fs::copy(cas_file_path, &path).await?; - set_readonly(&path, false).await?; + FsEntry::Directory => { + fs::create_dir_all(path).await?; } } - FsEntry::Directory => { - fs::create_dir_all(path).await?; - } + + Ok::<_, std::io::Error>(()) } + }) + .collect::>(); - Ok::<_, std::io::Error>(()) - } - })) - .await?; + while let Some(task) = tasks.join_next().await { + task.unwrap()?; + } } PackageFs::Copy(src, target) => { fs::create_dir_all(destination.as_ref()).await?;