Skip to content

Commit d8cc9df

Browse files
fix: avoid double conns, better state tracking (#1505)
## Description On `main` the test `sync_full_basic` is flakey. This PR (hopefully) fixes it. The reason was: We have the situation that two peers initiate connections to each other at (roughly) the same time. In #1491 this was sometimes prevented, but not reliably. This PR fixes it by: * When connecting, we set a `SyncState::Dialing` for the `(namespace, peer)` * When accepting, if our own state is `Dialing` for the incoming request for `(namespace, peer)` we compare our peer id with that of the incoming request, and abort if ours is higher (doesn't matter which way, we care about a predictable outcome only * Through this, only one of the two simoultanoues connections will survive * Also added a `Abort` frame to the wire protocol to transfer to inform the dialer about the reason of the declined request, which is either "we do double sync, and will take the other conn" (`AlreadySyncing`) or "this replica is not here" (`NotAvailable`) This PR also: * Further improves logging * Improves errors ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. --------- Co-authored-by: Diva M <divma@protonmail.com>
1 parent f16e439 commit d8cc9df

6 files changed

Lines changed: 399 additions & 183 deletions

File tree

iroh-net/src/key.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl From<VerifyingKey> for PublicKey {
121121

122122
impl Debug for PublicKey {
123123
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124-
let mut text = data_encoding::BASE32_NOPAD.encode(self.as_bytes());
124+
let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]);
125125
text.make_ascii_lowercase();
126126
write!(f, "PublicKey({text})")
127127
}

iroh-sync/src/net.rs

Lines changed: 118 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
33
use std::{future::Future, net::SocketAddr};
44

5-
use anyhow::{Context, Result};
65
use iroh_net::{key::PublicKey, magic_endpoint::get_peer_id, MagicEndpoint};
6+
use serde::{Deserialize, Serialize};
77
use tracing::debug;
88

99
use crate::{
@@ -30,18 +30,23 @@ pub async fn connect_and_sync<S: store::Store>(
3030
peer: PublicKey,
3131
derp_region: Option<u16>,
3232
addrs: &[SocketAddr],
33-
) -> Result<()> {
34-
debug!(peer = ?peer, "sync (via connect): start");
33+
) -> Result<(), ConnectError> {
34+
debug!(?peer, "sync[dial]: connect");
3535
let namespace = doc.namespace();
3636
let connection = endpoint
3737
.connect((peer, derp_region, addrs).into(), SYNC_ALPN)
3838
.await
39-
.context("failed to establish connection")?;
40-
debug!(?peer, ?namespace, "sync (via connect): connected");
41-
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
39+
.map_err(ConnectError::connect)?;
40+
debug!(?peer, ?namespace, "sync[dial]: connected");
41+
let (mut send_stream, mut recv_stream) =
42+
connection.open_bi().await.map_err(ConnectError::connect)?;
4243
let res = run_alice::<S, _, _>(&mut send_stream, &mut recv_stream, doc, peer).await;
43-
send_stream.finish().await?;
44-
recv_stream.read_to_end(0).await?;
44+
45+
send_stream.finish().await.map_err(ConnectError::close)?;
46+
recv_stream
47+
.read_to_end(0)
48+
.await
49+
.map_err(ConnectError::close)?;
4550

4651
#[cfg(feature = "metrics")]
4752
if res.is_ok() {
@@ -50,47 +55,32 @@ pub async fn connect_and_sync<S: store::Store>(
5055
inc!(Metrics, sync_via_connect_failure);
5156
}
5257

53-
debug!(peer = ?peer, ?res, "sync (via connect): done");
58+
debug!(?peer, ?namespace, ?res, "sync[dial]: done");
5459
res
5560
}
5661

5762
/// What to do with incoming sync requests
58-
#[derive(Debug)]
59-
pub enum AcceptOutcome<S: store::Store> {
60-
/// This namespace is not available for sync.
61-
NotAvailable,
62-
/// This namespace is already syncing, therefore abort.
63-
AlreadySyncing,
64-
/// Accept the sync request.
65-
Accept(Replica<S::Instance>),
66-
}
67-
68-
impl<S: store::Store> From<Option<Replica<S::Instance>>> for AcceptOutcome<S> {
69-
fn from(replica: Option<Replica<S::Instance>>) -> Self {
70-
match replica {
71-
Some(replica) => AcceptOutcome::Accept(replica),
72-
None => AcceptOutcome::NotAvailable,
73-
}
74-
}
75-
}
63+
pub type AcceptOutcome<S> = Result<Replica<<S as store::Store>::Instance>, AbortReason>;
7664

7765
/// Handle an iroh-sync connection and sync all shared documents in the replica store.
7866
pub async fn handle_connection<S, F, Fut>(
7967
connecting: quinn::Connecting,
8068
accept_cb: F,
81-
) -> std::result::Result<(NamespaceId, PublicKey), SyncError>
69+
) -> Result<(NamespaceId, PublicKey), AcceptError>
8270
where
8371
S: store::Store,
8472
F: Fn(NamespaceId, PublicKey) -> Fut,
8573
Fut: Future<Output = anyhow::Result<AcceptOutcome<S>>>,
8674
{
87-
let connection = connecting.await.map_err(SyncError::connect)?;
88-
let peer = get_peer_id(&connection).await.map_err(SyncError::connect)?;
75+
let connection = connecting.await.map_err(AcceptError::connect)?;
76+
let peer = get_peer_id(&connection)
77+
.await
78+
.map_err(AcceptError::connect)?;
8979
let (mut send_stream, mut recv_stream) = connection
9080
.accept_bi()
9181
.await
92-
.map_err(|error| SyncError::open(peer, error))?;
93-
debug!(peer = ?peer, "sync (via accept): start");
82+
.map_err(|e| AcceptError::open(peer, e))?;
83+
debug!(?peer, "sync[accept]: handle");
9484

9585
let res = run_bob::<S, _, _, _, _>(&mut send_stream, &mut recv_stream, accept_cb, peer).await;
9686

@@ -101,24 +91,30 @@ where
10191
inc!(Metrics, sync_via_accept_failure);
10292
}
10393

104-
debug!(peer = ?peer, ?res, "sync (via accept): done");
94+
let namespace = match &res {
95+
Ok(namespace) => Some(*namespace),
96+
Err(err) => err.namespace(),
97+
};
10598

106-
let namespace = res?;
10799
send_stream
108100
.finish()
109101
.await
110-
.map_err(|error| SyncError::close(peer, namespace, error))?;
102+
.map_err(|error| AcceptError::close(peer, namespace, error))?;
111103
recv_stream
112104
.read_to_end(0)
113105
.await
114-
.map_err(|error| SyncError::close(peer, namespace, error))?;
106+
.map_err(|error| AcceptError::close(peer, namespace, error))?;
107+
let namespace = res?;
108+
109+
debug!(?peer, ?namespace, "sync[accept]: done");
110+
115111
Ok((namespace, peer))
116112
}
117113

118-
/// Failure reasons for sync.
114+
/// Errors that may occur on handling incoming sync connections.
119115
#[derive(thiserror::Error, Debug)]
120116
#[allow(missing_docs)]
121-
pub enum SyncError {
117+
pub enum AcceptError {
122118
/// Failed to establish connection
123119
#[error("Failed to establish connection")]
124120
Connect {
@@ -132,6 +128,13 @@ pub enum SyncError {
132128
#[source]
133129
error: anyhow::Error,
134130
},
131+
/// We aborted the sync request.
132+
#[error("Aborted sync of {namespace:?} with {peer:?}: {reason:?}")]
133+
Abort {
134+
peer: PublicKey,
135+
namespace: NamespaceId,
136+
reason: AbortReason,
137+
},
135138
/// Failed to run sync
136139
#[error("Failed to sync {namespace:?} with {peer:?}")]
137140
Sync {
@@ -144,13 +147,52 @@ pub enum SyncError {
144147
#[error("Failed to close {namespace:?} with {peer:?}")]
145148
Close {
146149
peer: PublicKey,
147-
namespace: NamespaceId,
150+
namespace: Option<NamespaceId>,
148151
#[source]
149152
error: anyhow::Error,
150153
},
151154
}
152155

153-
impl SyncError {
156+
/// Errors that may occur on outgoing sync requests.
157+
#[derive(thiserror::Error, Debug)]
158+
#[allow(missing_docs)]
159+
pub enum ConnectError {
160+
/// Failed to establish connection
161+
#[error("Failed to establish connection")]
162+
Connect {
163+
#[source]
164+
error: anyhow::Error,
165+
},
166+
/// The remote peer aborted the sync request.
167+
#[error("Remote peer aborted sync: {0:?}")]
168+
RemoteAbort(AbortReason),
169+
/// We cancelled the operation
170+
#[error("Cancelled")]
171+
Cancelled,
172+
/// Failed to run sync
173+
#[error("Failed to sync")]
174+
Sync {
175+
#[source]
176+
error: anyhow::Error,
177+
},
178+
/// Failed to close
179+
#[error("Failed to close connection1")]
180+
Close {
181+
#[source]
182+
error: anyhow::Error,
183+
},
184+
}
185+
186+
/// Reason why we aborted an incoming sync request.
187+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
188+
pub enum AbortReason {
189+
/// Namespace is not avaiable.
190+
NotAvailable,
191+
/// We are already syncing this namespace.
192+
AlreadySyncing,
193+
}
194+
195+
impl AcceptError {
154196
fn connect(error: impl Into<anyhow::Error>) -> Self {
155197
Self::Connect {
156198
error: error.into(),
@@ -173,7 +215,11 @@ impl SyncError {
173215
error: error.into(),
174216
}
175217
}
176-
fn close(peer: PublicKey, namespace: NamespaceId, error: impl Into<anyhow::Error>) -> Self {
218+
fn close(
219+
peer: PublicKey,
220+
namespace: Option<NamespaceId>,
221+
error: impl Into<anyhow::Error>,
222+
) -> Self {
177223
Self::Close {
178224
peer,
179225
namespace,
@@ -183,20 +229,43 @@ impl SyncError {
183229
/// Get the peer's node ID (if available)
184230
pub fn peer(&self) -> Option<PublicKey> {
185231
match self {
186-
SyncError::Connect { .. } => None,
187-
SyncError::Open { peer, .. } => Some(*peer),
188-
SyncError::Sync { peer, .. } => Some(*peer),
189-
SyncError::Close { peer, .. } => Some(*peer),
232+
AcceptError::Connect { .. } => None,
233+
AcceptError::Open { peer, .. } => Some(*peer),
234+
AcceptError::Sync { peer, .. } => Some(*peer),
235+
AcceptError::Close { peer, .. } => Some(*peer),
236+
AcceptError::Abort { peer, .. } => Some(*peer),
190237
}
191238
}
192239

193240
/// Get the namespace (if available)
194241
pub fn namespace(&self) -> Option<NamespaceId> {
195242
match self {
196-
SyncError::Connect { .. } => None,
197-
SyncError::Open { .. } => None,
198-
SyncError::Sync { namespace, .. } => namespace.to_owned(),
199-
SyncError::Close { namespace, .. } => Some(*namespace),
243+
AcceptError::Connect { .. } => None,
244+
AcceptError::Open { .. } => None,
245+
AcceptError::Sync { namespace, .. } => namespace.to_owned(),
246+
AcceptError::Close { namespace, .. } => namespace.to_owned(),
247+
AcceptError::Abort { namespace, .. } => Some(*namespace),
200248
}
201249
}
202250
}
251+
252+
impl ConnectError {
253+
fn connect(error: impl Into<anyhow::Error>) -> Self {
254+
Self::Connect {
255+
error: error.into(),
256+
}
257+
}
258+
fn close(error: impl Into<anyhow::Error>) -> Self {
259+
Self::Close {
260+
error: error.into(),
261+
}
262+
}
263+
pub(crate) fn sync(error: impl Into<anyhow::Error>) -> Self {
264+
Self::Sync {
265+
error: error.into(),
266+
}
267+
}
268+
pub(crate) fn remote_abort(reason: AbortReason) -> Self {
269+
Self::RemoteAbort(reason)
270+
}
271+
}

0 commit comments

Comments
 (0)