Skip to content

Commit 8b6a5bc

Browse files
b5dignifiedquire
andauthored
feat(provider): add 'CollectionAdded' Provider event (#1131)
Co-authored-by: dignifiedquire <me@dignifiedquire.com>
1 parent ac6bb1a commit 8b6a5bc

3 files changed

Lines changed: 72 additions & 3 deletions

File tree

iroh-bytes/src/provider.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ pub use ticket::Ticket;
3434
/// Events emitted by the provider informing about the current status.
3535
#[derive(Debug, Clone)]
3636
pub enum Event {
37+
/// A new collection has been added
38+
CollectionAdded {
39+
/// The hash of the added collection
40+
hash: Hash,
41+
},
3742
/// A new client connected to the node.
3843
ClientConnected {
3944
/// An unique connection id.

iroh/src/commands/provide.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ async fn get_keypair(key: Option<PathBuf>) -> Result<Keypair> {
175175
}
176176
}
177177

178+
/// Makes a an RPC endpoint that uses a QUIC transport
178179
fn make_rpc_endpoint(
179180
keypair: &Keypair,
180181
rpc_port: u16,

iroh/src/node.rs

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use quic_rpc::{RpcClient, RpcServer, ServiceConnection, ServiceEndpoint};
3535
use tokio::sync::{broadcast, mpsc};
3636
use tokio::task::JoinError;
3737
use tokio_util::sync::CancellationToken;
38-
use tracing::{debug, trace};
38+
use tracing::{debug, trace, warn};
3939

4040
use crate::rpc_protocol::{
4141
AddrsRequest, AddrsResponse, IdRequest, IdResponse, ListBlobsRequest, ListBlobsResponse,
@@ -222,7 +222,7 @@ where
222222
// the size of this channel must be large because the producer can be on
223223
// a different thread than the consumer, and can produce a lot of events
224224
// in a short time
225-
let (events_sender, _events_receiver) = broadcast::channel(256);
225+
let (events_sender, _events_receiver) = broadcast::channel(512);
226226
let events = events_sender.clone();
227227
let cancel_token = CancellationToken::new();
228228

@@ -585,13 +585,19 @@ impl RpcHandler {
585585
let data_sources = iroh_bytes::provider::create_data_sources(root)?;
586586
// create the collection
587587
// todo: provide feedback for progress
588-
let (db, _) = iroh_bytes::provider::collection::create_collection(
588+
let (db, hash) = iroh_bytes::provider::collection::create_collection(
589589
data_sources,
590590
Progress::new(progress),
591591
)
592592
.await?;
593593
self.inner.db.union_with(db);
594594

595+
if let Err(e) = self.inner.events.send(Event::ByteProvide(
596+
iroh_bytes::provider::Event::CollectionAdded { hash },
597+
)) {
598+
warn!("failed to send CollectionAdded event: {:?}", e);
599+
};
600+
595601
Ok(())
596602
}
597603
async fn version(self, _: VersionRequest) -> VersionResponse {
@@ -699,6 +705,9 @@ pub fn make_server_config(
699705

700706
#[cfg(test)]
701707
mod tests {
708+
use anyhow::bail;
709+
use futures::StreamExt;
710+
use std::collections::HashMap;
702711
use std::net::Ipv4Addr;
703712
use std::path::Path;
704713

@@ -728,4 +737,58 @@ mod tests {
728737
println!("addrs: {:?}", ticket.addrs());
729738
assert!(!ticket.addrs().is_empty());
730739
}
740+
741+
#[tokio::test]
742+
async fn test_node_add_collection_event() -> Result<()> {
743+
let db = Database::from(HashMap::new());
744+
let node = Builder::with_db(db)
745+
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
746+
.runtime(&test_runtime())
747+
.spawn()
748+
.await?;
749+
750+
let _drop_guard = node.cancel_token().drop_guard();
751+
752+
let mut events = node.subscribe();
753+
let provide_handle = tokio::spawn(async move {
754+
while let Ok(msg) = events.recv().await {
755+
if let Event::ByteProvide(iroh_bytes::provider::Event::CollectionAdded { hash }) =
756+
msg
757+
{
758+
return Some(hash);
759+
}
760+
}
761+
None
762+
});
763+
764+
let got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
765+
let mut stream = node
766+
.controller()
767+
.server_streaming(ProvideRequest {
768+
path: Path::new(env!("CARGO_MANIFEST_DIR")).join("README.md"),
769+
})
770+
.await?;
771+
772+
while let Some(item) = stream.next().await {
773+
match item? {
774+
ProvideProgress::AllDone { hash } => {
775+
return Ok(hash);
776+
}
777+
ProvideProgress::Abort(e) => {
778+
bail!("Error while adding data: {e}");
779+
}
780+
_ => {}
781+
}
782+
}
783+
bail!("stream ended without providing data");
784+
})
785+
.await
786+
.context("timeout")?
787+
.context("get failed")?;
788+
789+
let event_hash = provide_handle.await?.expect("missing collection event");
790+
assert_eq!(got_hash, event_hash);
791+
792+
Ok(())
793+
}
731794
}

0 commit comments

Comments
 (0)