Skip to content

Commit

Permalink
Merge branch 'main' into store-trait-internal
Browse files Browse the repository at this point in the history
  • Loading branch information
sgwilym authored Aug 30, 2024
2 parents f628ccd + 442854e commit a4aaeeb
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 10 deletions.
2 changes: 2 additions & 0 deletions data-model/src/lengthy_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{AuthorisationToken, AuthorisedEntry, Entry, NamespaceId, PayloadDige
/// An [`Entry`] together with information about how much of its payload a given [`Store`] holds.
///
/// [Definition](https://willowprotocol.org/specs/3d-range-based-set-reconciliation/index.html#LengthyEntry)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LengthyEntry<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
where
N: NamespaceId,
Expand Down Expand Up @@ -56,6 +57,7 @@ where
}

/// An [`AuthorisedEntry`] together with information about how much of its payload a given [`Store`] holds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LengthyAuthorisedEntry<
const MCL: usize,
const MCC: usize,
Expand Down
184 changes: 174 additions & 10 deletions data-model/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::future::Future;
use std::{
error::Error,
fmt::{Debug, Display},
future::Future,
};

use ufotofu::{local_nb::Producer, nb::BulkProducer};

Expand All @@ -10,6 +14,7 @@ use crate::{
};

/// Returned when an entry could be ingested into a [`Store`].
#[derive(Debug, Clone)]
pub enum EntryIngestionSuccess<
const MCL: usize,
const MCC: usize,
Expand All @@ -31,6 +36,7 @@ pub enum EntryIngestionSuccess<
}

/// Returned when an entry cannot be ingested into a [`Store`].
#[derive(Debug, Clone)]
pub enum EntryIngestionError<
const MCL: usize,
const MCC: usize,
Expand All @@ -39,7 +45,7 @@ pub enum EntryIngestionError<
S: SubspaceId,
PD: PayloadDigest,
AT,
OE,
OE: Display + Error,
> {
/// The entry belonged to another namespace.
WrongNamespace(AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>),
Expand All @@ -49,6 +55,43 @@ pub enum EntryIngestionError<
OperationsError(OE),
}

impl<
const MCL: usize,
const MCC: usize,
const MPL: usize,
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
AT,
OE: Display + Error,
> Display for EntryIngestionError<MCL, MCC, MPL, N, S, PD, AT, OE>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EntryIngestionError::WrongNamespace(_) => {
write!(f, "Tried to ingest an entry from a different namespace.")
}
EntryIngestionError::PruningPrevented => {
write!(f, "Entry ingestion would have triggered undesired pruning.")
}
EntryIngestionError::OperationsError(err) => Display::fmt(err, f),
}
}
}

impl<
const MCL: usize,
const MCC: usize,
const MPL: usize,
N: NamespaceId + Debug,
S: SubspaceId + Debug,
PD: PayloadDigest + Debug,
AT: Debug,
OE: Display + Error,
> Error for EntryIngestionError<MCL, MCC, MPL, N, S, PD, AT, OE>
{
}

/// A tuple of an [`AuthorisedEntry`] and how a [`Store`] responded to its ingestion.
pub type BulkIngestionResult<
const MCL: usize,
Expand All @@ -68,6 +111,7 @@ pub type BulkIngestionResult<
);

/// Returned when a bulk ingestion failed due to a consumer error.
#[derive(Debug, Clone)]
pub struct BulkIngestionError<
const MCL: usize,
const MCC: usize,
Expand All @@ -76,14 +120,50 @@ pub struct BulkIngestionError<
S: SubspaceId,
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
OE,
OE: Error,
IngestionError,
> {
pub ingested: Vec<BulkIngestionResult<MCL, MCC, MPL, N, S, PD, AT, OE>>,
pub error: IngestionError,
}

impl<
const MCL: usize,
const MCC: usize,
const MPL: usize,
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
OE: Display + Error,
IngestionError: Error,
> std::fmt::Display for BulkIngestionError<MCL, MCC, MPL, N, S, PD, AT, OE, IngestionError>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"An error stopped bulk ingestion after successfully ingesting {:?} entries",
self.ingested.len()
)
}
}

impl<
const MCL: usize,
const MCC: usize,
const MPL: usize,
N: NamespaceId + Debug,
S: SubspaceId + Debug,
PD: PayloadDigest + Debug,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + Debug,
OE: Display + Error,
IngestionError: Error,
> Error for BulkIngestionError<MCL, MCC, MPL, N, S, PD, AT, OE, IngestionError>
{
}

/// Return when a payload is successfully appended to the [`Store`].
#[derive(Debug, Clone)]
pub enum PayloadAppendSuccess<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
where
N: NamespaceId,
Expand All @@ -97,6 +177,7 @@ where
}

/// Returned when a payload fails to be appended into the [`Store`].
#[derive(Debug, Clone)]
pub enum PayloadAppendError<OE> {
/// None of the entries in the store reference this payload.
NotEntryReference,
Expand All @@ -110,10 +191,44 @@ pub enum PayloadAppendError<OE> {
OperationError(OE),
}

impl<OE: Display + Error> Display for PayloadAppendError<OE> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PayloadAppendError::NotEntryReference => write!(
f,
"None of the entries in the soter reference this payload."
),
PayloadAppendError::AlreadyHaveIt => {
write!(f, "The payload is already held in storage.")
}
PayloadAppendError::TooManyBytes => write!(
f,
"The payload source produced more bytes than were expected for this payload."
),
PayloadAppendError::DigestMismatch => {
write!(f, "The complete payload's digest is not what was expected.")
}
PayloadAppendError::OperationError(err) => std::fmt::Display::fmt(err, f),
}
}
}

impl<OE: Display + Error> Error for PayloadAppendError<OE> {}

/// Returned when no entry was found for some criteria.
#[derive(Debug, Clone)]
pub struct NoSuchEntryError;

impl Display for NoSuchEntryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "No entry was found for the given criteria.")
}
}

impl Error for NoSuchEntryError {}

/// The order by which entries should be returned for a given query.
#[derive(Debug, Clone)]
pub enum QueryOrder {
/// Ordered by subspace, then path, then timestamp.
Subspace,
Expand All @@ -126,6 +241,7 @@ pub enum QueryOrder {
}

/// Describes an [`AuthorisedEntry`] which was pruned and the [`AuthorisedEntry`] which triggered the pruning.
#[derive(Debug, Clone)]
pub struct PruneEvent<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
N: NamespaceId,
Expand All @@ -141,6 +257,7 @@ where

/// An event which took place within a [`Store`].
/// Each event includes a *progress ID* which can be used to *resume* a subscription at any point in the future.
#[derive(Debug, Clone)]
pub enum StoreEvent<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
N: NamespaceId,
Expand All @@ -149,7 +266,11 @@ where
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
/// A new entry was ingested.
Ingested(u64, AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>),
Ingested(
u64,
AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
EntryOrigin,
),
/// An existing entry received a portion of its corresponding payload.
Appended(u64, LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>),
/// An entry was forgotten.
Expand All @@ -161,10 +282,23 @@ where
}

/// Returned when the store chooses to not resume a subscription.
#[derive(Debug, Clone)]
pub struct ResumptionFailedError(pub u64);

impl Display for ResumptionFailedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"The subscription with ID {:?} could not be resumed.",
self.0
)
}
}

impl Error for ResumptionFailedError {}

/// Describes which entries to ignore during a query.
#[derive(Default)]
#[derive(Default, Clone)]
pub struct QueryIgnoreParams {
/// Omit entries with locally incomplete corresponding payloads.
pub ignore_incomplete_payloads: bool,
Expand All @@ -183,11 +317,41 @@ impl QueryIgnoreParams {
}

/// Returned when a payload could not be forgotten.
#[derive(Debug, Clone)]
pub enum ForgetPayloadError {
NoSuchEntry,
ReferredToByOtherEntries,
}

impl Display for ForgetPayloadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ForgetPayloadError::NoSuchEntry => {
write!(
f,
"No entry for the given criteria could be found in this store."
)
}
ForgetPayloadError::ReferredToByOtherEntries => write!(
f,
"The payload could not be forgotten because it is referred to by other entries."
),
}
}
}

impl Error for ForgetPayloadError {}

/// The origin of an entry ingestion event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EntryOrigin {
/// The entry was probably created on this machine.
Local,
/// The entry was sourced from another source with an ID assigned by us.
/// This is useful if you want to suppress the forwarding of entries to the peers from which the entry was originally sourced.
Remote(u64),
}

/// A [`Store`] is a set of [`AuthorisedEntry`] belonging to a single namespace, and a (possibly partial) corresponding set of payloads.
pub trait Store<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
Expand All @@ -196,9 +360,9 @@ where
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
type FlushError;
type BulkIngestionError;
type OperationsError;
type FlushError: Display + Error;
type BulkIngestionError: Display + Error;
type OperationsError: Display + Error;

/// The [namespace](https://willowprotocol.org/specs/data-model/index.html#namespace) which all of this store's [`AuthorisedEntry`] belong to.
fn namespace_id() -> N;
Expand Down Expand Up @@ -363,14 +527,14 @@ where
/// Subscribe to events concerning entries [included](https://willowprotocol.org/specs/grouping-entries/index.html#area_include) by an [`AreaOfInterest`], returning a producer of `StoreEvent`s which occurred since the moment of calling this function.
fn subscribe_area(
&self,
area: &AreaOfInterest<MCL, MCC, MPL, S>,
area: &Area<MCL, MCC, MPL, S>,
ignore: Option<QueryIgnoreParams>,
) -> impl Producer<Item = StoreEvent<MCL, MCC, MPL, N, S, PD, AT>>;

/// Attempt to resume a subscription using a *progress ID* obtained from a previous subscription, or return an error if this store implementation is unable to resume the subscription.
fn resume_subscription(
progress_id: u64,
area: AreaOfInterest<MCL, MCC, MPL, S>,
area: &Area<MCL, MCC, MPL, S>,
ignore: Option<QueryIgnoreParams>,
) -> Result<impl Producer<Item = StoreEvent<MCL, MCC, MPL, N, S, PD, AT>>, ResumptionFailedError>;
}

0 comments on commit a4aaeeb

Please sign in to comment.