Skip to content

Commit 1b5760d

Browse files
authored
fix(iroh-net): temp fix for progress bar when downloading a hash seq (#1658)
## Description Fixes the download progress bar when downloading a hash seq where the size is not known. ## Notes & open questions Is this good enough? The only way to get a **proper** progress for a hash seq where you don't know the overall size is to do a verified size request before and give the sum of the sizes to the progress logic. But that would be another roundtrip. Fine for GUI stuff, but in many cases we don't really care, so I would hate to do this all the time. ## Change checklist - [x] Self-review. - [ ] Documentation updates if relevant. - [ ] Tests if relevant.
1 parent 215c5fc commit 1b5760d

9 files changed

Lines changed: 147 additions & 129 deletions

File tree

iroh-bytes/src/provider.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,26 +156,26 @@ pub enum AddProgress {
156156

157157
/// Progress updates for the get operation.
158158
#[derive(Debug, Serialize, Deserialize)]
159-
pub enum GetProgress {
159+
pub enum DownloadProgress {
160160
/// A new connection was established.
161161
Connected,
162162
/// An item was found with hash `hash`, from now on referred to via `id`.
163163
Found {
164164
/// A new unique id for this entry.
165165
id: u64,
166+
/// child offset
167+
child: u64,
166168
/// The name of the entry.
167169
hash: Hash,
168170
/// The size of the entry in bytes.
169171
size: u64,
170172
},
171173
/// An item was found with hash `hash`, from now on referred to via `id`.
172-
FoundCollection {
174+
FoundHashSeq {
173175
/// The name of the entry.
174176
hash: Hash,
175177
/// Number of children in the collection, if known.
176-
num_blobs: Option<u64>,
177-
/// The size of the entry in bytes, if known.
178-
total_blobs_size: Option<u64>,
178+
children: u64,
179179
},
180180
/// We got progress ingesting item `id`.
181181
Progress {
@@ -279,7 +279,7 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
279279
// if the request is just for the root, we don't need to deserialize the collection
280280
let just_root = matches!(request.ranges.as_single(), Some((0, _)));
281281
let mut c = if !just_root {
282-
// use the collection parser to parse the collection
282+
// parse the hash seq
283283
let (stream, num_blobs) = parse_hash_seq(&mut data).await?;
284284
writer
285285
.events

iroh-bytes/src/store/flat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1197,7 +1197,7 @@ impl Store {
11971197
meta_path: PathBuf,
11981198
rt: crate::util::runtime::Handle,
11991199
) -> anyhow::Result<Self> {
1200-
tracing::debug!(
1200+
tracing::info!(
12011201
"loading database from {} {}",
12021202
complete_path.display(),
12031203
partial_path.display()

iroh-bytes/src/store/traits.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,13 +332,13 @@ async fn gc_mark_task<'a>(
332332
warn!("gc: {} creating data reader failed", hash);
333333
continue;
334334
};
335-
let Ok((mut iter, count)) = parse_hash_seq(reader).await else {
335+
let Ok((mut stream, count)) = parse_hash_seq(reader).await else {
336336
warn!("gc: {} parse failed", hash);
337337
continue;
338338
};
339339
info!("parsed collection {} {:?}", hash, count);
340340
loop {
341-
let item = match iter.next().await {
341+
let item = match stream.next().await {
342342
Ok(Some(item)) => item,
343343
Ok(None) => break,
344344
Err(_err) => {

iroh/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::rpc_protocol::{
3737
DocDelResponse, DocDropRequest, DocGetManyRequest, DocGetOneRequest, DocImportRequest,
3838
DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetHashRequest, DocSetRequest,
3939
DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
40-
GetProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
40+
DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
4141
NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
4242
NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
4343
};
@@ -333,7 +333,7 @@ where
333333
pub async fn download(
334334
&self,
335335
req: BlobDownloadRequest,
336-
) -> Result<impl Stream<Item = Result<GetProgress>>> {
336+
) -> Result<impl Stream<Item = Result<DownloadProgress>>> {
337337
let stream = self.rpc.server_streaming(req).await?;
338338
Ok(stream.map_err(anyhow::Error::from))
339339
}

iroh/src/commands.rs

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::collections::BTreeMap;
21
use std::str::FromStr;
32
use std::{net::SocketAddr, path::PathBuf, time::Duration};
43

@@ -11,7 +10,8 @@ use console::style;
1110
use futures::{Stream, StreamExt};
1211
use human_time::ToHumanTimeString;
1312
use indicatif::{
14-
HumanBytes, HumanDuration, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle,
13+
HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState,
14+
ProgressStyle,
1515
};
1616
use iroh::client::quic::Iroh;
1717
use iroh::dial::Ticket;
@@ -727,15 +727,24 @@ fn fmt_latency(latency: Option<Duration>) -> String {
727727
}
728728
}
729729

730-
const PROGRESS_STYLE: &str =
731-
"{msg}\n{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})";
730+
fn make_overall_progress() -> ProgressBar {
731+
let pb = ProgressBar::hidden();
732+
pb.enable_steady_tick(std::time::Duration::from_millis(100));
733+
pb.set_style(
734+
ProgressStyle::with_template(
735+
"{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len}",
736+
)
737+
.unwrap()
738+
.progress_chars("#>-"),
739+
);
740+
pb
741+
}
732742

733-
fn make_download_pb() -> ProgressBar {
743+
fn make_individual_progress() -> ProgressBar {
734744
let pb = ProgressBar::hidden();
735-
pb.set_draw_target(ProgressDrawTarget::stderr());
736-
pb.enable_steady_tick(std::time::Duration::from_millis(50));
745+
pb.enable_steady_tick(std::time::Duration::from_millis(100));
737746
pb.set_style(
738-
ProgressStyle::with_template(PROGRESS_STYLE)
747+
ProgressStyle::with_template("{msg}{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
739748
.unwrap()
740749
.with_key(
741750
"eta",
@@ -750,75 +759,59 @@ fn make_download_pb() -> ProgressBar {
750759

751760
pub async fn show_download_progress(
752761
hash: Hash,
753-
mut stream: impl Stream<Item = Result<GetProgress>> + Unpin,
762+
mut stream: impl Stream<Item = Result<DownloadProgress>> + Unpin,
754763
) -> Result<()> {
755764
eprintln!("Fetching: {}", hash);
756-
let pb = make_download_pb();
757-
pb.set_message(format!("{} Connecting ...", style("[1/3]").bold().dim()));
758-
let mut sizes = BTreeMap::new();
759-
let mut downloading = false;
765+
let mp = MultiProgress::new();
766+
mp.set_draw_target(ProgressDrawTarget::stderr());
767+
let op = mp.add(make_overall_progress());
768+
let ip = mp.add(make_individual_progress());
769+
op.set_message(format!("{} Connecting ...\n", style("[1/3]").bold().dim()));
770+
let mut seq = false;
760771
while let Some(x) = stream.next().await {
761772
match x? {
762-
GetProgress::Connected => {
763-
pb.set_message(format!("{} Requesting ...", style("[2/3]").bold().dim()));
773+
DownloadProgress::Connected => {
774+
op.set_message(format!("{} Requesting ...\n", style("[2/3]").bold().dim()));
764775
}
765-
GetProgress::FoundCollection {
766-
total_blobs_size,
767-
num_blobs,
768-
..
769-
} => {
770-
let count = num_blobs.unwrap_or_default();
771-
let missing_bytes = total_blobs_size.unwrap_or_default();
772-
pb.set_message(format!(
773-
"{} Downloading {} file(s) with total transfer size {}",
776+
DownloadProgress::FoundHashSeq { children, .. } => {
777+
op.set_message(format!(
778+
"{} Downloading {} blob(s)\n",
774779
style("[3/3]").bold().dim(),
775-
count,
776-
HumanBytes(missing_bytes),
780+
children + 1,
777781
));
778-
pb.set_length(missing_bytes);
779-
pb.reset();
780-
downloading = true;
782+
op.set_length(children + 1);
783+
op.reset();
784+
seq = true;
781785
}
782-
GetProgress::Found { id, size, .. } => {
783-
if !downloading {
784-
pb.set_message(format!(
785-
"{} Downloading blob with size {}",
786-
style("[3/3]").bold().dim(),
787-
size,
788-
));
789-
pb.set_length(size);
790-
pb.reset();
786+
DownloadProgress::Found { size, child, .. } => {
787+
if seq {
788+
op.set_position(child);
789+
} else {
790+
op.finish_and_clear();
791791
}
792-
sizes.insert(id, (size, 0));
792+
ip.set_length(size);
793+
ip.reset();
793794
}
794-
GetProgress::Progress { id, offset } => {
795-
if let Some((_, current)) = sizes.get_mut(&id) {
796-
*current = offset;
797-
let total = sizes.values().map(|(_, current)| current).sum::<u64>();
798-
pb.set_position(total);
799-
}
795+
DownloadProgress::Progress { offset, .. } => {
796+
ip.set_position(offset);
800797
}
801-
GetProgress::Done { id } => {
802-
if let Some((size, current)) = sizes.get_mut(&id) {
803-
*current = *size;
804-
let total = sizes.values().map(|(_, current)| current).sum::<u64>();
805-
pb.set_position(total);
806-
}
798+
DownloadProgress::Done { .. } => {
799+
ip.finish_and_clear();
807800
}
808-
GetProgress::NetworkDone {
801+
DownloadProgress::NetworkDone {
809802
bytes_read,
810803
elapsed,
811804
..
812805
} => {
813-
pb.finish_and_clear();
806+
op.finish_and_clear();
814807
eprintln!(
815808
"Transferred {} in {}, {}/s",
816809
HumanBytes(bytes_read),
817810
HumanDuration(elapsed),
818811
HumanBytes((bytes_read as f64 / elapsed.as_secs_f64()) as u64)
819812
);
820813
}
821-
GetProgress::AllDone => {
814+
DownloadProgress::AllDone => {
822815
break;
823816
}
824817
_ => {}

iroh/src/commands/get.rs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use iroh_bytes::{
1717
Hash,
1818
};
1919
use iroh_bytes::{
20-
provider::GetProgress,
20+
provider::DownloadProgress,
2121
util::{
2222
progress::{FlumeProgressSender, IdGenerator, ProgressSender},
2323
BlobFormat,
@@ -130,7 +130,7 @@ impl GetInteractive {
130130
let response = fsm::start(connection, request);
131131
let connected = response.next().await?;
132132
// we are connected
133-
sender.send(GetProgress::Connected).await?;
133+
sender.send(DownloadProgress::Connected).await?;
134134
let ConnectedNext::StartRoot(curr) = connected.next().await? else {
135135
anyhow::bail!("expected root to be present");
136136
};
@@ -140,13 +140,13 @@ impl GetInteractive {
140140
get_to_stdout_multi(curr, sender.clone()).await?
141141
};
142142
sender
143-
.send(GetProgress::NetworkDone {
143+
.send(DownloadProgress::NetworkDone {
144144
bytes_written: stats.bytes_written,
145145
bytes_read: stats.bytes_read,
146146
elapsed: stats.elapsed,
147147
})
148148
.await?;
149-
sender.send(GetProgress::AllDone).await?;
149+
sender.send(DownloadProgress::AllDone).await?;
150150
display_task.await??;
151151

152152
Ok(())
@@ -155,22 +155,31 @@ impl GetInteractive {
155155

156156
async fn get_to_stdout_single(
157157
curr: get::fsm::AtStartRoot,
158-
sender: FlumeProgressSender<GetProgress>,
158+
sender: FlumeProgressSender<DownloadProgress>,
159159
) -> Result<get::Stats> {
160160
let curr = curr.next();
161161
let id = sender.new_id();
162162
let hash = curr.hash();
163163
let (curr, size) = curr.next().await?;
164-
sender.send(GetProgress::Found { id, hash, size }).await?;
164+
sender
165+
.send(DownloadProgress::Found {
166+
id,
167+
hash,
168+
size,
169+
child: 0,
170+
})
171+
.await?;
165172
let sender2 = sender.clone();
166173
let mut writer = ProgressSliceWriter::new(
167174
ConcatenateSliceWriter::new(tokio::io::stdout()),
168175
move |offset| {
169-
sender2.try_send(GetProgress::Progress { id, offset }).ok();
176+
sender2
177+
.try_send(DownloadProgress::Progress { id, offset })
178+
.ok();
170179
},
171180
);
172181
let curr = curr.write_all(&mut writer).await?;
173-
sender.send(GetProgress::Done { id }).await?;
182+
sender.send(DownloadProgress::Done { id }).await?;
174183
let EndBlobNext::Closing(curr) = curr.next() else {
175184
anyhow::bail!("expected end of stream")
176185
};
@@ -179,15 +188,14 @@ async fn get_to_stdout_single(
179188

180189
async fn get_to_stdout_multi(
181190
curr: get::fsm::AtStartRoot,
182-
sender: FlumeProgressSender<GetProgress>,
191+
sender: FlumeProgressSender<DownloadProgress>,
183192
) -> Result<get::Stats> {
184193
let hash = curr.hash();
185-
let (mut next, _links, collection) = Collection::read_fsm(curr).await?;
194+
let (mut next, links, collection) = Collection::read_fsm(curr).await?;
186195
sender
187-
.send(GetProgress::FoundCollection {
196+
.send(DownloadProgress::FoundHashSeq {
188197
hash,
189-
num_blobs: Some(collection.total_entries()),
190-
total_blobs_size: Some(collection.total_blobs_size()),
198+
children: links.len() as u64,
191199
})
192200
.await?;
193201
let collection = collection.into_inner();
@@ -207,21 +215,24 @@ async fn get_to_stdout_multi(
207215
let id = sender.new_id();
208216
let (curr, size) = header.next().await?;
209217
sender
210-
.send(GetProgress::Found {
218+
.send(DownloadProgress::Found {
211219
id,
212220
hash: blob.hash,
213221
size,
222+
child: curr.offset(),
214223
})
215224
.await?;
216225
let sender2 = sender.clone();
217226
let mut io_writer = ProgressSliceWriter::new(
218227
ConcatenateSliceWriter::new(tokio::io::stdout()),
219228
move |offset| {
220-
sender2.try_send(GetProgress::Progress { id, offset }).ok();
229+
sender2
230+
.try_send(DownloadProgress::Progress { id, offset })
231+
.ok();
221232
},
222233
);
223234
let curr = curr.write_all(&mut io_writer).await?;
224-
sender.send(GetProgress::Done { id }).await?;
235+
sender.send(DownloadProgress::Done { id }).await?;
225236
// wait for the progress task to finish, only after dropping the writer
226237
next = curr.next();
227238
};

0 commit comments

Comments
 (0)