Skip to content

Commit 6c07ad3

Browse files
authored
feat(sync): track incoming sync requests, allow subscriptions without sync, close inactive replicas (#1491)
## Description This PR adds a few missing parts for sync in iroh: * Allow to subscribe to replicas if they're not in the set of syncing replicas (this was not possible before). * Properly track open replicas in the `LiveSync` and close them once they're not needed anymore. Therefore adds `iroh_sync::store::close_replica` to remove a replica instance from the store-internal cache of open replicas. This is needed because otherwise it grows unbounded (for anchor nodes especially). * Pass incoming connections to the `LiveSync` actor and handle them there. Before, incoming connections for sync were *always* processed if a replica with the requested `NamespaceId` is found in the local replica store. This PR changes this mechanism so that incoming sync connections are passed into the sync engine first. This leads to the changes that motivate this PR: * Incoming sync requests are only answered if the document (replica) in question was opened for sync before (via a call to `start_sync`). Note that this is *not* persisted at the moment, but should be (separate change though). * We can track the sync status for outgoing *and incoming* sync flows. This will reduce the number of sync connections, because currently we will oftenly double-sync in both directions (unneeded, because the sync is always bidirectional). Now, we abort if a sync is already in progress. * Expose events about successfull and failed sync runs to the upper layer via the document subscriptions with a new `LiveEvent:.SyncFinished` that reports all available information ## Notes & open questions Do we want to track history of syncs internally or leave it up to the app via the events? Not sure myself. The diff of `sync_engine/live.rs` is a little hard to read, because there's a change from `TopicId` to `NamespaceId` in a few places that is mostly unrelated. Sorry for the mixup, but it makes sense to track everything by `NamespaceId` and it ended up in a commit here. Logging is much improved in this PR. Maybe we can improve the code flow a little to make things more obvious. <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [ ] Tests if relevant.
1 parent 8fe3f71 commit 6c07ad3

15 files changed

Lines changed: 840 additions & 251 deletions

File tree

iroh-sync/src/keys.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ impl fmt::Debug for Namespace {
201201
}
202202
}
203203

204+
impl fmt::Debug for NamespaceId {
205+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206+
write!(f, "NamespaceId({})", base32::fmt_short(self.0))
207+
}
208+
}
209+
210+
impl fmt::Debug for AuthorId {
211+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212+
write!(f, "AuthorId({})", base32::fmt_short(self.0))
213+
}
214+
}
215+
204216
impl fmt::Debug for Author {
205217
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
206218
write!(f, "Author({})", self)
@@ -321,6 +333,14 @@ pub(super) mod base32 {
321333
text.make_ascii_lowercase();
322334
text
323335
}
336+
/// Convert to a base32 string limited to the first 10 bytes
337+
pub fn fmt_short(bytes: impl AsRef<[u8]>) -> String {
338+
let len = bytes.as_ref().len().min(10);
339+
let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]);
340+
text.make_ascii_lowercase();
341+
text.push('…');
342+
text
343+
}
324344
/// Parse from a base32 string into a byte array
325345
pub fn parse_array<const N: usize>(input: &str) -> anyhow::Result<[u8; N]> {
326346
data_encoding::BASE32_NOPAD
@@ -332,7 +352,6 @@ pub(super) mod base32 {
332352

333353
/// [`NamespacePublicKey`] in bytes
334354
#[derive(
335-
Debug,
336355
Default,
337356
Clone,
338357
Copy,
@@ -351,7 +370,6 @@ pub struct NamespaceId([u8; 32]);
351370

352371
/// [`AuthorPublicKey`] in bytes
353372
#[derive(
354-
Debug,
355373
Default,
356374
Clone,
357375
Copy,

iroh-sync/src/metrics.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ pub struct Metrics {
1313
pub new_entries_remote: Counter,
1414
pub new_entries_local_size: Counter,
1515
pub new_entries_remote_size: Counter,
16-
pub initial_sync_success: Counter,
17-
pub initial_sync_failed: Counter,
16+
pub sync_via_connect_success: Counter,
17+
pub sync_via_connect_failure: Counter,
18+
pub sync_via_accept_success: Counter,
19+
pub sync_via_accept_failure: Counter,
1820
}
1921

2022
impl Default for Metrics {
@@ -24,8 +26,10 @@ impl Default for Metrics {
2426
new_entries_remote: Counter::new("Number of document entries added by peers"),
2527
new_entries_local_size: Counter::new("Total size of entry contents added locally"),
2628
new_entries_remote_size: Counter::new("Total size of entry contents added by peers"),
27-
initial_sync_success: Counter::new("Number of successfull initial syncs "),
28-
initial_sync_failed: Counter::new("Number of failed initial syncs"),
29+
sync_via_accept_success: Counter::new("Number of successfull syncs (via accept)"),
30+
sync_via_accept_failure: Counter::new("Number of failed syncs (via accept)"),
31+
sync_via_connect_success: Counter::new("Number of successfull syncs (via connect)"),
32+
sync_via_connect_failure: Counter::new("Number of failed syncs (via connect)"),
2933
}
3034
}
3135
}

iroh-sync/src/net.rs

Lines changed: 149 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Network implementation of the iroh-sync protocol
22
3-
use std::net::SocketAddr;
3+
use std::{future::Future, net::SocketAddr};
44

55
use anyhow::{Context, Result};
66
use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint};
@@ -10,6 +10,7 @@ use crate::{
1010
net::codec::{run_alice, run_bob},
1111
store,
1212
sync::Replica,
13+
NamespaceId,
1314
};
1415

1516
#[cfg(feature = "metrics")]
@@ -26,52 +27,176 @@ mod codec;
2627
pub async fn connect_and_sync<S: store::Store>(
2728
endpoint: &MagicEndpoint,
2829
doc: &Replica<S::Instance>,
29-
peer_id: PublicKey,
30+
peer: PublicKey,
3031
derp_region: Option<u16>,
3132
addrs: &[SocketAddr],
32-
) -> anyhow::Result<()> {
33-
debug!("sync with peer {}: start", peer_id);
33+
) -> Result<()> {
34+
debug!(peer = ?peer, "sync (via connect): start");
35+
let namespace = doc.namespace();
3436
let connection = endpoint
35-
.connect(peer_id, SYNC_ALPN, derp_region, addrs)
37+
.connect(peer, SYNC_ALPN, derp_region, addrs)
3638
.await
37-
.context("dial_and_sync")?;
39+
.context("failed to establish connection")?;
40+
debug!(?peer, ?namespace, "sync (via connect): connected");
3841
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
39-
let res = run_alice::<S, _, _>(&mut send_stream, &mut recv_stream, doc, peer_id).await;
42+
let res = run_alice::<S, _, _>(&mut send_stream, &mut recv_stream, doc, peer).await;
43+
send_stream.finish().await?;
44+
recv_stream.read_to_end(0).await?;
4045

4146
#[cfg(feature = "metrics")]
4247
if res.is_ok() {
43-
inc!(Metrics, initial_sync_success);
48+
inc!(Metrics, sync_via_connect_success);
4449
} else {
45-
inc!(Metrics, initial_sync_failed);
50+
inc!(Metrics, sync_via_connect_failure);
4651
}
4752

48-
debug!("sync with peer {}: finish {:?}", peer_id, res);
53+
debug!(peer = ?peer, ?res, "sync (via connect): done");
4954
res
5055
}
5156

57+
/// What to do with incoming sync requests
58+
#[derive(Debug)]
59+
pub enum AcceptOutcome<S: store::Store> {
60+
/// This namespace is not available for sync.
61+
NotAvailable,
62+
/// This namespace is already syncing, therefore abort.
63+
AlreadySyncing,
64+
/// Accept the sync request.
65+
Accept(Replica<S::Instance>),
66+
}
67+
68+
impl<S: store::Store> From<Option<Replica<S::Instance>>> for AcceptOutcome<S> {
69+
fn from(replica: Option<Replica<S::Instance>>) -> Self {
70+
match replica {
71+
Some(replica) => AcceptOutcome::Accept(replica),
72+
None => AcceptOutcome::NotAvailable,
73+
}
74+
}
75+
}
76+
5277
/// Handle an iroh-sync connection and sync all shared documents in the replica store.
53-
pub async fn handle_connection<S: store::Store>(
78+
pub async fn handle_connection<S, F, Fut>(
5479
connecting: quinn::Connecting,
55-
replica_store: S,
56-
) -> Result<()> {
57-
let connection = connecting.await?;
58-
let peer_id = get_peer_id(&connection).await?;
59-
let (mut send_stream, mut recv_stream) = connection.accept_bi().await?;
60-
debug!(peer = ?peer_id, "incoming sync: start");
80+
accept_cb: F,
81+
) -> std::result::Result<(NamespaceId, PublicKey), SyncError>
82+
where
83+
S: store::Store,
84+
F: Fn(NamespaceId, PublicKey) -> Fut,
85+
Fut: Future<Output = anyhow::Result<AcceptOutcome<S>>>,
86+
{
87+
let connection = connecting.await.map_err(SyncError::connect)?;
88+
let peer = get_peer_id(&connection).await.map_err(SyncError::connect)?;
89+
let (mut send_stream, mut recv_stream) = connection
90+
.accept_bi()
91+
.await
92+
.map_err(|error| SyncError::open(peer, error))?;
93+
debug!(peer = ?peer, "sync (via accept): start");
6194

62-
let res = run_bob(&mut send_stream, &mut recv_stream, replica_store, peer_id).await;
95+
let res = run_bob::<S, _, _, _, _>(&mut send_stream, &mut recv_stream, accept_cb, peer).await;
6396

6497
#[cfg(feature = "metrics")]
6598
if res.is_ok() {
66-
inc!(Metrics, initial_sync_success);
99+
inc!(Metrics, sync_via_accept_success);
67100
} else {
68-
inc!(Metrics, initial_sync_failed);
101+
inc!(Metrics, sync_via_accept_failure);
69102
}
70103

71-
res?;
72-
send_stream.finish().await?;
104+
debug!(peer = ?peer, ?res, "sync (via accept): done");
105+
106+
let namespace = res?;
107+
send_stream
108+
.finish()
109+
.await
110+
.map_err(|error| SyncError::close(peer, namespace, error))?;
111+
recv_stream
112+
.read_to_end(0)
113+
.await
114+
.map_err(|error| SyncError::close(peer, namespace, error))?;
115+
Ok((namespace, peer))
116+
}
73117

74-
debug!(peer = ?peer_id, "incoming sync: done");
118+
/// Failure reasons for sync.
119+
#[derive(thiserror::Error, Debug)]
120+
#[allow(missing_docs)]
121+
pub enum SyncError {
122+
/// Failed to establish connection
123+
#[error("Failed to establish connection")]
124+
Connect {
125+
#[source]
126+
error: anyhow::Error,
127+
},
128+
/// Failed to open replica
129+
#[error("Failed to open replica with {peer:?}")]
130+
Open {
131+
peer: PublicKey,
132+
#[source]
133+
error: anyhow::Error,
134+
},
135+
/// Failed to run sync
136+
#[error("Failed to sync {namespace:?} with {peer:?}")]
137+
Sync {
138+
peer: PublicKey,
139+
namespace: Option<NamespaceId>,
140+
#[source]
141+
error: anyhow::Error,
142+
},
143+
/// Failed to close
144+
#[error("Failed to close {namespace:?} with {peer:?}")]
145+
Close {
146+
peer: PublicKey,
147+
namespace: NamespaceId,
148+
#[source]
149+
error: anyhow::Error,
150+
},
151+
}
75152

76-
Ok(())
153+
impl SyncError {
154+
fn connect(error: impl Into<anyhow::Error>) -> Self {
155+
Self::Connect {
156+
error: error.into(),
157+
}
158+
}
159+
fn open(peer: PublicKey, error: impl Into<anyhow::Error>) -> Self {
160+
Self::Open {
161+
peer,
162+
error: error.into(),
163+
}
164+
}
165+
pub(crate) fn sync(
166+
peer: PublicKey,
167+
namespace: Option<NamespaceId>,
168+
error: impl Into<anyhow::Error>,
169+
) -> Self {
170+
Self::Sync {
171+
peer,
172+
namespace,
173+
error: error.into(),
174+
}
175+
}
176+
fn close(peer: PublicKey, namespace: NamespaceId, error: impl Into<anyhow::Error>) -> Self {
177+
Self::Close {
178+
peer,
179+
namespace,
180+
error: error.into(),
181+
}
182+
}
183+
/// Get the peer's node ID (if available)
184+
pub fn peer(&self) -> Option<PublicKey> {
185+
match self {
186+
SyncError::Connect { .. } => None,
187+
SyncError::Open { peer, .. } => Some(*peer),
188+
SyncError::Sync { peer, .. } => Some(*peer),
189+
SyncError::Close { peer, .. } => Some(*peer),
190+
}
191+
}
192+
193+
/// Get the namespace (if available)
194+
pub fn namespace(&self) -> Option<NamespaceId> {
195+
match self {
196+
SyncError::Connect { .. } => None,
197+
SyncError::Open { .. } => None,
198+
SyncError::Sync { namespace, .. } => namespace.to_owned(),
199+
SyncError::Close { namespace, .. } => Some(*namespace),
200+
}
201+
}
77202
}

0 commit comments

Comments
 (0)