Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Faktory.Pool #102

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
## [_Unreleased_](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.2.7...main)
## [_Unreleased_](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.3.0...main)

## [v1.1.3.0](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.2.7...v1.1.3.0)

- Add `Faktory.Pool`

This incurs the new dependencies, `unliftio`, `resource-pool`, and
`microlens`.

## [v1.1.2.7](https://github.com/frontrowed/faktory_worker_haskell/compare/v1.1.2.6...v1.1.2.7)

Expand Down
9 changes: 7 additions & 2 deletions faktory.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ cabal-version: 1.18
--
-- see: https://github.com/sol/hpack
--
-- hash: 4e8530827601475a141f2b1524153409a869122bcd0dafe12b70368e0aefcf07
-- hash: 89f1872b73104a5e3c9199c20692c3b0da3ba450a23bdfd7aad3d2bacc7bcb6c

name: faktory
version: 1.1.2.7
version: 1.1.3.0
synopsis: Faktory Worker for Haskell
description: Haskell client and worker process for the Faktory background job server.
.
Expand Down Expand Up @@ -56,6 +56,7 @@ source-repository head

library
exposed-modules:
Data.Pool.Compat
Faktory.Client
Faktory.Connection
Faktory.Ent.Batch
Expand All @@ -66,6 +67,7 @@ library
Faktory.JobFailure
Faktory.JobOptions
Faktory.JobState
Faktory.Pool
Faktory.Prelude
Faktory.Producer
Faktory.Protocol
Expand Down Expand Up @@ -112,15 +114,18 @@ library
, errors
, megaparsec
, memory
, microlens
, mtl
, network
, random
, resource-pool
, safe-exceptions
, scanner
, semigroups >=0.19.1
, text
, time
, unix
, unliftio
, unordered-containers
default-language: Haskell2010
if impl(ghc >= 8.10)
Expand Down
31 changes: 31 additions & 0 deletions library/Data/Pool/Compat.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{-# LANGUAGE CPP #-}

module Data.Pool.Compat
( module Data.Pool
, createPool
) where

import Prelude

import Data.Pool hiding (createPool)
#if MIN_VERSION_resource_pool(0,3,0)
#else
import Control.Concurrent (getNumCapabilities)
import qualified Data.Pool as Pool
#endif

createPool
:: IO a
-> (a -> IO ())
-> Double
-> Int
-> IO (Pool a)
createPool create destroy timeout size = do
#if MIN_VERSION_resource_pool(0,3,0)
newPool $ defaultPoolConfig create destroy timeout size
#else
-- Re-implement instead of using the deprecated compatibility function, so
-- that we can get a consistent numStripes and size behavior.
numStripes <- getNumCapabilities
Pool.createPool create destroy numStripes (realToFrac timeout) size
#endif
126 changes: 126 additions & 0 deletions library/Faktory/Pool.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
module Faktory.Pool
( FaktoryPool
, HasFaktoryPool (..)

-- * Pool Construction
, Settings
, PoolSettings
, newFaktoryPool

-- * Pool use
, perform
, buildJob

-- * Direct access
, withProducer
, takeProducer

-- * Re-exports
, module Faktory.Job
) where

import Faktory.Prelude

import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Reader (MonadReader, asks)
import Data.Aeson (ToJSON)
import Data.Pool.Compat (Pool)
import qualified Data.Pool.Compat as Pool
import Faktory.Job hiding (buildJob, perform)
import qualified Faktory.Job as Job
import Faktory.Producer
import Faktory.Settings (PoolSettings (..), Settings)
import GHC.Stack (HasCallStack)
import Lens.Micro (Lens', (^.))
import UnliftIO (MonadUnliftIO, withRunInIO)

-- |
--
-- @since 1.1.3.0
newtype FaktoryPool = FaktoryPool (Pool Producer)

-- |
--
-- @since 1.1.3.0
class HasFaktoryPool env where
faktoryPoolL :: Lens' env FaktoryPool

instance HasFaktoryPool FaktoryPool where
faktoryPoolL = id

-- | Build a 'FaktoryPool' with the given settings
--
-- See 'Settings', 'envSettings', 'PoolSettings', and 'envPoolSettings'.
--
-- @since 1.1.3.0
newFaktoryPool
:: MonadIO m
=> Settings
-> PoolSettings
-> m FaktoryPool
newFaktoryPool settings PoolSettings {..} = do
liftIO $
FaktoryPool
<$> Pool.createPool
(newProducer settings)
closeProducer
(fromIntegral settingsTimeout)
(fromIntegral settingsSize)

-- | 'Faktory.Job.perform' but using a 'Producer' from the pool
--
-- @since 1.1.3.0
perform
:: ( MonadUnliftIO m
, MonadReader env m
, HasFaktoryPool env
, ToJSON arg
, HasCallStack
)
=> JobOptions
-> arg
-> m JobId
perform options arg = do
withProducer $ \producer -> do
liftIO $ Job.perform options producer arg

-- | 'Faktory.Job.buildJob' but using a 'Producer' from the pool
--
-- @since 1.1.3.0
buildJob
:: (MonadUnliftIO m, MonadReader env m, HasFaktoryPool env)
=> JobOptions
-> arg
-> m (Job arg)
buildJob options arg = do
withProducer $ \producer -> do
liftIO $ Job.buildJob options producer arg

-- | Acquire a 'Producer', use it, and return it to the pool
--
-- @since 1.1.3.0
withProducer
:: (MonadUnliftIO m, MonadReader env m, HasFaktoryPool env)
=> (Producer -> m a)
-> m a
withProducer f = do
FaktoryPool p <- asks (^. faktoryPoolL)
withRunInIO $ \runInIO -> do
Pool.withResource p $ runInIO . f

-- | Get a 'Producer' from the pool along with an action to return it
--
-- You should prefer 'withProducer' if at all possible. With this function you
-- are responsible to ensure the return action is called (e.g. with 'finally').
--
-- This is only necessary if you are operating in a monad that doesn't have
-- 'MonadUnliftIO' (like 'ConduitT'), so you need to take and return a
-- 'Producer' separately (e.g. with 'bracketP').
--
-- @since 1.1.3.0
takeProducer
pbrisbin marked this conversation as resolved.
Show resolved Hide resolved
:: (MonadIO m, MonadReader env m, HasFaktoryPool env) => m (Producer, m ())
takeProducer = do
FaktoryPool p <- asks (^. faktoryPoolL)
(producer, lp) <- liftIO $ Pool.takeResource p
pure (producer, liftIO $ Pool.putResource lp producer)
47 changes: 47 additions & 0 deletions library/Faktory/Settings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module Faktory.Settings
( Settings (..)
, defaultSettings
, envSettings

-- * Worker
, WorkerSettings (..)
, defaultWorkerSettings
, envWorkerSettings
Expand All @@ -12,6 +14,10 @@ module Faktory.Settings
, WorkerId
, randomWorkerId

-- * Pool
, PoolSettings (..)
, envPoolSettings

-- * Re-exports
, ConnectionInfo (..)
, Namespace (..)
Expand All @@ -23,6 +29,7 @@ import Data.Aeson
import Faktory.Connection
import Faktory.JobOptions (JobOptions)
import Faktory.Settings.Queue
import Numeric.Natural
import System.Environment (lookupEnv)
import System.IO (hPutStrLn, stderr)
import System.Random
Expand Down Expand Up @@ -82,3 +89,43 @@ newtype WorkerId = WorkerId String

randomWorkerId :: IO WorkerId
randomWorkerId = WorkerId . take 8 . randomRs ('a', 'z') <$> newStdGen

-- |
--
-- @since 1.1.3.0
data PoolSettings = PoolSettings
{ settingsSize :: Natural
-- ^ Maximum pool size
--
-- Default is @10@. Smallest acceptable value is @1@. Note that, due to the
-- striping behavior of @resource-pool@, a configured size @N@ may result in
-- @N - 1@ resources.
, settingsTimeout :: Natural
-- ^ How long before destroying a resource, in seconds
--
-- Default is @600@.
}

-- |
--
-- @since 1.1.3.0
defaultPoolSettings :: PoolSettings
defaultPoolSettings =
PoolSettings
{ settingsSize = 10
, settingsTimeout = 600
}

-- | Read 'PoolSettings' from the environment
--
-- - @FAKTORY_POOL_SIZE@
-- - @FAKTORY_POOL_TIMEOUT@
--
-- @since 1.1.3.0
envPoolSettings :: IO PoolSettings
envPoolSettings =
PoolSettings
<$> (maybe settingsSize read <$> lookupEnv "FAKTORY_POOL_SIZE")
<*> (maybe settingsTimeout read <$> lookupEnv "FAKTORY_POOL_TIMEOUT")
where
PoolSettings {..} = defaultPoolSettings
5 changes: 4 additions & 1 deletion package.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: faktory
version: 1.1.2.7
version: 1.1.3.0
category: Network
author: Freckle Engineering
maintainer: [email protected]
Expand Down Expand Up @@ -104,15 +104,18 @@ library:
- errors
- megaparsec
- memory
- microlens
- mtl
- network
- random
- resource-pool
- safe-exceptions
- semigroups >= 0.19.1
- scanner
- text
- time
- unix
- unliftio
- unordered-containers

executables:
Expand Down
Loading