Skip to content

Commit

Permalink
ChainDB: let the BlockFetch client add blocks asynchronously
Browse files Browse the repository at this point in the history
Port of IntersectMBO/ouroboros-network#2721

Co-authored-by: Thomas Winant <[email protected]>
Co-authored-by: Alexander Esgen <[email protected]>
  • Loading branch information
3 people authored and neilmayhew committed Dec 18, 2024
1 parent b32c5f6 commit 5f35f21
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 38 deletions.
1 change: 1 addition & 0 deletions ouroboros-consensus/ouroboros-consensus.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ library
io-classes ^>=1.5,
measures,
mtl,
multiset ^>=0.3,
nothunks ^>=0.2,
ouroboros-network-api ^>=0.11,
ouroboros-network-mock ^>=0.1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol
(LedgerSupportsProtocol)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as CSJumping
import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB)
import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise,
ChainDB)
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment
(InvalidBlockPunishment)
Expand All @@ -57,16 +58,16 @@ data ChainDbView m blk = ChainDbView {
getCurrentChain :: STM m (AnchoredFragment (Header blk))
, getIsFetched :: STM m (Point blk -> Bool)
, getMaxSlotNo :: STM m MaxSlotNo
, addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool
, addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk)
, getChainSelStarvation :: STM m ChainSelStarvation
}

defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk
defaultChainDbView :: ChainDB m blk -> ChainDbView m blk
defaultChainDbView chainDB = ChainDbView {
getCurrentChain = ChainDB.getCurrentChain chainDB
, getIsFetched = ChainDB.getIsFetched chainDB
, getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
, addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
, addBlockAsync = ChainDB.addBlockAsync chainDB
, getChainSelStarvation = ChainDB.getChainSelStarvation chainDB
}

Expand Down Expand Up @@ -217,8 +218,8 @@ mkBlockFetchConsensusInterface
pipeliningPunishment <- InvalidBlockPunishment.mkForDiffusionPipelining
pure $ mkAddFetchedBlock_ pipeliningPunishment pipelining

-- Waits until the block has been written to disk, but not until chain
-- selection has processed the block.
-- Hand over the block to the ChainDB, but don't wait until it has been
-- written to disk or processed.
mkAddFetchedBlock_ ::
( BlockConfig blk
-> Header blk
Expand Down Expand Up @@ -262,7 +263,7 @@ mkBlockFetchConsensusInterface
DiffusionPipeliningOff -> disconnect
DiffusionPipeliningOn ->
pipeliningPunishment bcfg (getHeader blk) disconnect
addBlockWaitWrittenToDisk
addBlockAsync
chainDB
punishment
blk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ data ChainDB m blk = ChainDB {
, getBlockComponent :: forall b. BlockComponent blk b
-> RealPoint blk -> m (Maybe b)

-- | Return membership check function for recent blocks
-- | Return membership check function for recent blocks. This includes
-- blocks in the VolatileDB and blocks that are currently being processed
-- or are waiting in a queue to be processed.
--
-- This check is only reliable for blocks up to @k@ away from the tip.
-- For blocks older than that the results should be regarded as
Expand All @@ -238,7 +240,8 @@ data ChainDB m blk = ChainDB {
-- are part of a shorter fork.
, getIsValid :: STM m (RealPoint blk -> Maybe Bool)

-- | Get the highest slot number stored in the ChainDB.
-- | Get the highest slot number stored in the ChainDB (this includes
-- blocks that are waiting in the background queue to be processed).
--
-- Note that the corresponding block doesn't have to be part of the
-- current chain, it could be part of some fork, or even be a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do
ChainSelAddBlock BlockToAdd{blockToAdd} ->
trace $ PoppedBlockFromQueue $ FallingEdgeWith $
blockRealPoint blockToAdd
chainSelSync cdb message)
chainSelSync cdb message
lift $ atomically $ processedChainSelMessage cdbChainSelQueue message)
where
starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,15 @@ getBlockComponent ::
getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB

getIsFetched ::
forall m blk. IOLike m
forall m blk. (IOLike m, HasHeader blk)
=> ChainDbEnv m blk -> STM m (Point blk -> Bool)
getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB
where
-- The volatile DB indexes by hash only, not by points. However, it should
-- not be possible to have two points with the same hash but different
-- slot numbers.
basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool
basedOnHash f p =
case pointHash p of
BlockHash hash -> f hash
GenesisHash -> False
getIsFetched CDB{..} = do
checkQueue <- memberChainSelQueue cdbChainSelQueue
checkVolDb <- VolatileDB.getIsMember cdbVolatileDB
return $ \pt ->
case pointToWithOriginRealPoint pt of
Origin -> False
NotOrigin pt' -> checkQueue pt' || checkVolDb (realPointHash pt')

getIsInvalidBlock ::
forall m blk. (IOLike m, HasHeader blk)
Expand Down Expand Up @@ -218,10 +215,13 @@ getMaxSlotNo CDB{..} = do
-- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot
-- of the current chain will be 10 (being the anchor point of the empty
-- current chain), while the max slot of the VolatileDB will be 9.
--
-- Moreover, we have to look in 'ChainSelQueue' too.
curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot
<$> readTVar cdbChain
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo
volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB
queuedMaxSlotNo <- getMaxSlotNoChainSelQueue cdbChainSelQueue
return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` queuedMaxSlotNo

{-------------------------------------------------------------------------------
Unifying interface over the immutable DB and volatile DB, but independent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
Expand Down Expand Up @@ -42,12 +43,15 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
-- * Blocks to add
, BlockToAdd (..)
, ChainSelMessage (..)
, ChainSelQueue
, ChainSelQueue -- opaque
, addBlockToAdd
, addReprocessLoEBlocks
, closeChainSelQueue
, getChainSelMessage
, getMaxSlotNoChainSelQueue
, memberChainSelQueue
, newChainSelQueue
, processedChainSelMessage
-- * Trace types
, SelectionChangedInfo (..)
, TraceAddBlockEvent (..)
Expand All @@ -63,14 +67,15 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types (
, TraceValidationEvent (..)
) where

import Cardano.Prelude (whenM)
import Control.Monad (when)
import Control.ResourceRegistry
import Control.Tracer
import Data.Foldable (traverse_)
import Data.Map.Strict (Map)
import Data.Maybe (mapMaybe)
import Data.Maybe.Strict (StrictMaybe (..))
import Data.MultiSet (MultiSet)
import qualified Data.MultiSet as MultiSet
import Data.Set (Set)
import Data.Typeable
import Data.Void (Void)
Expand Down Expand Up @@ -107,7 +112,7 @@ import Ouroboros.Consensus.Util.Enclose (Enclosing, Enclosing' (..))
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.STM (WithFingerprint)
import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.Block (MaxSlotNo (..))
import Ouroboros.Network.BlockFetch.ConsensusInterface
(ChainSelStarvation (..))

Expand Down Expand Up @@ -419,7 +424,19 @@ data InvalidBlockInfo blk = InvalidBlockInfo
-- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are
-- read from this queue by a background thread, which processes the blocks
-- synchronously.
newtype ChainSelQueue m blk = ChainSelQueue (TBQueue m (ChainSelMessage m blk))
--
-- We also maintain a multiset of the points of all of the blocks in the queue,
-- plus potentially the one block for which chain selection is currently in
-- progress. It is used to account for queued blocks in e.g. 'getIsFetched' and
-- 'getMaxSlotNo'.
--
-- INVARIANT: Counted with multiplicity, @varChainSelPoints@ contains exactly
-- the points of the blocks in @varChainSelQueue@, or at most one additional
-- point (that of the block for which chain selection is currently in progress).
data ChainSelQueue m blk = ChainSelQueue {
varChainSelQueue :: TBQueue m (ChainSelMessage m blk)
, varChainSelPoints :: StrictTVar m (MultiSet (RealPoint blk))
}
deriving NoThunks via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk)

-- | Entry in the 'ChainSelQueue' queue: a block together with the 'TMVar's used
Expand All @@ -445,9 +462,14 @@ data ChainSelMessage m blk
-- ^ Used for 'ChainSelectionPromise'.

-- | Create a new 'ChainSelQueue' with the given size.
newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk)
newChainSelQueue queueSize = ChainSelQueue <$>
atomically (newTBQueue (fromIntegral queueSize))
newChainSelQueue :: (IOLike m, StandardHash blk, Typeable blk) => Word -> m (ChainSelQueue m blk)
newChainSelQueue chainSelQueueCapacity = do
varChainSelQueue <- newTBQueueIO (fromIntegral chainSelQueueCapacity)
varChainSelPoints <- newTVarIO MultiSet.empty
pure ChainSelQueue {
varChainSelQueue
, varChainSelPoints
}

-- | Add a block to the 'ChainSelQueue' queue. Can block when the queue is full.
addBlockToAdd ::
Expand All @@ -457,7 +479,7 @@ addBlockToAdd ::
-> InvalidBlockPunishment m
-> blk
-> m (AddBlockPromise m blk)
addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
addBlockToAdd tracer (ChainSelQueue {varChainSelQueue, varChainSelPoints}) punish blk = do
varBlockWrittenToDisk <- newEmptyTMVarIO
varBlockProcessed <- newEmptyTMVarIO
let !toAdd = BlockToAdd
Expand All @@ -466,10 +488,12 @@ addBlockToAdd tracer (ChainSelQueue queue) punish blk = do
, varBlockWrittenToDisk
, varBlockProcessed
}
traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) RisingEdge
pt = blockRealPoint blk
traceWith tracer $ AddedBlockToQueue pt RisingEdge
queueSize <- atomically $ do
writeTBQueue queue (ChainSelAddBlock toAdd)
lengthTBQueue queue
writeTBQueue varChainSelQueue (ChainSelAddBlock toAdd)
modifyTVar varChainSelPoints $ MultiSet.insert pt
lengthTBQueue varChainSelQueue
traceWith tracer $
AddedBlockToQueue (blockRealPoint blk) (FallingEdgeWith (fromIntegral queueSize))
return AddBlockPromise
Expand All @@ -483,11 +507,12 @@ addReprocessLoEBlocks
=> Tracer m (TraceAddBlockEvent blk)
-> ChainSelQueue m blk
-> m (ChainSelectionPromise m)
addReprocessLoEBlocks tracer (ChainSelQueue queue) = do
addReprocessLoEBlocks tracer ChainSelQueue {varChainSelQueue} = do
varProcessed <- newEmptyTMVarIO
let waitUntilRan = atomically $ readTMVar varProcessed
traceWith tracer $ AddedReprocessLoEBlocksToQueue
atomically $ writeTBQueue queue $ ChainSelReprocessLoEBlocks varProcessed
atomically $ writeTBQueue varChainSelQueue $
ChainSelReprocessLoEBlocks varProcessed
return $ ChainSelectionPromise waitUntilRan

-- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the
Expand All @@ -499,7 +524,7 @@ getChainSelMessage
-> StrictTVar m ChainSelStarvation
-> ChainSelQueue m blk
-> m (ChainSelMessage m blk)
getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) =
getChainSelMessage starvationTracer starvationVar chainSelQueue =
atomically (tryReadTBQueue' queue) >>= \case
Just msg -> pure msg
Nothing -> do
Expand All @@ -508,6 +533,10 @@ getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) =
terminateStarvationMeasure msg
pure msg
where
ChainSelQueue {
varChainSelQueue = queue
} = chainSelQueue

startStarvationMeasure :: m ()
startStarvationMeasure = do
prevStarvation <- atomically $ swapTVar starvationVar ChainSelStarvationOngoing
Expand All @@ -531,7 +560,7 @@ tryReadTBQueue' q = (Just <$> readTBQueue q) `orElse` pure Nothing
-- | Flush the 'ChainSelQueue' queue and notify the waiting threads.
--
closeChainSelQueue :: IOLike m => ChainSelQueue m blk -> STM m ()
closeChainSelQueue (ChainSelQueue queue) = do
closeChainSelQueue ChainSelQueue{varChainSelQueue = queue} = do
as <- mapMaybe blockAdd <$> flushTBQueue queue
traverse_ (\a -> tryPutTMVar (varBlockProcessed a)
(FailedToAddBlock "Queue flushed"))
Expand All @@ -541,6 +570,41 @@ closeChainSelQueue (ChainSelQueue queue) = do
ChainSelAddBlock ab -> Just ab
ChainSelReprocessLoEBlocks _ -> Nothing

-- | Signal that ChainSel has finished processing the given 'ChainSelMessage'.
--
-- For a 'ChainSelAddBlock' message, the point of the carried block is deleted
-- from the multiset of points tracked by the 'ChainSelQueue' (the block has
-- now been written to disk by ChainSel, so it no longer needs to be accounted
-- for via the queue). A 'ChainSelReprocessLoEBlocks' message carries no block,
-- so the multiset is left untouched.
processedChainSelMessage ::
     (IOLike m, HasHeader blk)
  => ChainSelQueue m blk
  -> ChainSelMessage m blk
  -> STM m ()
processedChainSelMessage ChainSelQueue {varChainSelPoints = pointsVar} message =
    case message of
      ChainSelReprocessLoEBlocks{} ->
        return ()
      ChainSelAddBlock BlockToAdd{blockToAdd = processedBlock} ->
        modifyTVar pointsVar (MultiSet.delete (blockRealPoint processedBlock))

-- | Return a membership test for the points tracked by the 'ChainSelQueue'.
--
-- The returned predicate is a snapshot: it answers according to the contents
-- of the multiset of points at the moment it was read in this transaction.
memberChainSelQueue ::
     (IOLike m, HasHeader blk)
  => ChainSelQueue m blk
  -> STM m (RealPoint blk -> Bool)
memberChainSelQueue ChainSelQueue {varChainSelPoints = pointsVar} = do
    trackedPoints <- readTVar pointsVar
    return $ \pt -> MultiSet.member pt trackedPoints

-- | Return the highest slot number among the points tracked by the
-- 'ChainSelQueue', or 'NoMaxSlotNo' when no points are tracked.
getMaxSlotNoChainSelQueue ::
     IOLike m
  => ChainSelQueue m blk
  -> STM m MaxSlotNo
getMaxSlotNoChainSelQueue ChainSelQueue {varChainSelPoints = pointsVar} =
    maxTrackedSlot <$> readTVar pointsVar
  where
    -- The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the
    -- maximal element of the multiset has the greatest 'SlotNo'.
    maxTrackedSlot :: MultiSet (RealPoint blk) -> MaxSlotNo
    maxTrackedSlot =
        maybe NoMaxSlotNo (\(RealPoint slot _, _) -> MaxSlotNo slot)
      . MultiSet.maxView

{-------------------------------------------------------------------------------
Trace types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import Data.Bimap (Bimap)
import qualified Data.Bimap as Bimap
import Data.IntPSQ (IntPSQ)
import qualified Data.IntPSQ as PSQ
import Data.MultiSet (MultiSet)
import qualified Data.MultiSet as MultiSet
import Data.SOP.BasicFunctors
import NoThunks.Class (InspectHeap (..), InspectHeapNamed (..),
NoThunks (..), OnlyCheckWhnfNamed (..), allNoThunks,
Expand Down Expand Up @@ -75,6 +77,10 @@ instance NoThunks a => NoThunks (K a b) where
showTypeOf _ = showTypeOf (Proxy @a)
wNoThunks ctxt (K a) = wNoThunks ("K":ctxt) a

-- | Thunk check for a 'MultiSet': delegates to the 'NoThunks' instance of the
-- map obtained via 'MultiSet.toMap'.
instance NoThunks a => NoThunks (MultiSet a) where
  showTypeOf _ = "MultiSet"
  wNoThunks ctxt multiset = wNoThunks ctxt (MultiSet.toMap multiset)

{-------------------------------------------------------------------------------
fs-api
-------------------------------------------------------------------------------}
Expand Down

0 comments on commit 5f35f21

Please sign in to comment.