feat: switch to JoinSet over join_all
Some checks are pending
Debug / Get build version (push) Waiting to run
Debug / Build for linux-x86_64 (push) Blocked by required conditions
Debug / Build for macos-aarch64 (push) Blocked by required conditions
Debug / Build for macos-x86_64 (push) Blocked by required conditions
Debug / Build for windows-x86_64 (push) Blocked by required conditions
Test & Lint / lint (push) Waiting to run

This commit is contained in:
daimond113 2025-01-01 18:46:00 +01:00
parent 83fa22f7de
commit 5d62549817
No known key found for this signature in database
GPG key ID: 3A8ECE51328B513C
3 changed files with 78 additions and 56 deletions

View file

@ -10,7 +10,6 @@ use actix_web::{web, web::Bytes, HttpResponse, Responder};
use async_compression::Level; use async_compression::Level;
use convert_case::{Case, Casing}; use convert_case::{Case, Casing};
use fs_err::tokio as fs; use fs_err::tokio as fs;
use futures::{future::join_all, join};
use git2::{Remote, Repository, Signature}; use git2::{Remote, Repository, Signature};
use pesde::{ use pesde::{
manifest::Manifest, manifest::Manifest,
@ -31,7 +30,10 @@ use std::{
collections::{BTreeSet, HashMap}, collections::{BTreeSet, HashMap},
io::{Cursor, Write}, io::{Cursor, Write},
}; };
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
task::JoinSet,
};
fn signature<'a>() -> Signature<'a> { fn signature<'a>() -> Signature<'a> {
Signature::now( Signature::now(
@ -487,29 +489,43 @@ pub async fn publish_package(
let version_id = VersionId::new(manifest.version.clone(), manifest.target.kind()); let version_id = VersionId::new(manifest.version.clone(), manifest.target.kind());
let (a, b, c) = join!( let mut tasks = docs_pages
app_state .into_iter()
.storage .map(|(hash, content)| {
.store_package(&manifest.name, &version_id, bytes.to_vec()), let app_state = app_state.clone();
join_all( async move { app_state.storage.store_doc(hash, content).await }
docs_pages })
.into_iter() .collect::<JoinSet<_>>();
.map(|(hash, content)| app_state.storage.store_doc(hash, content)),
), {
async { let app_state = app_state.clone();
if let Some(readme) = readme { let name = manifest.name.clone();
app_state let version_id = version_id.clone();
.storage
.store_readme(&manifest.name, &version_id, readme) tasks.spawn(async move {
.await app_state
} else { .storage
Ok(()) .store_package(&name, &version_id, bytes.to_vec())
} .await
} });
); }
a?;
b.into_iter().collect::<Result<(), _>>()?; if let Some(readme) = readme {
c?; let app_state = app_state.clone();
let name = manifest.name.clone();
let version_id = version_id.clone();
tasks.spawn(async move {
app_state
.storage
.store_readme(&name, &version_id, readme)
.await
});
}
while let Some(res) = tasks.join_next().await {
res.unwrap()?;
}
Ok(HttpResponse::Ok().body(format!( Ok(HttpResponse::Ok().body(format!(
"published {}@{} {}", "published {}@{} {}",

View file

@ -13,7 +13,6 @@ use crate::cli::{
use anyhow::Context; use anyhow::Context;
use colored::Colorize; use colored::Colorize;
use fs_err::tokio as fs; use fs_err::tokio as fs;
use futures::future::try_join_all;
use pesde::{ use pesde::{
download_and_link::{filter_graph, DownloadAndLinkHooks, DownloadAndLinkOptions}, download_and_link::{filter_graph, DownloadAndLinkHooks, DownloadAndLinkOptions},
graph::{ConvertableGraph, DependencyGraph, DownloadedGraph}, graph::{ConvertableGraph, DependencyGraph, DownloadedGraph},
@ -209,7 +208,7 @@ pub async fn install(
for target_kind in TargetKind::VARIANTS { for target_kind in TargetKind::VARIANTS {
let folder = manifest.target.kind().packages_folder(target_kind); let folder = manifest.target.kind().packages_folder(target_kind);
let package_dir = project.package_dir(); let package_dir = project.package_dir().to_path_buf();
deleted_folders deleted_folders
.entry(folder.to_string()) .entry(folder.to_string())
@ -229,9 +228,10 @@ pub async fn install(
}); });
} }
try_join_all(deleted_folders.into_values()) let mut tasks = deleted_folders.into_values().collect::<JoinSet<_>>();
.await while let Some(task) = tasks.join_next().await {
.context("failed to remove package folders")?; task.unwrap()?;
}
} }
root_progress.reset(); root_progress.reset();

View file

@ -3,7 +3,6 @@ use crate::{
source::{IGNORED_DIRS, IGNORED_FILES}, source::{IGNORED_DIRS, IGNORED_FILES},
}; };
use fs_err::tokio as fs; use fs_err::tokio as fs;
use futures::future::try_join_all;
use relative_path::RelativePathBuf; use relative_path::RelativePathBuf;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@ -16,6 +15,7 @@ use tempfile::Builder;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
pin, pin,
task::JoinSet,
}; };
use tracing::instrument; use tracing::instrument;
@ -128,38 +128,44 @@ impl PackageFs {
) -> std::io::Result<()> { ) -> std::io::Result<()> {
match self { match self {
PackageFs::CAS(entries) => { PackageFs::CAS(entries) => {
try_join_all(entries.iter().map(|(path, entry)| { let mut tasks = entries
let destination = destination.as_ref().to_path_buf(); .iter()
let cas_path = cas_path.as_ref().to_path_buf(); .map(|(path, entry)| {
let destination = destination.as_ref().to_path_buf();
async move { let cas_path = cas_path.as_ref().to_path_buf();
let path = path.to_path(destination); let path = path.to_path(destination);
let entry = entry.clone();
match entry { async move {
FsEntry::File(hash) => { match entry {
if let Some(parent) = path.parent() { FsEntry::File(hash) => {
fs::create_dir_all(parent).await?; if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let (prefix, rest) = hash.split_at(2);
let cas_file_path = cas_path.join(prefix).join(rest);
if link {
fs::hard_link(cas_file_path, path).await?;
} else {
fs::copy(cas_file_path, &path).await?;
set_readonly(&path, false).await?;
}
} }
FsEntry::Directory => {
let (prefix, rest) = hash.split_at(2); fs::create_dir_all(path).await?;
let cas_file_path = cas_path.join(prefix).join(rest);
if link {
fs::hard_link(cas_file_path, path).await?;
} else {
fs::copy(cas_file_path, &path).await?;
set_readonly(&path, false).await?;
} }
} }
FsEntry::Directory => {
fs::create_dir_all(path).await?; Ok::<_, std::io::Error>(())
}
} }
})
.collect::<JoinSet<_>>();
Ok::<_, std::io::Error>(()) while let Some(task) = tasks.join_next().await {
} task.unwrap()?;
})) }
.await?;
} }
PackageFs::Copy(src, target) => { PackageFs::Copy(src, target) => {
fs::create_dir_all(destination.as_ref()).await?; fs::create_dir_all(destination.as_ref()).await?;