Skip to content

Commit 8fe3f71

Browse files
authored
feat: content hashes iterator for sync store (#1501)
## Description Adds an iterator over all content hashes in the document store. Needed for #1496. ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant.
1 parent 5380cd5 commit 8fe3f71

4 files changed

Lines changed: 132 additions & 1 deletion

File tree

iroh-sync/src/store.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Storage trait and implementation for iroh-sync documents
22
33
use anyhow::Result;
4+
use iroh_bytes::Hash;
45
use rand_core::CryptoRngCore;
56
use serde::{Deserialize, Serialize};
67

@@ -26,6 +27,11 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static {
2627
where
2728
Self: 'a;
2829

30+
/// Iterator over all content hashes in the store, returned from [`Self::content_hashes`]
31+
type ContentHashesIter<'a>: Iterator<Item = Result<Hash>>
32+
where
33+
Self: 'a;
34+
2935
/// Iterator over replica namespaces in the store, returned from [`Self::list_namespaces`]
3036
type NamespaceIter<'a>: Iterator<Item = Result<NamespaceId>>
3137
where
@@ -78,6 +84,9 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static {
7884
author: AuthorId,
7985
key: impl AsRef<[u8]>,
8086
) -> Result<Option<SignedEntry>>;
87+
88+
/// Get all content hashes of all replicas in the store.
89+
fn content_hashes(&self) -> Result<Self::ContentHashesIter<'_>>;
8190
}
8291

8392
/// Filter a get query onto a namespace

iroh-sync/src/store/fs.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::{cmp::Ordering, collections::HashMap, path::Path, sync::Arc};
55
use anyhow::Result;
66
use derive_more::From;
77
use ed25519_dalek::{SignatureError, VerifyingKey};
8+
use iroh_bytes::Hash;
89
use ouroboros::self_referencing;
910
use parking_lot::RwLock;
1011
use redb::{
@@ -111,6 +112,7 @@ impl Store {
111112
impl super::Store for Store {
112113
type Instance = StoreInstance;
113114
type GetIter<'a> = RangeIterator<'a>;
115+
type ContentHashesIter<'a> = ContentHashesIterator<'a>;
114116
type AuthorsIter<'a> = std::vec::IntoIter<Result<Author>>;
115117
type NamespaceIter<'a> = std::vec::IntoIter<Result<NamespaceId>>;
116118

@@ -225,6 +227,10 @@ impl super::Store for Store {
225227

226228
Ok(Some(signed_entry))
227229
}
230+
231+
fn content_hashes(&self) -> Result<Self::ContentHashesIter<'_>> {
232+
ContentHashesIterator::create(&self.db)
233+
}
228234
}
229235

230236
impl Store {
@@ -503,6 +509,47 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
503509
}
504510
}
505511

512+
/// Iterator over all content hashes for the fs store.
513+
#[self_referencing]
514+
pub struct ContentHashesIterator<'a> {
515+
read_tx: ReadTransaction<'a>,
516+
#[borrows(read_tx)]
517+
#[covariant]
518+
record_table: RecordsTable<'this>,
519+
#[covariant]
520+
#[borrows(record_table)]
521+
records: RecordsRange<'this>,
522+
}
523+
impl<'a> ContentHashesIterator<'a> {
524+
fn create(db: &'a Arc<Database>) -> anyhow::Result<Self> {
525+
let iter = Self::try_new(
526+
db.begin_read()?,
527+
|read_tx| {
528+
read_tx
529+
.open_table(RECORDS_TABLE)
530+
.map_err(anyhow::Error::from)
531+
},
532+
|table| table.iter().map_err(anyhow::Error::from),
533+
)?;
534+
Ok(iter)
535+
}
536+
}
537+
538+
impl Iterator for ContentHashesIterator<'_> {
539+
type Item = Result<Hash>;
540+
541+
fn next(&mut self) -> Option<Self::Item> {
542+
self.with_mut(|fields| match fields.records.next() {
543+
None => None,
544+
Some(Err(err)) => Some(Err(err.into())),
545+
Some(Ok((_key, value))) => {
546+
let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value.value();
547+
Some(Ok(Hash::from(hash)))
548+
}
549+
})
550+
}
551+
}
552+
506553
#[self_referencing]
507554
pub struct RangeIterator<'a> {
508555
read_tx: ReadTransaction<'a>,

iroh-sync/src/store/memory.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88

99
use anyhow::Result;
1010
use ed25519_dalek::{SignatureError, VerifyingKey};
11+
use iroh_bytes::Hash;
1112
use parking_lot::{RwLock, RwLockReadGuard};
1213

1314
use crate::{
@@ -31,11 +32,12 @@ pub struct Store {
3132
type Rid = (AuthorId, Vec<u8>);
3233
type Rvalue = SignedEntry;
3334
type RecordMap = BTreeMap<Rid, Rvalue>;
34-
type ReplicaRecordsOwned = HashMap<NamespaceId, RecordMap>;
35+
type ReplicaRecordsOwned = BTreeMap<NamespaceId, RecordMap>;
3536

3637
impl super::Store for Store {
3738
type Instance = ReplicaStoreInstance;
3839
type GetIter<'a> = RangeIterator<'a>;
40+
type ContentHashesIter<'a> = ContentHashesIterator<'a>;
3941
type AuthorsIter<'a> = std::vec::IntoIter<Result<Author>>;
4042
type NamespaceIter<'a> = std::vec::IntoIter<Result<NamespaceId>>;
4143

@@ -117,6 +119,16 @@ impl super::Store for Store {
117119

118120
Ok(value.cloned())
119121
}
122+
123+
/// Get all content hashes of all replicas in the store.
124+
fn content_hashes(&self) -> Result<Self::ContentHashesIter<'_>> {
125+
let records = self.replica_records.read();
126+
Ok(ContentHashesIterator {
127+
records,
128+
namespace_i: 0,
129+
record_i: 0,
130+
})
131+
}
120132
}
121133

122134
impl Store {
@@ -234,6 +246,31 @@ impl GetFilter {
234246
}
235247
}
236248

249+
/// Iterator over all content hashes in the memory store.
250+
pub struct ContentHashesIterator<'a> {
251+
records: ReplicaRecords<'a>,
252+
namespace_i: usize,
253+
record_i: usize,
254+
}
255+
impl<'a> Iterator for ContentHashesIterator<'a> {
256+
type Item = Result<Hash>;
257+
fn next(&mut self) -> Option<Self::Item> {
258+
loop {
259+
let records = self.records.values().nth(self.namespace_i)?;
260+
match records.values().nth(self.record_i) {
261+
None => {
262+
self.namespace_i += 1;
263+
self.record_i = 0;
264+
}
265+
Some(record) => {
266+
self.record_i += 1;
267+
return Some(Ok(record.content_hash()));
268+
}
269+
}
270+
}
271+
}
272+
}
273+
237274
/// Iterator over entries in the memory store
238275
#[derive(Debug)]
239276
pub struct RangeIterator<'a> {

iroh-sync/src/sync.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,8 @@ impl Record {
719719

720720
#[cfg(test)]
721721
mod tests {
722+
use std::collections::HashSet;
723+
722724
use anyhow::Result;
723725
use rand_core::SeedableRng;
724726

@@ -887,6 +889,42 @@ mod tests {
887889
Ok(())
888890
}
889891

892+
/// Runs the shared content-hash iterator checks against the in-memory store.
#[test]
fn test_content_hashes_iterator_memory() -> Result<()> {
    let store = store::memory::Store::default();
    // Use the iterator-specific helper (the same one the fs variant calls), not `test_basics`;
    // otherwise `content_hashes` is never exercised for the memory backend.
    test_content_hashes_iterator(store)
}
897+
898+
/// Runs the shared content-hash iterator checks against the file-backed store.
#[cfg(feature = "fs-store")]
#[test]
fn test_content_hashes_iterator_fs() -> Result<()> {
    // Keep the temp file handle bound for the whole test so the path stays valid.
    let db_file = tempfile::NamedTempFile::new()?;
    let fs_store = store::fs::Store::new(db_file.path())?;
    test_content_hashes_iterator(fs_store)
}
905+
906+
fn test_content_hashes_iterator<S: store::Store>(store: S) -> Result<()> {
907+
let mut rng = rand::thread_rng();
908+
let mut expected = HashSet::new();
909+
let n_replicas = 3;
910+
let n_entries = 4;
911+
for i in 0..n_replicas {
912+
let namespace = Namespace::new(&mut rng);
913+
let author = store.new_author(&mut rng)?;
914+
let replica = store.new_replica(namespace)?;
915+
for j in 0..n_entries {
916+
let key = format!("{j}");
917+
let data = format!("{i}:{j}");
918+
let hash = replica.hash_and_insert(key, &author, data)?;
919+
expected.insert(hash);
920+
}
921+
}
922+
assert_eq!(expected.len(), n_replicas * n_entries);
923+
let actual = store.content_hashes()?.collect::<Result<HashSet<Hash>>>()?;
924+
assert_eq!(actual, expected);
925+
Ok(())
926+
}
927+
890928
#[test]
891929
fn test_multikey() {
892930
let mut rng = rand::thread_rng();

0 commit comments

Comments
 (0)