Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade resource-pool dependency #4

Open
wants to merge 4 commits into
base: awake
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cabal.sandbox.config
.cabal-sandbox/
dist/
dist-newstyle/
/.stack-work/
27 changes: 17 additions & 10 deletions Network/Kafka.hs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
{-# LANGUAGE RankNTypes, ScopedTypeVariables #-}
module Network.Kafka where

import Prelude

-- base
import Control.Applicative
import Control.Exception (Exception, IOException, bracketOnError)
import qualified Data.List.NonEmpty as NE
import Data.List.NonEmpty (NonEmpty(..))
import Data.Monoid ((<>))
import GHC.Generics (Generic)
import System.IO

-- Hackage
import Control.Exception.Lifted (catch)
import Control.Lens
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Except (ExceptT(..), runExceptT, MonadError(..))
import Control.Monad.Trans.State
Expand Down Expand Up @@ -53,7 +52,7 @@ data KafkaState = KafkaState { -- | Name to use as a client ID.
, _stateTopicMetadata :: M.Map TopicName TopicMetadata
-- | Address cache
, _stateAddresses :: NonEmpty KafkaAddress
} deriving (Generic, Show)
} deriving (Generic)

makeLenses ''KafkaState

Expand Down Expand Up @@ -333,21 +332,29 @@ withBrokerHandle broker = withAddressHandle (broker2address broker)
-- Note that when the given action throws an exception, any state changes will
-- be discarded. This includes both 'IOException's and exceptions thrown by
-- 'throwError' from 'Control.Monad.Except'.
withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a
withAddressHandle :: forall m a . Kafka m => KafkaAddress -> (Handle -> m a) -> m a
withAddressHandle address kafkaAction = do
conns <- use stateConnections
let foundPool = conns ^. at address
pool <- case foundPool of
Nothing -> do
newPool <- tryKafka $ liftIO $ mkPool address
stateConnections .= (at address ?~ newPool $ conns)
return newPool
Just p -> return p
tryKafka $ Pool.withResource pool kafkaAction
pure newPool
Just p -> pure p
tryKafka $ withResourcePool pool kafkaAction
where
withResourcePool :: Pool.Pool Handle -> (Handle -> m a) -> m a
withResourcePool p f = restoreM =<< liftBaseWith (\k -> Pool.withResource p (k . f))

createHandle :: (Host, Port) -> IO Handle
createHandle (h, p) = connectTo (h ^. hostString) (p ^. portNumber)

poolConfig :: (Host, Port) -> Pool.PoolConfig Handle
poolConfig a = Pool.setNumStripes (Just 1) (Pool.defaultPoolConfig (createHandle a) hClose 10 1)

mkPool :: KafkaAddress -> IO (Pool.Pool Handle)
mkPool a = Pool.createPool (createHandle a) hClose 1 10 1
where createHandle (h, p) = connectTo (h ^. hostString) (p ^. portNumber)
mkPool a = Pool.newPool (poolConfig a)

connectTo :: Network.HostName -> Network.PortNumber -> IO Handle
connectTo host port = do
Expand Down
10 changes: 6 additions & 4 deletions Network/Kafka/Producer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import qualified Data.Digest.Murmur32 as Murmur32
import Control.Applicative
import Control.Lens
import Control.Monad.Trans (liftIO)
import Data.Monoid ((<>))
import Data.Set (Set)
import qualified Data.Set as Set
import System.IO
Expand Down Expand Up @@ -85,9 +84,12 @@ send l ts = do
withBrokerHandle broker $ \handle -> produce handle $ produceRequest requiredAcks requestTimeout ts

getRandPartition :: Kafka m => Set PartitionAndLeader -> m (Maybe PartitionAndLeader)
getRandPartition ps =
liftIO $ (ps' ^?) . element <$> getStdRandom (randomR (0, length ps' - 1))
where ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just)
getRandPartition ps = do
i <- liftIO (getStdRandom (randomR (0, length ps' - 1)))

pure (snd <$> (ps' ^@? element i))

where ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just)

-- * Messages

Expand Down
82 changes: 82 additions & 0 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/24.05";
flake-utils.url = "github:numtide/flake-utils";
gitignore = {
url = "github:hercules-ci/gitignore.nix";
inputs.nixpkgs.follows = "nixpkgs";
};
};

outputs = { self, nixpkgs, flake-utils, gitignore }:
flake-utils.lib.eachSystem ["x86_64-linux" "x86_64-darwin"] (system:
let
ghc = "ghc94";

haskellOverlay = import ./overlay.nix {
inherit gitignore ghc;
};

pkgs = import nixpkgs {
inherit system;
overlays = [ haskellOverlay ];
};
in {
packages.default = pkgs.haskell.packages.${ghc}.milena;
devShells.default = pkgs.milena-dev-shell;
});
}
6 changes: 3 additions & 3 deletions milena.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ library
ghc-options: -Wall -fwarn-unused-imports -Wincomplete-uni-patterns -Wincomplete-record-updates
build-depends:
base >=4.7 && <5
, bytestring ==0.10.*
, bytestring >=0.10 && <0.12
, cereal >=0.4 && <0.6
, containers >=0.5 && <0.7
, digest >=0.0.1.0 && <0.1
, lens >=4.4 && <4.20
, lens >=5 && <6
, lifted-base >=0.2.3.6 && <0.3
, monad-control ==1.0.*
, mtl >=2.1 && <2.3
, murmur-hash >=0.1.0.8 && <0.2
, network >=2.4 && <3.2
, random >=1.0 && <1.2
, resource-pool >=0.2.3.2 && <0.3
, resource-pool >=0.4 && <0.5
, semigroups >=0.16.2.2 && <0.19
, transformers >=0.3 && <0.6
, zlib >=0.6.1.2 && <0.7
Expand Down
28 changes: 28 additions & 0 deletions overlay.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{ gitignore, ghc }:

final: prev: {
haskell = prev.haskell // {
packages = prev.haskell.packages // {
"${ghc}" = prev.haskell.packages."${ghc}".override (old: {
overrides = hfinal: _: {
milena = hfinal.callCabal2nix "milena" (gitignore.lib.gitignoreSource ./.) { };
};
});
};
};

milena-dev-shell =
let
hsPkgs = final.haskell.packages.${ghc};
in
hsPkgs.shellFor {
name = "milena";

buildInputs = [
final.cabal-install
final.haskell-language-server
];

packages = pkgs: [pkgs.milena];
};
}