Skip to content

Commit

Permalink
got rid of workflowEnabled feature flag
Browse files Browse the repository at this point in the history
  • Loading branch information
saurabhnanda committed Nov 19, 2023
1 parent 2a312c9 commit cbce4e6
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 67 deletions.
25 changes: 19 additions & 6 deletions odd-jobs.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ cabal-version: 1.12
-- This file has been generated from package.yaml by hpack version 0.35.2.
--
-- see: https://github.com/sol/hpack
--
-- hash: dff1ced317b40e54688799203ec3743b119bf71f2fc98109a5e79f12c6e10fba

name: odd-jobs
version: 0.2.3
version: 0.2.2
synopsis: A full-featured PostgreSQL-backed job queue (with an admin UI)
description: - Background jobs library for Haskell.
- Extracted from production code at [Vacation Labs](https://www.vacationlabs.com).
Expand Down Expand Up @@ -47,6 +45,11 @@ extra-source-files:
assets/js/logo-slider.js
assets/odd-jobs-color-logo.png

flag jobresult
description: Controls whether the job is extended to have a result/return value along with workflow features
manual: True
default: False

library
exposed-modules:
OddJobs.Job
Expand All @@ -57,8 +60,8 @@ library
OddJobs.Types
OddJobs.ConfigBuilder
other-modules:
UI
OddJobs.Job.Query
UI
Paths_odd_jobs
hs-source-dirs:
src
Expand Down Expand Up @@ -91,7 +94,7 @@ library
, mtl
, optparse-applicative
, postgresql-simple
, resource-pool >= 0.4.0.0 && < 0.5.0.0
, resource-pool
, safe
, servant
, servant-lucid
Expand All @@ -109,6 +112,8 @@ library
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT

executable devel
main-is: DevelMain.hs
Expand All @@ -117,8 +122,8 @@ executable devel
OddJobs.ConfigBuilder
OddJobs.Endpoints
OddJobs.Job
OddJobs.Migrations
OddJobs.Job.Query
OddJobs.Migrations
OddJobs.Types
OddJobs.Web
UI
Expand Down Expand Up @@ -175,6 +180,8 @@ executable devel
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT

executable odd-jobs-cli-example
main-is: OddJobsCliExample.lhs
Expand All @@ -196,6 +203,7 @@ executable odd-jobs-cli-example
, async ==2.2.4
, base >=4.7 && <5
, bytestring
, containers
, directory
, either
, fast-logger
Expand Down Expand Up @@ -229,6 +237,8 @@ executable odd-jobs-cli-example
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT

test-suite jobrunner
type: exitcode-stdio-1.0
Expand All @@ -239,6 +249,7 @@ test-suite jobrunner
OddJobs.ConfigBuilder
OddJobs.Endpoints
OddJobs.Job
OddJobs.Job.Query
OddJobs.Migrations
OddJobs.Types
OddJobs.Web
Expand Down Expand Up @@ -303,3 +314,5 @@ test-suite jobrunner
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT
2 changes: 1 addition & 1 deletion src/OddJobs/ConfigBuilder.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn =
, cfgImmediateJobDeletion = defaultImmediateJobDeletion
, cfgDelayedJobDeletion = Nothing
, cfgDefaultRetryBackoff = \attempts -> pure $ Seconds $ 2 ^ attempts
, cfgEnableWorkflows = False
-- , cfgEnableWorkflows = False
}
in cfg

Expand Down
24 changes: 11 additions & 13 deletions src/OddJobs/Job.hs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ import Data.Aeson.Internal (iparse, IResult(..), formatError)
-- type-class based interface as well (similar to what
-- 'Yesod.JobQueue.YesodJobQueue' provides).
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
isWorkflowEnabled :: m Bool
-- isWorkflowEnabled :: m Bool
getPollingInterval :: m Seconds
onJobSuccess :: Job -> m ()
immediateJobDeletion :: m (Job -> IO Bool)
Expand Down Expand Up @@ -191,7 +191,7 @@ logCallbackErrors :: (HasJobRunner m) => JobId -> Text -> m () -> m ()
logCallbackErrors jid msg action = catchAny action $ \e -> log LevelError $ LogText $ msg <> " Job ID=" <> toS (show jid) <> ": " <> toS (show e)

instance HasJobRunner RunnerM where
isWorkflowEnabled = asks (cfgEnableWorkflows . envConfig)
-- isWorkflowEnabled = asks (cfgEnableWorkflows . envConfig)
getPollingInterval = asks (cfgPollingInterval . envConfig)
onJobFailed = asks (cfgOnJobFailed . envConfig)
onJobSuccess job = do
Expand Down Expand Up @@ -291,24 +291,23 @@ deleteJobQuery = "DELETE FROM ? WHERE id = ?"
saveJob :: (HasJobRunner m) => Job -> m Job
saveJob j = do
tname <- getTableName
workflowEnabled <- isWorkflowEnabled
withDbConnection $ \conn -> liftIO $ saveJobIO workflowEnabled conn tname j

saveJobIO :: Bool -> Connection -> TableName -> Job -> IO Job
saveJobIO workflowEnabled conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResult, jobId} = do
let finalFields = if workflowEnabled
then PGS.toRow $ a1 PGS.:. (jobResult, jobId)
else PGS.toRow $ a1 PGS.:. (Only jobId)
a1 = ( tname
-- workflowEnabled <- isWorkflowEnabled
withDbConnection $ \conn -> liftIO $ saveJobIO conn tname j

saveJobIO :: Connection -> TableName -> Job -> IO Job
saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResult, jobId, jobParentId} = do
let args = ( tname
, jobRunAt
, jobStatus
, jobPayload
, jobLastError
, jobAttempts
, jobLockedAt
, jobLockedBy
, jobResult
, jobParentId
)
rs <- PGS.queryWith (if workflowEnabled then jobRowParserWithWorkflow else jobRowParserSimple) conn (saveJobQuery workflowEnabled) finalFields
rs <- PGS.query conn saveJobQuery args
case rs of
[] -> Prelude.error $ "Could not find job while updating it id=" <> show jobId
[j] -> pure j
Expand Down Expand Up @@ -733,7 +732,6 @@ jobEventListener = do
jobDeletionPoller :: (HasJobRunner m) => (Connection -> IO Int64) -> m ()
jobDeletionPoller deletionFn = do
i <- getPollingInterval
dbPool <- getDbPool
withDbConnection $ \conn -> do
forever $ do
n <- liftIO $ deletionFn conn
Expand Down
10 changes: 5 additions & 5 deletions src/OddJobs/Job/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ jobDbColumns =
, "attempts"
, "locked_at"
, "locked_by"
, "result"
, "parent_id"
]

{-# INLINE concatJobDbColumnsInternal #-}
Expand Down Expand Up @@ -116,8 +118,6 @@ jobDbColumnsWorkflow =
, "parent_job_id"
]

saveJobQuery :: Bool -> Query
saveJobQuery workflowEnabled =
"UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?" <>
if workflowEnabled then ", result = ?" else "" <>
" WHERE id = ? RETURNING " <> concatJobDbColumns
saveJobQuery :: Query
saveJobQuery =
"UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?, result = ? WHERE id = ? return " <> concatJobDbColumns
65 changes: 26 additions & 39 deletions src/OddJobs/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,18 @@ instance FromJSON Status where
newtype JobRunnerName = JobRunnerName { unJobRunnerName :: Text } deriving (Eq, Show, FromField, ToField, Generic, ToJSON, FromJSON)

data Job = Job
{ jobId :: JobId
, jobCreatedAt :: UTCTime
, jobUpdatedAt :: UTCTime
, jobRunAt :: UTCTime
, jobStatus :: Status
, jobPayload :: Aeson.Value
, jobLastError :: Maybe Value
, jobAttempts :: Int
, jobLockedAt :: Maybe UTCTime
, jobLockedBy :: Maybe JobRunnerName
, jobResult :: Maybe Aeson.Value
, jobParentId :: Maybe JobId
{ jobId :: !JobId
, jobCreatedAt :: !UTCTime
, jobUpdatedAt :: !UTCTime
, jobRunAt :: !UTCTime
, jobStatus :: !Status
, jobPayload :: !Aeson.Value
, jobLastError :: !(Maybe Value)
, jobAttempts :: !Int
, jobLockedAt :: !(Maybe UTCTime)
, jobLockedBy :: !(Maybe JobRunnerName)
, jobResult :: !(Maybe Aeson.Value)
, jobParentId :: !(Maybe JobId)
} deriving (Eq, Show, Generic)

instance ToText Status where
Expand Down Expand Up @@ -263,34 +263,21 @@ instance FromField Status where
instance ToField Status where
toField s = toField $ toText s

jobRowParserSimple :: RowParser Job
jobRowParserSimple = jobRowParserInternal (pure Nothing) (pure Nothing)

jobRowParserWithWorkflow :: RowParser Job
jobRowParserWithWorkflow = jobRowParserInternal field field

{-# INLINE jobRowParserInternal #-}
jobRowParserInternal ::
RowParser (Maybe Aeson.Value) ->
RowParser (Maybe JobId) ->
RowParser Job
jobRowParserInternal resultParser parentJobIdParser = Job
<$> field -- jobId
<*> field -- createdAt
<*> field -- updatedAt
<*> field -- runAt
<*> field -- status
<*> field -- payload
<*> field -- lastError
<*> field -- attempts
<*> field -- lockedAt
<*> field -- lockedBy
<*> resultParser -- job result
<*> parentJobIdParser -- parentJobId

instance FromRow Job where
-- "Please do not depend on the FromRow instance of the Job type. It does not handle optional features, such as job-results and job-workflows. Depending upon your scenario, use 'jobRowParserSimple' or 'jobRowParserWithWorkflow' directly." #-}
fromRow = jobRowParserSimple
fromRow = Job
<$> field -- jobId
<*> field -- createdAt
<*> field -- updatedAt
<*> field -- runAt
<*> field -- status
<*> field -- payload
<*> field -- lastError
<*> field -- attempts
<*> field -- lockedAt
<*> field -- lockedBy
<*> field -- job result
<*> field -- parentJobId

-- TODO: Add a sum-type for return status which can signal the monitor about
-- whether the job needs to be retried, marked successfull, or whether it has
Expand Down Expand Up @@ -440,7 +427,7 @@ data Config = Config
-- via the `OddJobs.Migrations.createJobTableWithWorkflow' function, else your jobs table
-- will not have the @results@ and @parent_job_id@ fields, and ALL your jobs will start
-- failing at runtime.
, cfgEnableWorkflows :: Bool
-- , cfgEnableWorkflows :: Bool
}


Expand Down
6 changes: 3 additions & 3 deletions test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ testJobScheduling appPool jobPool = testCase "job scheduling" $ do
job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0 Nothing) (addUTCTime (fromIntegral (3600 :: Integer)) t)
delaySeconds $ Seconds 2
assertJobIdStatus conn tname logRef "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId
_ <- Job.saveJobIO False conn tname job{jobRunAt = addUTCTime (fromIntegral (-1 :: Integer)) t}
_ <- Job.saveJobIO conn tname job{jobRunAt = addUTCTime (fromIntegral (-1 :: Integer)) t}
delaySeconds (Job.defaultPollingInterval + Seconds 2)
assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId

Expand Down Expand Up @@ -434,7 +434,7 @@ testPushFailedJobEndQueue jobPool = testCase "testPushFailedJobEndQueue" $ do
Pool.withResource jobPool $ \conn -> do
job1 <- Job.createJob conn tname (PayloadAlwaysFail 0)
job2 <- Job.createJob conn tname (PayloadAlwaysFail 0)
_ <- Job.saveJobIO False conn tname (job1 {jobAttempts = 1})
_ <- Job.saveJobIO conn tname (job1 {jobAttempts = 1})
[Only resId] <- Job.jobPollingIO conn "testPushFailedJobEndQueue" tname 5
assertEqual
"Expecting the current job to be 2 since job 1 has been modified"
Expand Down Expand Up @@ -506,7 +506,7 @@ testRetryBackoff appPool jobPool = testCase "retry backoff" $ do

-- Run it for another attempt and check the backoff scales
job <- ensureJobId conn tname jobId
_ <- Job.saveJobIO False conn tname (job {jobRunAt = jobQueueTime})
_ <- Job.saveJobIO conn tname (job {jobRunAt = jobQueueTime})
delaySeconds (2 * Job.defaultPollingInterval)

assertBackedOff
Expand Down

0 comments on commit cbce4e6

Please sign in to comment.