From b8de760a7a1b8ee88dc88d43e8a1859bbe27f9da Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Thu, 29 Aug 2024 12:16:27 +0100 Subject: [PATCH 1/5] Add EntryOrigin to ingestion --- data-model/src/store.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/data-model/src/store.rs b/data-model/src/store.rs index e7b9790..9231355 100644 --- a/data-model/src/store.rs +++ b/data-model/src/store.rs @@ -149,7 +149,11 @@ where AT: AuthorisationToken, { /// A new entry was ingested. - Ingested(u64, AuthorisedEntry), + Ingested( + u64, + AuthorisedEntry, + EntryOrigin, + ), /// An existing entry received a portion of its corresponding payload. Appended(u64, LengthyAuthorisedEntry), /// An entry was forgotten. @@ -188,6 +192,14 @@ pub enum ForgetPayloadError { ReferredToByOtherEntries, } +/// The origin of an entry ingestion event. +pub enum EntryOrigin { + /// The entry was probably created on this machine. + Local, + /// The entry was sourced from another device, e.g. a networked sync session. + Remote(u64), +} + /// A [`Store`] is a set of [`AuthorisedEntry`] belonging to a single namespace, and a (possibly partial) corresponding set of payloads. pub trait Store where From de0f72fc972de30c9590bc8d7b2a255e8666ce49 Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Thu, 29 Aug 2024 12:31:46 +0100 Subject: [PATCH 2/5] Use Areas for subscriptions --- data-model/src/store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-model/src/store.rs b/data-model/src/store.rs index 9231355..578924d 100644 --- a/data-model/src/store.rs +++ b/data-model/src/store.rs @@ -384,7 +384,7 @@ where /// If `ignore_empty_payloads` is `true`, the producer will not produce entries with a `payload_length` of `0`. fn subscribe_area( &self, - area: &AreaOfInterest, + area: &Area, ignore: Option, ) -> impl Producer>; @@ -392,7 +392,7 @@ where fn resume_subscription( &self, progress_id: u64, - area: &AreaOfInterest, + area: &Area, ignore: Option, ) -> impl Producer>; } From d39f7e04e2f1b774f9b6b5d4aa9aab49c07de791 Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Thu, 29 Aug 2024 14:13:25 +0100 Subject: [PATCH 3/5] Add standard derives --- data-model/src/lengthy_entry.rs | 2 ++ data-model/src/store.rs | 14 +++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/data-model/src/lengthy_entry.rs b/data-model/src/lengthy_entry.rs index 1941062..06fa898 100644 --- a/data-model/src/lengthy_entry.rs +++ b/data-model/src/lengthy_entry.rs @@ -3,6 +3,7 @@ use crate::{AuthorisationToken, AuthorisedEntry, Entry, NamespaceId, PayloadDige /// An [`Entry`] together with information about how much of its payload a given [`Store`] holds. /// /// [Definition](https://willowprotocol.org/specs/3d-range-based-set-reconciliation/index.html#LengthyEntry) +#[derive(Debug, Clone, PartialEq, Eq)] pub struct LengthyEntry where N: NamespaceId, @@ -56,6 +57,7 @@ where } /// An [`AuthorisedEntry`] together with information about how much of its payload a given [`Store`] holds. +#[derive(Debug, Clone, PartialEq, Eq)] pub struct LengthyAuthorisedEntry< const MCL: usize, const MCC: usize, diff --git a/data-model/src/store.rs b/data-model/src/store.rs index 578924d..e09a754 100644 --- a/data-model/src/store.rs +++ b/data-model/src/store.rs @@ -10,6 +10,7 @@ use crate::{ }; /// Returned when an entry could be ingested into a [`Store`]. +#[derive(Debug, Clone)] pub enum EntryIngestionSuccess< const MCL: usize, const MCC: usize, @@ -31,6 +32,7 @@ pub enum EntryIngestionSuccess< } /// Returned when an entry cannot be ingested into a [`Store`]. +#[derive(Debug, Clone)] pub enum EntryIngestionError< const MCL: usize, const MCC: usize, @@ -68,6 +70,7 @@ pub type BulkIngestionResult< ); /// Returned when a bulk ingestion failed due to a consumer error. +#[derive(Debug, Clone)] pub struct BulkIngestionError< const MCL: usize, const MCC: usize, @@ -84,6 +87,7 @@ pub struct BulkIngestionError< } /// Return when a payload is successfully appended to the [`Store`]. +#[derive(Debug, Clone)] pub enum PayloadAppendSuccess where N: NamespaceId, @@ -97,6 +101,7 @@ where } /// Returned when a payload fails to be appended into the [`Store`]. +#[derive(Debug, Clone)] pub enum PayloadAppendError { /// None of the entries in the store reference this payload. NotEntryReference, @@ -111,9 +116,11 @@ pub enum PayloadAppendError { } /// Returned when no entry was found for some criteria. +#[derive(Debug, Clone)] pub struct NoSuchEntryError; /// The order by which entries should be returned for a given query. +#[derive(Debug, Clone)] pub enum QueryOrder { /// Ordered by subspace, then path, then timestamp. Subspace, @@ -126,6 +133,7 @@ pub enum QueryOrder { } /// Describes an [`AuthorisedEntry`] which was pruned and the [`AuthorisedEntry`] which triggered the pruning. +#[derive(Debug, Clone)] pub struct PruneEvent where N: NamespaceId, @@ -141,6 +149,7 @@ where /// An event which took place within a [`Store`]. /// Each event includes a *progress ID* which can be used to *resume* a subscription at any point in the future. +#[derive(Debug, Clone)] pub enum StoreEvent where N: NamespaceId, @@ -165,10 +174,11 @@ where } /// Returned when the store chooses to not resume a subscription. +#[derive(Debug, Clone)] pub struct ResumptionFailedError(pub u64); /// Describes which entries to ignore during a query. -#[derive(Default)] +#[derive(Default, Clone)] pub struct QueryIgnoreParams { /// Omit entries with locally incomplete corresponding payloads. pub ignore_incomplete_payloads: bool, @@ -187,12 +197,14 @@ impl QueryIgnoreParams { } /// Returned when a payload could not be forgotten. +#[derive(Debug, Clone)] pub enum ForgetPayloadError { NoSuchEntry, ReferredToByOtherEntries, } /// The origin of an entry ingestion event. +#[derive(Debug, Clone, PartialEq, Eq)] pub enum EntryOrigin { /// The entry was probably created on this machine. Local, From 51ab2c4882d943236444b69a7adc5ccafe8a8066 Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Thu, 29 Aug 2024 15:44:19 +0100 Subject: [PATCH 4/5] tedious implementation of Error --- data-model/src/store.rs | 151 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 145 insertions(+), 6 deletions(-) diff --git a/data-model/src/store.rs b/data-model/src/store.rs index e09a754..04c62ae 100644 --- a/data-model/src/store.rs +++ b/data-model/src/store.rs @@ -1,4 +1,8 @@ -use std::future::Future; +use std::{ + error::Error, + fmt::{Debug, Display}, + future::Future, +}; use ufotofu::{local_nb::Producer, nb::BulkProducer}; @@ -41,7 +45,7 @@ pub enum EntryIngestionError< S: SubspaceId, PD: PayloadDigest, AT, - OE, + OE: Display + Error, > { /// The entry belonged to another namespace. WrongNamespace(AuthorisedEntry), @@ -51,6 +55,43 @@ pub enum EntryIngestionError< OperationsError(OE), } +impl< + const MCL: usize, + const MCC: usize, + const MPL: usize, + N: NamespaceId, + S: SubspaceId, + PD: PayloadDigest, + AT, + OE: Display + Error, + > Display for EntryIngestionError +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EntryIngestionError::WrongNamespace(_) => { + write!(f, "Tried to ingest an entry from a different namespace.") + } + EntryIngestionError::PruningPrevented => { + write!(f, "Entry ingestion would have triggered undesired pruning.") + } + EntryIngestionError::OperationsError(err) => Display::fmt(err, f), + } + } +} + +impl< + const MCL: usize, + const MCC: usize, + const MPL: usize, + N: NamespaceId + Debug, + S: SubspaceId + Debug, + PD: PayloadDigest + Debug, + AT: Debug, + OE: Display + Error, + > Error for EntryIngestionError +{ +} + /// A tuple of an [`AuthorisedEntry`] and how a [`Store`] responded to its ingestion. pub type BulkIngestionResult< const MCL: usize, @@ -79,13 +120,48 @@ pub struct BulkIngestionError< S: SubspaceId, PD: PayloadDigest, AT: AuthorisationToken, - OE, + OE: Error, IngestionError, > { pub ingested: Vec>, pub error: IngestionError, } +impl< + const MCL: usize, + const MCC: usize, + const MPL: usize, + N: NamespaceId, + S: SubspaceId, + PD: PayloadDigest, + AT: AuthorisationToken, + OE: Display + Error, + IngestionError: Error, + > std::fmt::Display for BulkIngestionError +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "An error stopped bulk ingestion after successfully ingesting {:?} entries", + self.ingested.len() + ) + } +} + +impl< + const MCL: usize, + const MCC: usize, + const MPL: usize, + N: NamespaceId + Debug, + S: SubspaceId + Debug, + PD: PayloadDigest + Debug, + AT: AuthorisationToken + Debug, + OE: Display + Error, + IngestionError: Error, + > Error for BulkIngestionError +{ +} + /// Return when a payload is successfully appended to the [`Store`]. #[derive(Debug, Clone)] pub enum PayloadAppendSuccess @@ -115,10 +191,42 @@ pub enum PayloadAppendError { OperationError(OE), } +impl Display for PayloadAppendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PayloadAppendError::NotEntryReference => write!( + f, + "None of the entries in the soter reference this payload." + ), + PayloadAppendError::AlreadyHaveIt => { + write!(f, "The payload is already held in storage.") + } + PayloadAppendError::TooManyBytes => write!( + f, + "The payload source produced more bytes than were expected for this payload." + ), + PayloadAppendError::DigestMismatch => { + write!(f, "The complete payload's digest is not what was expected.") + } + PayloadAppendError::OperationError(err) => std::fmt::Display::fmt(err, f), + } + } +} + +impl Error for PayloadAppendError {} + /// Returned when no entry was found for some criteria. #[derive(Debug, Clone)] pub struct NoSuchEntryError; +impl Display for NoSuchEntryError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "No entry was found for the given criteria.") + } +} + +impl Error for NoSuchEntryError {} + /// The order by which entries should be returned for a given query. #[derive(Debug, Clone)] pub enum QueryOrder { @@ -177,6 +285,18 @@ where #[derive(Debug, Clone)] pub struct ResumptionFailedError(pub u64); +impl Display for ResumptionFailedError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "The subscription with ID {:?} could not be resumed.", + self.0 + ) + } +} + +impl Error for ResumptionFailedError {} + /// Describes which entries to ignore during a query. #[derive(Default, Clone)] pub struct QueryIgnoreParams { @@ -203,6 +323,25 @@ pub enum ForgetPayloadError { ReferredToByOtherEntries, } +impl Display for ForgetPayloadError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ForgetPayloadError::NoSuchEntry => { + write!( + f, + "No entry for the given criteria could be found in this store." + ) + } + ForgetPayloadError::ReferredToByOtherEntries => write!( + f, + "The payload could not be forgotten because it is referred to by other entries." + ), + } + } +} + +impl Error for ForgetPayloadError {} + /// The origin of an entry ingestion event. #[derive(Debug, Clone, PartialEq, Eq)] pub enum EntryOrigin { @@ -220,9 +359,9 @@ where PD: PayloadDigest, AT: AuthorisationToken, { - type FlushError; - type BulkIngestionError; - type OperationsError; + type FlushError: Display + Error; + type BulkIngestionError: Display + Error; + type OperationsError: Display + Error; /// The [namespace](https://willowprotocol.org/specs/data-model/index.html#namespace) which all of this store's [`AuthorisedEntry`] belong to. fn namespace_id() -> N; From 914bc631e7d4e35bcb916da255c05a62de38cc09 Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Fri, 30 Aug 2024 11:26:54 +0100 Subject: [PATCH 5/5] clearer docs for EntryOrigin --- data-model/src/store.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data-model/src/store.rs b/data-model/src/store.rs index 04c62ae..fe4386f 100644 --- a/data-model/src/store.rs +++ b/data-model/src/store.rs @@ -347,7 +347,8 @@ impl Error for ForgetPayloadError {} pub enum EntryOrigin { /// The entry was probably created on this machine. Local, - /// The entry was sourced from another device, e.g. a networked sync session. + /// The entry was sourced from another source with an ID assigned by us. + /// This is useful if you want to suppress the forwarding of entries to the peers from which the entry was originally sourced. Remote(u64), }