diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 2a5d2e6cda..360c4265f1 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -292,6 +292,7 @@ library io-classes ^>=1.5, measures, mtl, + multiset ^>=0.3, nothunks ^>=0.2, ouroboros-network-api ^>=0.11, ouroboros-network-mock ^>=0.1, diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs index 73402ba489..9a8c6b39f3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/BlockFetch/ClientInterface.hs @@ -31,7 +31,8 @@ import Ouroboros.Consensus.Ledger.SupportsProtocol (LedgerSupportsProtocol) import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as CSJumping -import Ouroboros.Consensus.Storage.ChainDB.API (ChainDB) +import Ouroboros.Consensus.Storage.ChainDB.API (AddBlockPromise, + ChainDB) import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB import Ouroboros.Consensus.Storage.ChainDB.API.Types.InvalidBlockPunishment (InvalidBlockPunishment) @@ -57,16 +58,16 @@ data ChainDbView m blk = ChainDbView { getCurrentChain :: STM m (AnchoredFragment (Header blk)) , getIsFetched :: STM m (Point blk -> Bool) , getMaxSlotNo :: STM m MaxSlotNo - , addBlockWaitWrittenToDisk :: InvalidBlockPunishment m -> blk -> m Bool + , addBlockAsync :: InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk) , getChainSelStarvation :: STM m ChainSelStarvation } -defaultChainDbView :: IOLike m => ChainDB m blk -> ChainDbView m blk +defaultChainDbView :: ChainDB m blk -> ChainDbView m blk defaultChainDbView chainDB = ChainDbView { getCurrentChain = ChainDB.getCurrentChain chainDB , getIsFetched = ChainDB.getIsFetched chainDB , getMaxSlotNo = ChainDB.getMaxSlotNo chainDB - , addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB + , addBlockAsync = ChainDB.addBlockAsync chainDB , getChainSelStarvation = ChainDB.getChainSelStarvation chainDB } @@ -217,8 +218,8 @@ mkBlockFetchConsensusInterface pipeliningPunishment <- InvalidBlockPunishment.mkForDiffusionPipelining pure $ mkAddFetchedBlock_ pipeliningPunishment pipelining - -- Waits until the block has been written to disk, but not until chain - -- selection has processed the block. + -- Hand over the block to the ChainDB, but don't wait until it has been + -- written to disk or processed. mkAddFetchedBlock_ :: ( BlockConfig blk -> Header blk @@ -262,7 +263,7 @@ mkBlockFetchConsensusInterface DiffusionPipeliningOff -> disconnect DiffusionPipeliningOn -> pipeliningPunishment bcfg (getHeader blk) disconnect - addBlockWaitWrittenToDisk + addBlockAsync chainDB punishment blk diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs index fe20cb6042..dfc656e4c3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/API.hs @@ -212,7 +212,9 @@ data ChainDB m blk = ChainDB { , getBlockComponent :: forall b. BlockComponent blk b -> RealPoint blk -> m (Maybe b) - -- | Return membership check function for recent blocks + -- | Return membership check function for recent blocks. This includes + -- blocks in the VolatileDB and blocks that are currently being processed + -- or are waiting in a queue to be processed. -- -- This check is only reliable for blocks up to @k@ away from the tip. -- For blocks older than that the results should be regarded as @@ -238,7 +240,8 @@ data ChainDB m blk = ChainDB { -- are part of a shorter fork. , getIsValid :: STM m (RealPoint blk -> Maybe Bool) - -- | Get the highest slot number stored in the ChainDB. + -- | Get the highest slot number stored in the ChainDB (this includes + -- blocks that are waiting in the background queue to be processed). -- -- Note that the corresponding block doesn't have to be part of the -- current chain, it could be part of some fork, or even be a diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs index 977221f40d..9d7c31af8a 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs @@ -541,6 +541,7 @@ addBlockRunner fuse cdb@CDB{..} = forever $ do ChainSelAddBlock BlockToAdd{blockToAdd} -> trace $ PoppedBlockFromQueue $ FallingEdgeWith $ blockRealPoint blockToAdd - chainSelSync cdb message) + chainSelSync cdb message + lift $ atomically $ processedChainSelMessage cdbChainSelQueue message) where starvationTracer = Tracer $ traceWith cdbTracer . TraceChainSelStarvationEvent diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs index b090b41629..9d03c5f5a0 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Query.hs @@ -164,18 +164,15 @@ getBlockComponent :: getBlockComponent CDB{..} = getAnyBlockComponent cdbImmutableDB cdbVolatileDB getIsFetched :: - forall m blk. IOLike m + forall m blk. (IOLike m, HasHeader blk) => ChainDbEnv m blk -> STM m (Point blk -> Bool) -getIsFetched CDB{..} = basedOnHash <$> VolatileDB.getIsMember cdbVolatileDB - where - -- The volatile DB indexes by hash only, not by points. However, it should - -- not be possible to have two points with the same hash but different - -- slot numbers. - basedOnHash :: (HeaderHash blk -> Bool) -> Point blk -> Bool - basedOnHash f p = - case pointHash p of - BlockHash hash -> f hash - GenesisHash -> False +getIsFetched CDB{..} = do + checkQueue <- memberChainSelQueue cdbChainSelQueue + checkVolDb <- VolatileDB.getIsMember cdbVolatileDB + return $ \pt -> + case pointToWithOriginRealPoint pt of + Origin -> False + NotOrigin pt' -> checkQueue pt' || checkVolDb (realPointHash pt') getIsInvalidBlock :: forall m blk. (IOLike m, HasHeader blk) @@ -218,10 +215,13 @@ getMaxSlotNo CDB{..} = do -- contains block 9'. The ImmutableDB contains blocks 1-10. The max slot -- of the current chain will be 10 (being the anchor point of the empty -- current chain), while the max slot of the VolatileDB will be 9. + -- + -- Moreover, we have to look in 'ChainSelQueue' too. curChainMaxSlotNo <- maxSlotNoFromWithOrigin . AF.headSlot <$> readTVar cdbChain - volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB - return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo + volatileDbMaxSlotNo <- VolatileDB.getMaxSlotNo cdbVolatileDB + queuedMaxSlotNo <- getMaxSlotNoChainSelQueue cdbChainSelQueue + return $ curChainMaxSlotNo `max` volatileDbMaxSlotNo `max` queuedMaxSlotNo {------------------------------------------------------------------------------- Unifying interface over the immutable DB and volatile DB, but independent diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs index cda6592b7f..a58c711098 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Types.hs @@ -1,4 +1,5 @@ {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE BlockArguments #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveGeneric #-} @@ -42,12 +43,15 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( -- * Blocks to add , BlockToAdd (..) , ChainSelMessage (..) - , ChainSelQueue + , ChainSelQueue -- opaque , addBlockToAdd , addReprocessLoEBlocks , closeChainSelQueue , getChainSelMessage + , getMaxSlotNoChainSelQueue + , memberChainSelQueue , newChainSelQueue + , processedChainSelMessage -- * Trace types , SelectionChangedInfo (..) , TraceAddBlockEvent (..) @@ -63,7 +67,6 @@ module Ouroboros.Consensus.Storage.ChainDB.Impl.Types ( , TraceValidationEvent (..) ) where -import Cardano.Prelude (whenM) import Control.Monad (when) import Control.ResourceRegistry import Control.Tracer @@ -71,6 +74,8 @@ import Data.Foldable (traverse_) import Data.Map.Strict (Map) import Data.Maybe (mapMaybe) import Data.Maybe.Strict (StrictMaybe (..)) +import Data.MultiSet (MultiSet) +import qualified Data.MultiSet as MultiSet import Data.Set (Set) import Data.Typeable import Data.Void (Void) @@ -107,7 +112,7 @@ import Ouroboros.Consensus.Util.Enclose (Enclosing, Enclosing' (..)) import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.STM (WithFingerprint) import Ouroboros.Network.AnchoredFragment (AnchoredFragment) -import Ouroboros.Network.Block (MaxSlotNo) +import Ouroboros.Network.Block (MaxSlotNo (..)) import Ouroboros.Network.BlockFetch.ConsensusInterface (ChainSelStarvation (..)) @@ -419,7 +424,19 @@ data InvalidBlockInfo blk = InvalidBlockInfo -- | FIFO queue used to add blocks asynchronously to the ChainDB. Blocks are -- read from this queue by a background thread, which processes the blocks -- synchronously. -newtype ChainSelQueue m blk = ChainSelQueue (TBQueue m (ChainSelMessage m blk)) +-- +-- We also maintain a multiset of the points of all of the blocks in the queue, +-- plus potentially the one block for which chain selection is currently in +-- progress. It is used to account for queued blocks in eg 'getIsFetched' and +-- 'getMaxSlotNo'. +-- +-- INVARIANT: Counted with multiplicity, @varChainSelPoints@ contains exactly +-- the same hashes or at most one additional hash compared to the hashes of +-- blocks in @varChainSelQueue@. +data ChainSelQueue m blk = ChainSelQueue { + varChainSelQueue :: TBQueue m (ChainSelMessage m blk) + , varChainSelPoints :: StrictTVar m (MultiSet (RealPoint blk)) + } deriving NoThunks via OnlyCheckWhnfNamed "ChainSelQueue" (ChainSelQueue m blk) -- | Entry in the 'ChainSelQueue' queue: a block together with the 'TMVar's used @@ -445,9 +462,14 @@ data ChainSelMessage m blk -- ^ Used for 'ChainSelectionPromise'. -- | Create a new 'ChainSelQueue' with the given size. -newChainSelQueue :: IOLike m => Word -> m (ChainSelQueue m blk) -newChainSelQueue queueSize = ChainSelQueue <$> - atomically (newTBQueue (fromIntegral queueSize)) +newChainSelQueue :: (IOLike m, StandardHash blk, Typeable blk) => Word -> m (ChainSelQueue m blk) +newChainSelQueue chainSelQueueCapacity = do + varChainSelQueue <- newTBQueueIO (fromIntegral chainSelQueueCapacity) + varChainSelPoints <- newTVarIO MultiSet.empty + pure ChainSelQueue { + varChainSelQueue + , varChainSelPoints + } -- | Add a block to the 'ChainSelQueue' queue. Can block when the queue is full. addBlockToAdd :: @@ -457,7 +479,7 @@ addBlockToAdd :: -> InvalidBlockPunishment m -> blk -> m (AddBlockPromise m blk) -addBlockToAdd tracer (ChainSelQueue queue) punish blk = do +addBlockToAdd tracer (ChainSelQueue {varChainSelQueue, varChainSelPoints}) punish blk = do varBlockWrittenToDisk <- newEmptyTMVarIO varBlockProcessed <- newEmptyTMVarIO let !toAdd = BlockToAdd @@ -466,10 +488,12 @@ addBlockToAdd tracer (ChainSelQueue queue) punish blk = do , varBlockWrittenToDisk , varBlockProcessed } - traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) RisingEdge + pt = blockRealPoint blk + traceWith tracer $ AddedBlockToQueue pt RisingEdge queueSize <- atomically $ do - writeTBQueue queue (ChainSelAddBlock toAdd) - lengthTBQueue queue + writeTBQueue varChainSelQueue (ChainSelAddBlock toAdd) + modifyTVar varChainSelPoints $ MultiSet.insert pt + lengthTBQueue varChainSelQueue traceWith tracer $ AddedBlockToQueue (blockRealPoint blk) (FallingEdgeWith (fromIntegral queueSize)) return AddBlockPromise @@ -483,11 +507,12 @@ addReprocessLoEBlocks => Tracer m (TraceAddBlockEvent blk) -> ChainSelQueue m blk -> m (ChainSelectionPromise m) -addReprocessLoEBlocks tracer (ChainSelQueue queue) = do +addReprocessLoEBlocks tracer ChainSelQueue {varChainSelQueue} = do varProcessed <- newEmptyTMVarIO let waitUntilRan = atomically $ readTMVar varProcessed traceWith tracer $ AddedReprocessLoEBlocksToQueue - atomically $ writeTBQueue queue $ ChainSelReprocessLoEBlocks varProcessed + atomically $ writeTBQueue varChainSelQueue $ + ChainSelReprocessLoEBlocks varProcessed return $ ChainSelectionPromise waitUntilRan -- | Get the oldest message from the 'ChainSelQueue' queue. Can block when the @@ -499,7 +524,7 @@ getChainSelMessage -> StrictTVar m ChainSelStarvation -> ChainSelQueue m blk -> m (ChainSelMessage m blk) -getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) = +getChainSelMessage starvationTracer starvationVar chainSelQueue = atomically (tryReadTBQueue' queue) >>= \case Just msg -> pure msg Nothing -> do @@ -508,6 +533,10 @@ getChainSelMessage starvationTracer starvationVar (ChainSelQueue queue) = terminateStarvationMeasure msg pure msg where + ChainSelQueue { + varChainSelQueue = queue + } = chainSelQueue + startStarvationMeasure :: m () startStarvationMeasure = do prevStarvation <- atomically $ swapTVar starvationVar ChainSelStarvationOngoing @@ -531,7 +560,7 @@ tryReadTBQueue' q = (Just <$> readTBQueue q) `orElse` pure Nothing -- | Flush the 'ChainSelQueue' queue and notify the waiting threads. -- closeChainSelQueue :: IOLike m => ChainSelQueue m blk -> STM m () -closeChainSelQueue (ChainSelQueue queue) = do +closeChainSelQueue ChainSelQueue{varChainSelQueue = queue} = do as <- mapMaybe blockAdd <$> flushTBQueue queue traverse_ (\a -> tryPutTMVar (varBlockProcessed a) (FailedToAddBlock "Queue flushed")) @@ -541,6 +570,41 @@ closeChainSelQueue (ChainSelQueue queue) = do ChainSelAddBlock ab -> Just ab ChainSelReprocessLoEBlocks _ -> Nothing +-- | To invoke when the given 'ChainSelMessage' has been processed by ChainSel. +-- This is used to remove the respective point from the multiset of points in +-- the 'ChainSelQueue' (as the block has now been written to disk by ChainSel). +processedChainSelMessage :: + (IOLike m, HasHeader blk) + => ChainSelQueue m blk + -> ChainSelMessage m blk + -> STM m () +processedChainSelMessage ChainSelQueue {varChainSelPoints} = \case + ChainSelAddBlock BlockToAdd{blockToAdd = blk} -> + modifyTVar varChainSelPoints $ MultiSet.delete (blockRealPoint blk) + ChainSelReprocessLoEBlocks{} -> + pure () + +-- | Return a function to test the membership +memberChainSelQueue :: + (IOLike m, HasHeader blk) + => ChainSelQueue m blk + -> STM m (RealPoint blk -> Bool) +memberChainSelQueue ChainSelQueue {varChainSelPoints} = + flip MultiSet.member <$> readTVar varChainSelPoints + +getMaxSlotNoChainSelQueue :: + IOLike m + => ChainSelQueue m blk + -> STM m MaxSlotNo +getMaxSlotNoChainSelQueue ChainSelQueue {varChainSelPoints} = + aux <$> readTVar varChainSelPoints + where + -- | The 'Ord' instance of 'RealPoint' orders by 'SlotNo' first, so the + -- maximal key of the map has the greatest 'SlotNo'. + aux :: MultiSet (RealPoint blk) -> MaxSlotNo + aux pts = case MultiSet.maxView pts of + Nothing -> NoMaxSlotNo + Just (RealPoint s _, _) -> MaxSlotNo s {------------------------------------------------------------------------------- Trace types diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs index 06c2757d52..0ba537b87b 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/Orphans.hs @@ -23,6 +23,8 @@ import Data.Bimap (Bimap) import qualified Data.Bimap as Bimap import Data.IntPSQ (IntPSQ) import qualified Data.IntPSQ as PSQ +import Data.MultiSet (MultiSet) +import qualified Data.MultiSet as MultiSet import Data.SOP.BasicFunctors import NoThunks.Class (InspectHeap (..), InspectHeapNamed (..), NoThunks (..), OnlyCheckWhnfNamed (..), allNoThunks, @@ -75,6 +77,10 @@ instance NoThunks a => NoThunks (K a b) where showTypeOf _ = showTypeOf (Proxy @a) wNoThunks ctxt (K a) = wNoThunks ("K":ctxt) a +instance NoThunks a => NoThunks (MultiSet a) where + showTypeOf _ = "MultiSet" + wNoThunks ctxt = wNoThunks ctxt . MultiSet.toMap + {------------------------------------------------------------------------------- fs-api -------------------------------------------------------------------------------}