diff --git a/.gitignore b/.gitignore index c8b5735..d0813af 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ cabal.sandbox.config .cabal-sandbox/ dist/ +dist-newstyle/ /.stack-work/ diff --git a/Network/Kafka.hs b/Network/Kafka.hs index 1f68e6b..53b6bbc 100644 --- a/Network/Kafka.hs +++ b/Network/Kafka.hs @@ -1,20 +1,19 @@ +{-# LANGUAGE RankNTypes, ScopedTypeVariables #-} module Network.Kafka where import Prelude -- base -import Control.Applicative import Control.Exception (Exception, IOException, bracketOnError) import qualified Data.List.NonEmpty as NE import Data.List.NonEmpty (NonEmpty(..)) -import Data.Monoid ((<>)) import GHC.Generics (Generic) import System.IO -- Hackage import Control.Exception.Lifted (catch) import Control.Lens -import Control.Monad.Trans.Control (MonadBaseControl) +import Control.Monad.Trans.Control (MonadBaseControl(..)) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Except (ExceptT(..), runExceptT, MonadError(..)) import Control.Monad.Trans.State @@ -53,7 +52,7 @@ data KafkaState = KafkaState { -- | Name to use as a client ID. , _stateTopicMetadata :: M.Map TopicName TopicMetadata -- | Address cache , _stateAddresses :: NonEmpty KafkaAddress - } deriving (Generic, Show) + } deriving (Generic) makeLenses ''KafkaState @@ -333,7 +332,7 @@ withBrokerHandle broker = withAddressHandle (broker2address broker) -- Note that when the given action throws an exception, any state changes will -- be discarded. This includes both 'IOException's and exceptions thrown by -- 'throwError' from 'Control.Monad.Except'. -withAddressHandle :: Kafka m => KafkaAddress -> (Handle -> m a) -> m a +withAddressHandle :: forall m a . Kafka m => KafkaAddress -> (Handle -> m a) -> m a withAddressHandle address kafkaAction = do conns <- use stateConnections let foundPool = conns ^. at address @@ -341,13 +340,21 @@ withAddressHandle address kafkaAction = do Nothing -> do newPool <- tryKafka $ liftIO $ mkPool address stateConnections .= (at address ?~ newPool $ conns) - return newPool - Just p -> return p - tryKafka $ Pool.withResource pool kafkaAction + pure newPool + Just p -> pure p + tryKafka $ withResourcePool pool kafkaAction where + withResourcePool :: Pool.Pool Handle -> (Handle -> m a) -> m a + withResourcePool p f = restoreM =<< liftBaseWith (\k -> Pool.withResource p (k . f)) + + createHandle :: (Host, Port) -> IO Handle + createHandle (h, p) = connectTo (h ^. hostString) (p ^. portNumber) + + poolConfig :: (Host, Port) -> Pool.PoolConfig Handle + poolConfig a = Pool.setNumStripes (Just 1) (Pool.defaultPoolConfig (createHandle a) hClose 10 1) + mkPool :: KafkaAddress -> IO (Pool.Pool Handle) - mkPool a = Pool.createPool (createHandle a) hClose 1 10 1 - where createHandle (h, p) = connectTo (h ^. hostString) (p ^. portNumber) + mkPool a = Pool.newPool (poolConfig a) connectTo :: Network.HostName -> Network.PortNumber -> IO Handle connectTo host port = do diff --git a/Network/Kafka/Producer.hs b/Network/Kafka/Producer.hs index dd87999..f738f7c 100644 --- a/Network/Kafka/Producer.hs +++ b/Network/Kafka/Producer.hs @@ -6,7 +6,6 @@ import qualified Data.Digest.Murmur32 as Murmur32 import Control.Applicative import Control.Lens import Control.Monad.Trans (liftIO) -import Data.Monoid ((<>)) import Data.Set (Set) import qualified Data.Set as Set import System.IO @@ -85,9 +84,12 @@ send l ts = do withBrokerHandle broker $ \handle -> produce handle $ produceRequest requiredAcks requestTimeout ts getRandPartition :: Kafka m => Set PartitionAndLeader -> m (Maybe PartitionAndLeader) -getRandPartition ps = - liftIO $ (ps' ^?) . element <$> getStdRandom (randomR (0, length ps' - 1)) - where ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just) +getRandPartition ps = do + i <- liftIO (getStdRandom (randomR (0, length ps' - 1))) + + pure (snd <$> (ps' ^@? element i)) + + where ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just) -- * Messages diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..a7b24ab --- /dev/null +++ b/flake.lock @@ -0,0 +1,82 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1710146030, + "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "gitignore": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1709087332, + "narHash": "sha256-HG2cCnktfHsKV0s4XW83gU3F57gaTljL9KNSuG6bnQs=", + "owner": "hercules-ci", + "repo": "gitignore.nix", + "rev": "637db329424fd7e46cf4185293b9cc8c88c95394", + "type": "github" + }, + "original": { + "owner": "hercules-ci", + "repo": "gitignore.nix", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1717179513, + "narHash": "sha256-vboIEwIQojofItm2xGCdZCzW96U85l9nDW3ifMuAIdM=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "63dacb46bf939521bdc93981b4cbb7ecb58427a0", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "24.05", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "gitignore": "gitignore", + "nixpkgs": "nixpkgs" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..9b96268 --- /dev/null +++ b/flake.nix @@ -0,0 +1,28 @@ +{ + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/24.05"; + flake-utils.url = "github:numtide/flake-utils"; + gitignore = { + url = "github:hercules-ci/gitignore.nix"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + }; + + outputs = { self, nixpkgs, flake-utils, gitignore }: + flake-utils.lib.eachSystem ["x86_64-linux" "x86_64-darwin"] (system: + let + ghc = "ghc94"; + + haskellOverlay = import ./overlay.nix { + inherit gitignore ghc; + }; + + pkgs = import nixpkgs { + inherit system; + overlays = [ haskellOverlay ]; + }; + in { + packages.default = pkgs.haskell.packages.${ghc}.milena; + devShells.default = pkgs.milena-dev-shell; + }); +} diff --git a/milena.cabal b/milena.cabal index 30665ea..048edc6 100644 --- a/milena.cabal +++ b/milena.cabal @@ -44,18 +44,18 @@ library ghc-options: -Wall -fwarn-unused-imports -Wincomplete-uni-patterns -Wincomplete-record-updates build-depends: base >=4.7 && <5 - , bytestring ==0.10.* + , bytestring >=0.10 && <0.12 , cereal >=0.4 && <0.6 , containers >=0.5 && <0.7 , digest >=0.0.1.0 && <0.1 - , lens >=4.4 && <4.20 + , lens >=5 && <6 , lifted-base >=0.2.3.6 && <0.3 , monad-control ==1.0.* , mtl >=2.1 && <2.3 , murmur-hash >=0.1.0.8 && <0.2 , network >=2.4 && <3.2 , random >=1.0 && <1.2 - , resource-pool >=0.2.3.2 && <0.3 + , resource-pool >=0.4 && <0.5 , semigroups >=0.16.2.2 && <0.19 , transformers >=0.3 && <0.6 , zlib >=0.6.1.2 && <0.7 diff --git a/overlay.nix b/overlay.nix new file mode 100644 index 0000000..b1099e3 --- /dev/null +++ b/overlay.nix @@ -0,0 +1,28 @@ +{ gitignore, ghc }: + +final: prev: { + haskell = prev.haskell // { + packages = prev.haskell.packages // { + "${ghc}" = prev.haskell.packages."${ghc}".override (old: { + overrides = hfinal: _: { + milena = hfinal.callCabal2nix "milena" (gitignore.lib.gitignoreSource ./.) { }; + }; + }); + }; + }; + + milena-dev-shell = + let + hsPkgs = final.haskell.packages.${ghc}; + in + hsPkgs.shellFor { + name = "milena"; + + buildInputs = [ + final.cabal-install + final.haskell-language-server + ]; + + packages = pkgs: [pkgs.milena]; + }; +} \ No newline at end of file