From ac1fe0aa41274105fded33b95bbc19479fafaa26 Mon Sep 17 00:00:00 2001 From: patrick brisbin Date: Fri, 7 Jun 2024 09:12:45 -0400 Subject: [PATCH 1/4] Add Faktory.Pool This implements connection pooling by adding `HasFaktoryPool` which can be used to acquire a `Producer`, which is just a `Client`, which is just a `Connection`. I chose the name `FaktoryPool` (instead of `FaktoryProducerPool`) for two reasons, one conceptual and one practical: - Conceptual: I think the `Producer` wrapper over `Client` is unnecessary. It adds nothing and a `Producer` could just as easily be used for any communication with Faktory, to produce, consume, batch, track, etc. So I consider this a "pool for Faktory" and no specifically "for Faktory _producers_". (Same reason we call it `SqlPool` and not `SqlInserterPool` or `SqlQuerierPool`.) - Practical: we have a `FaktoryProducerPool` in `freckle-app`, that this is a generalization and upstreaming of. Changing the name along the way makes the errors clearer and more easy to deal with when converting downstream applications. --- faktory.cabal | 6 +- library/Faktory/Pool.hs | 126 ++++++++++++++++++++++++++++++++++++ library/Faktory/Settings.hs | 47 ++++++++++++++ package.yaml | 3 + 4 files changed, 181 insertions(+), 1 deletion(-) create mode 100644 library/Faktory/Pool.hs diff --git a/faktory.cabal b/faktory.cabal index 953e180..6b04862 100644 --- a/faktory.cabal +++ b/faktory.cabal @@ -4,7 +4,7 @@ cabal-version: 1.18 -- -- see: https://github.com/sol/hpack -- --- hash: 4e8530827601475a141f2b1524153409a869122bcd0dafe12b70368e0aefcf07 +-- hash: 690e0deaa46069fadbd59ee63f2da1a97d4fb5bcadabf75910d088636b34bbf9 name: faktory version: 1.1.2.7 @@ -66,6 +66,7 @@ library Faktory.JobFailure Faktory.JobOptions Faktory.JobState + Faktory.Pool Faktory.Prelude Faktory.Producer Faktory.Protocol @@ -112,15 +113,18 @@ library , errors , megaparsec , memory + , microlens , mtl , network , random + , resource-pool , safe-exceptions , scanner , semigroups >=0.19.1 , text , time , unix + , unliftio , unordered-containers default-language: Haskell2010 if impl(ghc >= 8.10) diff --git a/library/Faktory/Pool.hs b/library/Faktory/Pool.hs new file mode 100644 index 0000000..803e2b3 --- /dev/null +++ b/library/Faktory/Pool.hs @@ -0,0 +1,126 @@ +module Faktory.Pool + ( FaktoryPool + , HasFaktoryPool (..) + + -- * Pool Construction + , Settings + , PoolSettings + , newFaktoryPool + + -- * Pool use + , perform + , buildJob + + -- * Direct access + , withProducer + , takeProducer + + -- * Re-exports + , module Faktory.Job + ) where + +import Faktory.Prelude + +import Control.Monad.IO.Class (MonadIO (..)) +import Control.Monad.Reader (MonadReader, asks) +import Data.Aeson (ToJSON) +import Data.Pool (Pool) +import qualified Data.Pool as Pool +import Faktory.Job hiding (buildJob, perform) +import qualified Faktory.Job as Job +import Faktory.Producer +import Faktory.Settings (PoolSettings (..), Settings) +import GHC.Stack (HasCallStack) +import Lens.Micro (Lens', (^.)) +import UnliftIO (MonadUnliftIO, withRunInIO) + +-- | +-- +-- @since 1.1.3.0 +type FaktoryPool = Pool Producer + +-- | +-- +-- @since 1.1.3.0 +class HasFaktoryPool env where + faktoryPoolL :: Lens' env FaktoryPool + +instance HasFaktoryPool FaktoryPool where + faktoryPoolL = id + +-- | Build a 'FaktoryPool' with the given settings +-- +-- See 'Settings', 'envSettings', 'PoolSettings', and 'envPoolSettings'. +-- +-- @since 1.1.3.0 +newFaktoryPool + :: MonadIO m + => Settings + -> PoolSettings + -> m FaktoryPool +newFaktoryPool settings PoolSettings {..} = do + liftIO + . Pool.newPool + $ Pool.defaultPoolConfig + (newProducer settings) + closeProducer + (fromIntegral settingsTimeout) + (fromIntegral settingsSize) + +-- | 'Faktory.Job.perform' but using a 'Producer' from the pool +-- +-- @since 1.1.3.0 +perform + :: ( MonadUnliftIO m + , MonadReader env m + , HasFaktoryPool env + , ToJSON arg + , HasCallStack + ) + => JobOptions + -> arg + -> m JobId +perform options arg = do + withProducer $ \producer -> do + liftIO $ Job.perform options producer arg + +-- | 'Faktory.Job.buildJob' but using a 'Producer' from the pool +-- +-- @since 1.1.3.0 +buildJob + :: (MonadUnliftIO m, MonadReader env m, HasFaktoryPool env) + => JobOptions + -> arg + -> m (Job arg) +buildJob options arg = do + withProducer $ \producer -> do + liftIO $ Job.buildJob options producer arg + +-- | Acquire a 'Producer', use it, and return it to the pool +-- +-- @since 1.1.3.0 +withProducer + :: (MonadUnliftIO m, MonadReader env m, HasFaktoryPool env) + => (Producer -> m a) + -> m a +withProducer f = do + p <- asks (^. faktoryPoolL) + withRunInIO $ \runInIO -> do + Pool.withResource p $ runInIO . f + +-- | Get a 'Producer' from the pool along with an action to return it +-- +-- You should prefer 'withProducer' if at all possible. With this function you +-- are responsible to ensure the return action is called (e.g. with 'finally'). +-- +-- This is only necessary if you are operating in a monad that doesn't have +-- 'MonadUnliftIO' (like 'ConduitT'), so you need to take and return a +-- 'Producer' separately (e.g. with 'bracketP'). +-- +-- @since 1.1.3.0 +takeProducer + :: (MonadIO m, MonadReader env m, HasFaktoryPool env) => m (Producer, m ()) +takeProducer = do + p <- asks (^. faktoryPoolL) + (producer, lp) <- liftIO $ Pool.takeResource p + pure (producer, liftIO $ Pool.putResource lp producer) diff --git a/library/Faktory/Settings.hs b/library/Faktory/Settings.hs index 3fb3f8a..422f461 100644 --- a/library/Faktory/Settings.hs +++ b/library/Faktory/Settings.hs @@ -2,6 +2,8 @@ module Faktory.Settings ( Settings (..) , defaultSettings , envSettings + + -- * Worker , WorkerSettings (..) , defaultWorkerSettings , envWorkerSettings @@ -12,6 +14,10 @@ module Faktory.Settings , WorkerId , randomWorkerId + -- * Pool + , PoolSettings (..) + , envPoolSettings + -- * Re-exports , ConnectionInfo (..) , Namespace (..) @@ -23,6 +29,7 @@ import Data.Aeson import Faktory.Connection import Faktory.JobOptions (JobOptions) import Faktory.Settings.Queue +import Numeric.Natural import System.Environment (lookupEnv) import System.IO (hPutStrLn, stderr) import System.Random @@ -82,3 +89,43 @@ newtype WorkerId = WorkerId String randomWorkerId :: IO WorkerId randomWorkerId = WorkerId . take 8 . randomRs ('a', 'z') <$> newStdGen + +-- | +-- +-- @since 1.1.3.0 +data PoolSettings = PoolSettings + { settingsSize :: Natural + -- ^ Maximum pool size + -- + -- Default is @10@. Smallest acceptable value is @1@. Note that, due to the + -- striping behavior of @resource-pool@, a configured size @N@ may result in + -- @N - 1@ resources. + , settingsTimeout :: Natural + -- ^ How long before destroying a resource, in seconds + -- + -- Default is @600@. + } + +-- | +-- +-- @since 1.1.3.0 +defaultPoolSettings :: PoolSettings +defaultPoolSettings = + PoolSettings + { settingsSize = 10 + , settingsTimeout = 600 + } + +-- | Read 'PoolSettings' from the environment +-- +-- - @FAKTORY_POOL_SIZE@ +-- - @FAKTORY_POOL_TIMEOUT@ +-- +-- @since 1.1.3.0 +envPoolSettings :: IO PoolSettings +envPoolSettings = + PoolSettings + <$> (maybe settingsSize read <$> lookupEnv "FAKTORY_POOL_SIZE") + <*> (maybe settingsTimeout read <$> lookupEnv "FAKTORY_POOL_TIMEOUT") + where + PoolSettings {..} = defaultPoolSettings diff --git a/package.yaml b/package.yaml index efe5140..463bfd5 100644 --- a/package.yaml +++ b/package.yaml @@ -104,15 +104,18 @@ library: - errors - megaparsec - memory + - microlens - mtl - network - random + - resource-pool - safe-exceptions - semigroups >= 0.19.1 - scanner - text - time - unix + - unliftio - unordered-containers executables: From 91ac8e12245deca80ed90cba97882a159d434434 Mon Sep 17 00:00:00 2001 From: patrick brisbin Date: Fri, 7 Jun 2024 09:19:17 -0400 Subject: [PATCH 2/4] Version bump --- CHANGELOG.md | 9 ++++++++- faktory.cabal | 2 +- package.yaml | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7df754..e2ecc46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,11 @@ -## [_Unreleased_](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.2.7...main) +## [_Unreleased_](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.3.0...main) + +## [v1.1.3.0](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.2.7...v1.1.3.0) + +- Add `Faktory.Pool` + + This incurs the new dependencies, `unliftio`, `resource-pool`, and + `microlens`. ## [v1.1.2.7](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.2.6...v1.1.2.7) diff --git a/faktory.cabal b/faktory.cabal index 6b04862..9c94979 100644 --- a/faktory.cabal +++ b/faktory.cabal @@ -7,7 +7,7 @@ cabal-version: 1.18 -- hash: 690e0deaa46069fadbd59ee63f2da1a97d4fb5bcadabf75910d088636b34bbf9 name: faktory -version: 1.1.2.7 +version: 1.1.3.0 synopsis: Faktory Worker for Haskell description: Haskell client and worker process for the Faktory background job server. . diff --git a/package.yaml b/package.yaml index 463bfd5..2e99672 100644 --- a/package.yaml +++ b/package.yaml @@ -1,6 +1,6 @@ --- name: faktory -version: 1.1.2.7 +version: 1.1.3.0 category: Network author: Freckle Engineering maintainer: engineering@freckle.com From 8ab94c6422bb39ce212d73d41056239549bd71e1 Mon Sep 17 00:00:00 2001 From: patrick brisbin Date: Fri, 7 Jun 2024 12:52:08 -0400 Subject: [PATCH 3/4] Use newtype instead of type --- library/Faktory/Pool.hs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/library/Faktory/Pool.hs b/library/Faktory/Pool.hs index 803e2b3..a88f82e 100644 --- a/library/Faktory/Pool.hs +++ b/library/Faktory/Pool.hs @@ -37,7 +37,7 @@ import UnliftIO (MonadUnliftIO, withRunInIO) -- | -- -- @since 1.1.3.0 -type FaktoryPool = Pool Producer +newtype FaktoryPool = FaktoryPool (Pool Producer) -- | -- @@ -60,6 +60,7 @@ newFaktoryPool -> m FaktoryPool newFaktoryPool settings PoolSettings {..} = do liftIO + . fmap FaktoryPool . Pool.newPool $ Pool.defaultPoolConfig (newProducer settings) @@ -104,7 +105,7 @@ withProducer => (Producer -> m a) -> m a withProducer f = do - p <- asks (^. faktoryPoolL) + FaktoryPool p <- asks (^. faktoryPoolL) withRunInIO $ \runInIO -> do Pool.withResource p $ runInIO . f @@ -121,6 +122,6 @@ withProducer f = do takeProducer :: (MonadIO m, MonadReader env m, HasFaktoryPool env) => m (Producer, m ()) takeProducer = do - p <- asks (^. faktoryPoolL) + FaktoryPool p <- asks (^. faktoryPoolL) (producer, lp) <- liftIO $ Pool.takeResource p pure (producer, liftIO $ Pool.putResource lp producer) From e2a10e94fe766adc665737e185fc23f3af84a2f8 Mon Sep 17 00:00:00 2001 From: patrick brisbin Date: Fri, 7 Jun 2024 13:14:05 -0400 Subject: [PATCH 4/4] Compatibility with resource-pool < 0.3 --- faktory.cabal | 3 ++- library/Data/Pool/Compat.hs | 31 +++++++++++++++++++++++++++++++ library/Faktory/Pool.hs | 19 +++++++++---------- 3 files changed, 42 insertions(+), 11 deletions(-) create mode 100644 library/Data/Pool/Compat.hs diff --git a/faktory.cabal b/faktory.cabal index 9c94979..1f6ff6b 100644 --- a/faktory.cabal +++ b/faktory.cabal @@ -4,7 +4,7 @@ cabal-version: 1.18 -- -- see: https://github.com/sol/hpack -- --- hash: 690e0deaa46069fadbd59ee63f2da1a97d4fb5bcadabf75910d088636b34bbf9 +-- hash: 89f1872b73104a5e3c9199c20692c3b0da3ba450a23bdfd7aad3d2bacc7bcb6c name: faktory version: 1.1.3.0 @@ -56,6 +56,7 @@ source-repository head library exposed-modules: + Data.Pool.Compat Faktory.Client Faktory.Connection Faktory.Ent.Batch diff --git a/library/Data/Pool/Compat.hs b/library/Data/Pool/Compat.hs new file mode 100644 index 0000000..74c69b4 --- /dev/null +++ b/library/Data/Pool/Compat.hs @@ -0,0 +1,31 @@ +{-# LANGUAGE CPP #-} + +module Data.Pool.Compat + ( module Data.Pool + , createPool + ) where + +import Prelude + +import Data.Pool hiding (createPool) +#if MIN_VERSION_resource_pool(0,3,0) +#else +import Control.Concurrent (getNumCapabilities) +import qualified Data.Pool as Pool +#endif + +createPool + :: IO a + -> (a -> IO ()) + -> Double + -> Int + -> IO (Pool a) +createPool create destroy timeout size = do +#if MIN_VERSION_resource_pool(0,3,0) + newPool $ defaultPoolConfig create destroy timeout size +#else + -- Re-implement instead of using the deprecated compatibility function, so + -- that we can get a consistent numStripes and size behavior. + numStripes <- getNumCapabilities + Pool.createPool create destroy numStripes (realToFrac timeout) size +#endif diff --git a/library/Faktory/Pool.hs b/library/Faktory/Pool.hs index a88f82e..54dbfa4 100644 --- a/library/Faktory/Pool.hs +++ b/library/Faktory/Pool.hs @@ -24,8 +24,8 @@ import Faktory.Prelude import Control.Monad.IO.Class (MonadIO (..)) import Control.Monad.Reader (MonadReader, asks) import Data.Aeson (ToJSON) -import Data.Pool (Pool) -import qualified Data.Pool as Pool +import Data.Pool.Compat (Pool) +import qualified Data.Pool.Compat as Pool import Faktory.Job hiding (buildJob, perform) import qualified Faktory.Job as Job import Faktory.Producer @@ -59,14 +59,13 @@ newFaktoryPool -> PoolSettings -> m FaktoryPool newFaktoryPool settings PoolSettings {..} = do - liftIO - . fmap FaktoryPool - . Pool.newPool - $ Pool.defaultPoolConfig - (newProducer settings) - closeProducer - (fromIntegral settingsTimeout) - (fromIntegral settingsSize) + liftIO $ + FaktoryPool + <$> Pool.createPool + (newProducer settings) + closeProducer + (fromIntegral settingsTimeout) + (fromIntegral settingsSize) -- | 'Faktory.Job.perform' but using a 'Producer' from the pool --