Skip to content

Commit 779e470

Browse files
authored
fix(sync): fix PeerData encoding, neighbor events, better & predictable tests (#1513)
## Description * **fix(gossip): properly encode peer data.** #1506 introduced a bug: the `PeerData` was encoded from the `PeerAddr` (including the peer id) but decoded to `AddrInfo` (without the peer id). Decoding therefore failed, and so did dialing peers. This went largely unnoticed because most tests pass tickets separately and so do not rely on the `PeerData` gossip. * **feat: expose neighbor events** through document subscriptions. For now they are used to write better and less flaky tests; they are also useful for stats-like use cases, and potentially others. * **tests: improve sync tests** and make `sync_full_basic` no longer flaky (hopefully). The main change is that for some events the tests no longer care about the exact order, because the exact order is too unpredictable timing-wise for things that happen concurrently. Instead, these events are matched in chunks. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. Replaces #1512
1 parent 49bde4f commit 779e470

6 files changed

Lines changed: 322 additions & 178 deletions

File tree

iroh-gossip/src/net.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio::{
1515
use tracing::{debug, trace, warn};
1616

1717
use self::util::{read_message, write_message, Dialer, Timers};
18-
use crate::proto::{self, Scope, TopicId};
18+
use crate::proto::{self, PeerData, Scope, TopicId};
1919

2020
pub mod util;
2121

@@ -347,10 +347,9 @@ impl Actor {
347347
}
348348
},
349349
_ = self.on_endpoints_rx.changed() => {
350-
let info = self.endpoint.my_addr().await?;
351-
let peer_data = Bytes::from(postcard::to_stdvec(&info)?);
352-
353-
self.handle_in_event(InEvent::UpdatePeerData(peer_data.into()), Instant::now()).await?;
350+
let addr = self.endpoint.my_addr().await?;
351+
let peer_data = encode_peer_data(&addr.info)?;
352+
self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?;
354353
}
355354
(peer_id, res) = self.dialer.next_conn() => {
356355
match res {
@@ -385,7 +384,7 @@ impl Actor {
385384

386385
async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> {
387386
let me = *self.state.me();
388-
debug!(?me, "handle to_actor {msg:?}");
387+
trace!(?me, "handle to_actor {msg:?}");
389388
match msg {
390389
ToActor::ConnIncoming(peer_id, origin, conn) => {
391390
self.conns.insert(peer_id, conn.clone());
@@ -396,7 +395,7 @@ impl Actor {
396395
// Spawn a task for this connection
397396
let in_event_tx = self.in_event_tx.clone();
398397
tokio::spawn(async move {
399-
debug!(?me, peer = ?peer_id, "connection established, start loop");
398+
debug!(?me, peer = ?peer_id, "connection established");
400399
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
401400
Ok(()) => {
402401
debug!(?me, peer = ?peer_id, "connection closed without error")
@@ -514,8 +513,8 @@ impl Actor {
514513
self.pending_sends.remove(&peer);
515514
self.dialer.abort_dial(&peer);
516515
}
517-
OutEvent::PeerData(peer, data) => match postcard::from_bytes::<AddrInfo>(&data) {
518-
Err(err) => warn!("Failed to decode PeerData from {peer}: {err}"),
516+
OutEvent::PeerData(peer, data) => match decode_peer_data(&data) {
517+
Err(err) => warn!("Failed to decode {data:?} from {peer}: {err}"),
519518
Ok(info) => {
520519
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}");
521520
let peer_addr = PeerAddr {
@@ -600,6 +599,15 @@ async fn connection_loop(
600599
Ok(())
601600
}
602601

602+
fn encode_peer_data(info: &AddrInfo) -> anyhow::Result<PeerData> {
603+
Ok(PeerData::new(postcard::to_stdvec(info)?))
604+
}
605+
606+
fn decode_peer_data(peer_data: &PeerData) -> anyhow::Result<AddrInfo> {
607+
let info = postcard::from_bytes(peer_data.as_bytes())?;
608+
Ok(info)
609+
}
610+
603611
#[cfg(test)]
604612
mod test {
605613
use std::time::Duration;

iroh-gossip/src/proto.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
4848
use std::{fmt, hash::Hash};
4949

50+
use bytes::Bytes;
5051
use serde::{de::DeserializeOwned, Deserialize, Serialize};
5152

5253
mod hyparview;
@@ -80,20 +81,26 @@ impl<T> PeerIdentity for T where T: Hash + Eq + Copy + fmt::Debug + Serialize +
8081
///
8182
/// Implementations may use these bytes to supply addresses or other information needed to connect
8283
/// to a peer that is not included in the peer's [`PeerIdentity`].
83-
#[derive(
84-
derive_more::Debug,
85-
Serialize,
86-
Deserialize,
87-
Clone,
88-
PartialEq,
89-
Eq,
90-
derive_more::From,
91-
derive_more::Into,
92-
derive_more::Deref,
93-
Default,
94-
)]
84+
#[derive(derive_more::Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
9585
#[debug("PeerData({}b)", self.0.len())]
96-
pub struct PeerData(bytes::Bytes);
86+
pub struct PeerData(Bytes);
87+
88+
impl PeerData {
89+
/// Create a new [`PeerData`] from a byte buffer.
90+
pub fn new(data: impl Into<Bytes>) -> Self {
91+
Self(data.into())
92+
}
93+
94+
/// Get a reference to the contained [`bytes::Bytes`].
95+
pub fn inner(&self) -> &bytes::Bytes {
96+
&self.0
97+
}
98+
99+
/// Get the peer data as a byte slice.
100+
pub fn as_bytes(&self) -> &[u8] {
101+
&self.0
102+
}
103+
}
97104

98105
/// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer.
99106
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]

iroh/examples/sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ async fn run(args: Args) -> anyhow::Result<()> {
291291
println!("change: {}", fmt_entry(&entry));
292292
}
293293
}
294+
_ => {}
294295
}
295296
}
296297
}

iroh/src/commands/sync.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,12 @@ impl DocCommands {
265265
),
266266
}
267267
}
268+
LiveEvent::NeighborUp(peer) => {
269+
println!("neighbor peer up: {peer:?}");
270+
}
271+
LiveEvent::NeighborDown(peer) => {
272+
println!("neighbor peer down: {peer:?}");
273+
}
268274
}
269275
}
270276
}

iroh/src/sync_engine/live.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ pub type OnLiveEventCallback =
124124
Box<dyn Fn(LiveEvent) -> BoxFuture<'static, KeepCallback> + Send + Sync + 'static>;
125125

126126
/// Events informing about actions of the live sync progress.
127-
#[derive(Serialize, Deserialize, Debug, Clone)]
127+
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
128128
#[allow(clippy::large_enum_variant)]
129129
pub enum LiveEvent {
130130
/// A local insertion.
@@ -146,6 +146,10 @@ pub enum LiveEvent {
146146
/// The content hash of the newly available entry content
147147
hash: Hash,
148148
},
149+
/// We have a new neighbor in the swarm.
150+
NeighborUp(PublicKey),
151+
/// We lost a neighbor in the swarm.
152+
NeighborDown(PublicKey),
149153
/// A set-reconciliation sync finished.
150154
SyncFinished(SyncEvent),
151155
}
@@ -795,14 +799,17 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
795799
let op: Op = postcard::from_bytes(&msg.content)?;
796800
match op {
797801
Op::Put(entry) => {
798-
debug!(peer = ?msg.delivered_from, topic = ?topic, "received entry via gossip");
799-
// If the distance is 0, we received the message from its original author.
800-
// In this case, assume that the peer can provide the content to us.
802+
debug!(peer = ?msg.delivered_from, ?namespace, "received entry via gossip");
803+
// Insert the entry into our replica.
804+
// If the message was broadcast with neighbor scope, or is received
805+
// directly from the author, we assume that the content is available at
806+
// that peer. Otherwise we don't.
807+
// The download is not triggered here, but in the `on_replica_event`
808+
// handler for the `InsertRemote` event.
801809
let content_status = match msg.scope.is_direct() {
802810
true => ContentStatus::Complete,
803811
false => ContentStatus::Missing,
804812
};
805-
// At this point, we do not know if the peer has the content.
806813
replica.insert_remote_entry(
807814
entry,
808815
*msg.delivered_from.as_bytes(),
@@ -822,9 +829,18 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
822829
// [Self::sync_with_peer] will check to not resync with peers synced previously in the
823830
// same session. TODO: Maybe this is too broad and leads to too many sync requests.
824831
Event::NeighborUp(peer) => {
832+
debug!(?peer, ?namespace, "neighbor up");
825833
self.sync_with_peer(namespace, peer, SyncReason::NewNeighbor);
834+
if let Some(subs) = self.event_subscriptions.get_mut(&namespace) {
835+
notify_all(subs, LiveEvent::NeighborUp(peer)).await;
836+
}
837+
}
838+
Event::NeighborDown(peer) => {
839+
debug!(?peer, ?namespace, "neighbor down");
840+
if let Some(subs) = self.event_subscriptions.get_mut(&namespace) {
841+
notify_all(subs, LiveEvent::NeighborDown(peer)).await;
842+
}
826843
}
827-
_ => {}
828844
}
829845
Ok(())
830846
}
@@ -844,7 +860,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
844860
// A new entry was inserted locally. Broadcast a gossip message.
845861
let op = Op::Put(signed_entry);
846862
let message = postcard::to_stdvec(&op)?.into();
847-
debug!(topic = ?topic, "broadcast new entry");
863+
debug!(?namespace, "broadcast new entry");
848864
self.gossip.broadcast(topic, message).await?;
849865

850866
// Notify subscribers about the event

0 commit comments

Comments
 (0)