diff --git a/src/cli/reporters.rs b/src/cli/reporters.rs index 9c09a4b..8d71024 100644 --- a/src/cli/reporters.rs +++ b/src/cli/reporters.rs @@ -71,6 +71,7 @@ pub struct CliReporter { writer: Mutex, child_style: ProgressStyle, child_style_with_bytes: ProgressStyle, + child_style_with_bytes_without_total: ProgressStyle, multi_progress: MultiProgress, root_progress: ProgressBar, } @@ -88,6 +89,10 @@ impl CliReporter { &"{msg} {bytes}/{total_bytes}".dimmed().to_string(), ) .unwrap(), + child_style_with_bytes_without_total: ProgressStyle::with_template( + &"{msg} {bytes}".dimmed().to_string(), + ) + .unwrap(), multi_progress, root_progress, } @@ -135,7 +140,15 @@ impl DownloadProgressReporter progress.set_position(len); self.set_progress.call_once(|| { - progress.set_style(self.root_reporter.child_style_with_bytes.clone()); + if total > 0 { + progress.set_style(self.root_reporter.child_style_with_bytes.clone()); + } else { + progress.set_style( + self.root_reporter + .child_style_with_bytes_without_total + .clone(), + ); + } }); } } diff --git a/src/source/wally/mod.rs b/src/source/wally/mod.rs index a6ded61..6ffd7b2 100644 --- a/src/source/wally/mod.rs +++ b/src/source/wally/mod.rs @@ -18,7 +18,7 @@ use crate::{ Project, }; use fs_err::tokio as fs; -use futures::future::try_join_all; +use futures::{future::try_join_all, StreamExt}; use gix::Url; use relative_path::RelativePathBuf; use reqwest::header::AUTHORIZATION; @@ -29,8 +29,12 @@ use std::{ sync::Arc, }; use tempfile::tempdir; -use tokio::{io::AsyncWriteExt, sync::Mutex, task::spawn_blocking}; -use tokio_util::compat::FuturesAsyncReadCompatExt; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::Mutex, + task::spawn_blocking, +}; +use tokio_util::{compat::FuturesAsyncReadCompatExt, io::StreamReader}; use tracing::instrument; pub(crate) mod compat_util; @@ -203,7 +207,7 @@ impl PackageSource for WallyPackageSource { pkg_ref: &Self::Ref, project: &Project, reqwest: &reqwest::Client, - _reporter: Arc, + reporter: Arc, ) -> Result<(PackageFS, Target), Self::DownloadError> { let config = self.config(project).await.map_err(Box::new)?; let index_file = project @@ -252,12 +256,30 @@ impl PackageSource for WallyPackageSource { } let response = request.send().await?.error_for_status()?; - let mut bytes = response.bytes().await?; - let archive = async_zip::tokio::read::seek::ZipFileReader::with_tokio( - std::io::Cursor::new(&mut bytes), - ) - .await?; + let total_len = response.content_length().unwrap_or(0); + reporter.report_progress(total_len, 0); + + let mut bytes_downloaded = 0; + let bytes = response + .bytes_stream() + .inspect(|chunk| { + chunk.as_ref().ok().inspect(|chunk| { + bytes_downloaded += chunk.len() as u64; + reporter.report_progress(total_len, bytes_downloaded); + }); + }) + .map(|result| { + result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + }); + + let mut bytes = StreamReader::new(bytes); + let mut buf = vec![]; + bytes.read_to_end(&mut buf).await?; + + let archive = + async_zip::tokio::read::seek::ZipFileReader::with_tokio(std::io::Cursor::new(&mut buf)) + .await?; let entries = (0..archive.file().entries().len()) .map(|index| { @@ -330,6 +352,8 @@ impl PackageSource for WallyPackageSource { .await .map_err(errors::DownloadError::WriteIndex)?; + reporter.report_done(); + Ok((fs, get_target(project, &tempdir).await?)) } }