Skip to content

Commit 0c145d5

Browse files
feat(iroh): pass a runtime to Doc client to spawn close task on drop (#1758)
We spawn a tokio task when dropping a client `Doc` handle to notify the node that the document was closed. This does not work in the FFI currently because the FFI does not run in a global tokio context. Instead we now always pass a `runtime::Handle` into the client which can be used to spawn tasks independently of running in a tokio context. --------- Co-authored-by: Franz Heinzmann (Frando) <frando@unbiskant.org>
1 parent 10f5982 commit 0c145d5

5 files changed

Lines changed: 60 additions & 16 deletions

File tree

iroh/src/client.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use futures::stream::BoxStream;
1717
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
1818
use iroh_bytes::provider::AddProgress;
1919
use iroh_bytes::store::ValidateProgress;
20+
use iroh_bytes::util::runtime;
2021
use iroh_bytes::Hash;
2122
use iroh_bytes::{BlobFormat, Tag};
2223
use iroh_net::{key::PublicKey, magic_endpoint::ConnectionInfo, PeerAddr};
@@ -67,11 +68,14 @@ where
6768
C: ServiceConnection<ProviderService>,
6869
{
6970
/// Create a new high-level client to a Iroh node from the low-level RPC client.
70-
pub fn new(rpc: RpcClient<ProviderService, C>) -> Self {
71+
pub fn new(rpc: RpcClient<ProviderService, C>, rt: runtime::Handle) -> Self {
7172
Self {
7273
node: NodeClient { rpc: rpc.clone() },
7374
blobs: BlobsClient { rpc: rpc.clone() },
74-
docs: DocsClient { rpc: rpc.clone() },
75+
docs: DocsClient {
76+
rpc: rpc.clone(),
77+
rt,
78+
},
7579
authors: AuthorsClient { rpc: rpc.clone() },
7680
tags: TagsClient { rpc },
7781
}
@@ -129,6 +133,7 @@ where
129133
#[derive(Debug, Clone)]
130134
pub struct DocsClient<C> {
131135
rpc: RpcClient<ProviderService, C>,
136+
rt: runtime::Handle,
132137
}
133138

134139
impl<C> DocsClient<C>
@@ -138,7 +143,7 @@ where
138143
/// Create a new document.
139144
pub async fn create(&self) -> Result<Doc<C>> {
140145
let res = self.rpc.rpc(DocCreateRequest {}).await??;
141-
let doc = Doc::new(self.rpc.clone(), res.id);
146+
let doc = Doc::new(self.rt.clone(), self.rpc.clone(), res.id);
142147
Ok(doc)
143148
}
144149

@@ -155,7 +160,7 @@ where
155160
/// Import a document from a ticket and join all peers in the ticket.
156161
pub async fn import(&self, ticket: DocTicket) -> Result<Doc<C>> {
157162
let res = self.rpc.rpc(DocImportRequest(ticket)).await??;
158-
let doc = Doc::new(self.rpc.clone(), res.doc_id);
163+
let doc = Doc::new(self.rt.clone(), self.rpc.clone(), res.doc_id);
159164
Ok(doc)
160165
}
161166

@@ -168,7 +173,7 @@ where
168173
/// Get a [`Doc`] client for a single document. Return None if the document cannot be found.
169174
pub async fn open(&self, id: NamespaceId) -> Result<Option<Doc<C>>> {
170175
self.rpc.rpc(DocOpenRequest { doc_id: id }).await??;
171-
let doc = Doc::new(self.rpc.clone(), id);
176+
let doc = Doc::new(self.rt.clone(), self.rpc.clone(), id);
172177
Ok(Some(doc))
173178
}
174179
}
@@ -523,6 +528,7 @@ struct DocInner<C: ServiceConnection<ProviderService>> {
523528
id: NamespaceId,
524529
rpc: RpcClient<ProviderService, C>,
525530
closed: AtomicBool,
531+
rt: runtime::Handle,
526532
}
527533

528534
impl<C> Drop for DocInner<C>
@@ -532,7 +538,7 @@ where
532538
fn drop(&mut self) {
533539
let doc_id = self.id;
534540
let rpc = self.rpc.clone();
535-
tokio::task::spawn(async move {
541+
self.rt.main().spawn(async move {
536542
rpc.rpc(DocCloseRequest { doc_id }).await.ok();
537543
});
538544
}
@@ -542,11 +548,12 @@ impl<C> Doc<C>
542548
where
543549
C: ServiceConnection<ProviderService>,
544550
{
545-
fn new(rpc: RpcClient<ProviderService, C>, id: NamespaceId) -> Self {
551+
fn new(rt: runtime::Handle, rpc: RpcClient<ProviderService, C>, id: NamespaceId) -> Self {
546552
Self(Arc::new(DocInner {
547553
rpc,
548554
id,
549555
closed: AtomicBool::new(false),
556+
rt,
550557
}))
551558
}
552559

@@ -741,3 +748,35 @@ where
741748
Err(err) => Err(err.into()),
742749
})
743750
}
751+
752+
#[cfg(test)]
753+
mod tests {
754+
use super::*;
755+
756+
use iroh_bytes::util::runtime;
757+
758+
#[tokio::test]
759+
async fn test_drop_doc_client_sync() -> Result<()> {
760+
let db = iroh_bytes::store::readonly_mem::Store::default();
761+
let doc_store = iroh_sync::store::memory::Store::default();
762+
let rt = runtime::Handle::from_current(1)?;
763+
let node = crate::node::Node::builder(db, doc_store)
764+
.runtime(&rt)
765+
.spawn()
766+
.await?;
767+
768+
let client = node.client();
769+
let doc = client.docs.create().await?;
770+
771+
let res = std::thread::spawn(move || {
772+
drop(doc);
773+
drop(client);
774+
drop(node);
775+
});
776+
777+
tokio::task::spawn_blocking(move || res.join().map_err(|e| anyhow::anyhow!("{:?}", e)))
778+
.await??;
779+
780+
Ok(())
781+
}
782+
}

iroh/src/client/quic.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
time::Duration,
77
};
88

9+
use iroh_bytes::util::runtime;
910
use quic_rpc::transport::quinn::QuinnConnection;
1011

1112
use crate::rpc_protocol::{NodeStatusRequest, ProviderRequest, ProviderResponse, ProviderService};
@@ -26,9 +27,13 @@ pub type Iroh = super::Iroh<QuinnConnection<ProviderResponse, ProviderRequest>>;
2627
pub type Doc = super::Doc<QuinnConnection<ProviderResponse, ProviderRequest>>;
2728

2829
/// Connect to an iroh node running on the same computer, but in a different process.
29-
pub async fn connect(rpc_port: u16) -> anyhow::Result<Iroh> {
30+
pub async fn connect(rpc_port: u16, rt: Option<runtime::Handle>) -> anyhow::Result<Iroh> {
31+
let rt = match rt {
32+
Some(rt) => rt,
33+
None => runtime::Handle::from_current(1)?,
34+
};
3035
let client = connect_raw(rpc_port).await?;
31-
Ok(Iroh::new(client))
36+
Ok(Iroh::new(client, rt))
3237
}
3338

3439
/// Create a raw RPC client to an iroh node running on the same computer, but in a different

iroh/src/commands.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,15 @@ pub struct FullArgs {
8484
}
8585

8686
impl Cli {
87-
pub async fn run(self, rt: &runtime::Handle) -> Result<()> {
87+
pub async fn run(self, rt: runtime::Handle) -> Result<()> {
8888
match self.command {
8989
Commands::Console => {
90-
let iroh = iroh::client::quic::connect(self.rpc_args.rpc_port).await?;
90+
let iroh = iroh::client::quic::connect(self.rpc_args.rpc_port, Some(rt)).await?;
9191
let env = ConsoleEnv::for_console()?;
9292
repl::run(&iroh, &env).await
9393
}
9494
Commands::Rpc(command) => {
95-
let iroh = iroh::client::quic::connect(self.rpc_args.rpc_port).await?;
95+
let iroh = iroh::client::quic::connect(self.rpc_args.rpc_port, Some(rt)).await?;
9696
let env = ConsoleEnv::for_cli()?;
9797
command.run(&iroh, &env).await
9898
}
@@ -106,9 +106,9 @@ impl Cli {
106106
let config = NodeConfig::from_env(cfg.as_deref())?;
107107

108108
#[cfg(feature = "metrics")]
109-
let metrics_fut = start_metrics_server(metrics_addr, rt);
109+
let metrics_fut = start_metrics_server(metrics_addr, &rt);
110110

111-
let res = command.run(rt, &config, keylog).await;
111+
let res = command.run(&rt, &config, keylog).await;
112112

113113
#[cfg(feature = "metrics")]
114114
if let Some(metrics_fut) = metrics_fut {

iroh/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ async fn main_impl() -> Result<()> {
3333
.init();
3434

3535
let cli = Cli::parse();
36-
cli.run(&rt).await
36+
cli.run(rt).await
3737
}

iroh/src/node.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,7 @@ impl<D: ReadableStore> Node<D> {
743743

744744
/// Return a client to control this node over an in-memory channel.
745745
pub fn client(&self) -> crate::client::mem::Iroh {
746-
crate::client::Iroh::new(self.controller())
746+
crate::client::Iroh::new(self.controller(), self.inner.rt.clone())
747747
}
748748

749749
/// Return a single token containing everything needed to get a hash.

0 commit comments

Comments
 (0)