diff --git a/odd-jobs.cabal b/odd-jobs.cabal
index ebaf1a0..99e3f9c 100644
--- a/odd-jobs.cabal
+++ b/odd-jobs.cabal
@@ -3,11 +3,9 @@ cabal-version: 1.12
 -- This file has been generated from package.yaml by hpack version 0.35.2.
 --
 -- see: https://github.com/sol/hpack
---
--- hash: dff1ced317b40e54688799203ec3743b119bf71f2fc98109a5e79f12c6e10fba
 
 name: odd-jobs
-version: 0.2.3
+version: 0.2.2
 synopsis: A full-featured PostgreSQL-backed job queue (with an admin UI)
 description: - Background jobs library for Haskell.
              - Extracted from production code at [Vacation Labs](https://www.vacationlabs.com).
@@ -47,6 +45,11 @@ extra-source-files:
     assets/js/logo-slider.js
     assets/odd-jobs-color-logo.png
 
+flag jobresult
+  description: Controls whether the job is extended to have a result/return value along with workflow features
+  manual: True
+  default: False
+
 library
   exposed-modules:
       OddJobs.Job
@@ -57,8 +60,8 @@ library
       OddJobs.Types
       OddJobs.ConfigBuilder
   other-modules:
-      UI
       OddJobs.Job.Query
+      UI
       Paths_odd_jobs
   hs-source-dirs:
       src
@@ -91,7 +94,7 @@ library
     , mtl
     , optparse-applicative
     , postgresql-simple
-    , resource-pool >= 0.4.0.0 && < 0.5.0.0
+    , resource-pool
     , safe
     , servant
     , servant-lucid
@@ -109,6 +112,8 @@ library
     , wai
     , warp
   default-language: Haskell2010
+  if flag(jobresult)
+    cpp-options: -D JOB_RESULT
 
 executable devel
   main-is: DevelMain.hs
@@ -117,8 +122,8 @@ executable devel
       OddJobs.ConfigBuilder
       OddJobs.Endpoints
       OddJobs.Job
-      OddJobs.Migrations
      OddJobs.Job.Query
+      OddJobs.Migrations
       OddJobs.Types
       OddJobs.Web
       UI
@@ -175,6 +180,8 @@ executable devel
    , wai
    , warp
  default-language: Haskell2010
+  if flag(jobresult)
+    cpp-options: -D JOB_RESULT
 
 executable odd-jobs-cli-example
   main-is: OddJobsCliExample.lhs
@@ -196,6 +203,7 @@ executable odd-jobs-cli-example
    , async ==2.2.4
    , base >=4.7 && <5
    , bytestring
+    , containers
    , directory
    , either
    , fast-logger
@@ -229,6 +237,8 @@ executable odd-jobs-cli-example
    , wai
    , warp
  default-language: Haskell2010
+  if flag(jobresult)
+    cpp-options: -D JOB_RESULT
 
 test-suite jobrunner
   type: exitcode-stdio-1.0
@@ -239,6 +249,7 @@ test-suite jobrunner
      OddJobs.ConfigBuilder
      OddJobs.Endpoints
      OddJobs.Job
+      OddJobs.Job.Query
      OddJobs.Migrations
      OddJobs.Types
      OddJobs.Web
@@ -303,3 +314,5 @@ test-suite jobrunner
    , wai
    , warp
  default-language: Haskell2010
+  if flag(jobresult)
+    cpp-options: -D JOB_RESULT
diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs
index 87439a4..a98e321 100644
--- a/src/OddJobs/ConfigBuilder.hs
+++ b/src/OddJobs/ConfigBuilder.hs
@@ -83,7 +83,7 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn =
           , cfgImmediateJobDeletion = defaultImmediateJobDeletion
           , cfgDelayedJobDeletion = Nothing
           , cfgDefaultRetryBackoff = \attempts -> pure $ Seconds $ 2 ^ attempts
-          , cfgEnableWorkflows = False
+          -- , cfgEnableWorkflows = False
           }
   in cfg
 
diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs
index b9712ab..fd8626b 100644
--- a/src/OddJobs/Job.hs
+++ b/src/OddJobs/Job.hs
@@ -149,7 +149,7 @@ import Data.Aeson.Internal (iparse, IResult(..), formatError)
 -- type-class based interface as well (similar to what
 -- 'Yesod.JobQueue.YesodJobQueue' provides).
 class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
-  isWorkflowEnabled :: m Bool
+  -- isWorkflowEnabled :: m Bool
   getPollingInterval :: m Seconds
   onJobSuccess :: Job -> m ()
   immediateJobDeletion :: m (Job -> IO Bool)
@@ -191,7 +191,7 @@ logCallbackErrors :: (HasJobRunner m) => JobId -> Text -> m () -> m ()
 logCallbackErrors jid msg action = catchAny action $ \e -> log LevelError $ LogText $ msg <> " Job ID=" <> toS (show jid) <> ": " <> toS (show e)
 
 instance HasJobRunner RunnerM where
-  isWorkflowEnabled = asks (cfgEnableWorkflows . envConfig)
+  -- isWorkflowEnabled = asks (cfgEnableWorkflows . envConfig)
   getPollingInterval = asks (cfgPollingInterval . envConfig)
   onJobFailed = asks (cfgOnJobFailed . envConfig)
   onJobSuccess job = do
@@ -291,15 +291,12 @@ deleteJobQuery = "DELETE FROM ? WHERE id = ?"
 saveJob :: (HasJobRunner m) => Job -> m Job
 saveJob j = do
   tname <- getTableName
-  workflowEnabled <- isWorkflowEnabled
-  withDbConnection $ \conn -> liftIO $ saveJobIO workflowEnabled conn tname j
-
-saveJobIO :: Bool -> Connection -> TableName -> Job -> IO Job
-saveJobIO workflowEnabled conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResult, jobId} = do
-  let finalFields = if workflowEnabled
-        then PGS.toRow $ a1 PGS.:. (jobResult, jobId)
-        else PGS.toRow $ a1 PGS.:. (Only jobId)
-      a1 = ( tname
+  -- workflowEnabled <- isWorkflowEnabled
+  withDbConnection $ \conn -> liftIO $ saveJobIO conn tname j
+
+saveJobIO :: Connection -> TableName -> Job -> IO Job
+saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResult, jobId} = do
+  let args = ( tname
         , jobRunAt
         , jobStatus
         , jobPayload
@@ -307,8 +304,10 @@ saveJobIO workflowEnabled conn tname Job{jobRunAt, jobStatus, jobPayload, jobLas
         , jobAttempts
         , jobLockedAt
         , jobLockedBy
+        , jobResult
+        , jobId
         )
-  rs <- PGS.queryWith (if workflowEnabled then jobRowParserWithWorkflow else jobRowParserSimple) conn (saveJobQuery workflowEnabled) finalFields
+  rs <- PGS.query conn saveJobQuery args
   case rs of
     [] -> Prelude.error $ "Could not find job while updating it id=" <> show jobId
     [j] -> pure j
@@ -733,7 +732,6 @@ jobEventListener = do
 jobDeletionPoller :: (HasJobRunner m) => (Connection -> IO Int64) -> m ()
 jobDeletionPoller deletionFn = do
   i <- getPollingInterval
-  dbPool <- getDbPool
   withDbConnection $ \conn -> do
     forever $ do
       n <- liftIO $ deletionFn conn
diff --git a/src/OddJobs/Job/Query.hs b/src/OddJobs/Job/Query.hs
index 9312e39..46c9938 100644
--- a/src/OddJobs/Job/Query.hs
+++ b/src/OddJobs/Job/Query.hs
@@ -83,6 +83,8 @@ jobDbColumns =
   , "attempts"
   , "locked_at"
   , "locked_by"
+  , "result"
+  , "parent_id"
   ]
 
 {-# INLINE concatJobDbColumnsInternal #-}
@@ -116,8 +118,6 @@ jobDbColumnsWorkflow =
   , "parent_job_id"
   ]
 
-saveJobQuery :: Bool -> Query
-saveJobQuery workflowEnabled =
-  "UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?" <>
-    if workflowEnabled then ", result = ?" else "" <>
-    " WHERE id = ? RETURNING " <> concatJobDbColumns
+saveJobQuery :: Query
+saveJobQuery =
+  "UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?, result = ? WHERE id = ? RETURNING " <> concatJobDbColumns
diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs
index 5306cff..a506e56 100644
--- a/src/OddJobs/Types.hs
+++ b/src/OddJobs/Types.hs
@@ -221,18 +221,18 @@ instance FromJSON Status where
 newtype JobRunnerName = JobRunnerName { unJobRunnerName :: Text } deriving (Eq, Show, FromField, ToField, Generic, ToJSON, FromJSON)
 
 data Job = Job
-  { jobId :: JobId
-  , jobCreatedAt :: UTCTime
-  , jobUpdatedAt :: UTCTime
-  , jobRunAt :: UTCTime
-  , jobStatus :: Status
-  , jobPayload :: Aeson.Value
-  , jobLastError :: Maybe Value
-  , jobAttempts :: Int
-  , jobLockedAt :: Maybe UTCTime
-  , jobLockedBy :: Maybe JobRunnerName
-  , jobResult :: Maybe Aeson.Value
-  , jobParentId :: Maybe JobId
+  { jobId :: !JobId
+  , jobCreatedAt :: !UTCTime
+  , jobUpdatedAt :: !UTCTime
+  , jobRunAt :: !UTCTime
+  , jobStatus :: !Status
+  , jobPayload :: !Aeson.Value
+  , jobLastError :: !(Maybe Value)
+  , jobAttempts :: !Int
+  , jobLockedAt :: !(Maybe UTCTime)
+  , jobLockedBy :: !(Maybe JobRunnerName)
+  , jobResult :: !(Maybe Aeson.Value)
+  , jobParentId :: !(Maybe JobId)
   } deriving (Eq, Show, Generic)
 
 instance ToText Status where
@@ -263,34 +263,21 @@ instance FromField Status where
 instance ToField Status where
   toField s = toField $ toText s
 
-jobRowParserSimple :: RowParser Job
-jobRowParserSimple = jobRowParserInternal (pure Nothing) (pure Nothing)
-
-jobRowParserWithWorkflow :: RowParser Job
-jobRowParserWithWorkflow = jobRowParserInternal field field
-
-{-# INLINE jobRowParserInternal #-}
-jobRowParserInternal ::
-  RowParser (Maybe Aeson.Value) ->
-  RowParser (Maybe JobId) ->
-  RowParser Job
-jobRowParserInternal resultParser parentJobIdParser = Job
-  <$> field -- jobId
-  <*> field -- createdAt
-  <*> field -- updatedAt
-  <*> field -- runAt
-  <*> field -- status
-  <*> field -- payload
-  <*> field -- lastError
-  <*> field -- attempts
-  <*> field -- lockedAt
-  <*> field -- lockedBy
-  <*> resultParser -- job result
-  <*> parentJobIdParser -- parentJobId
-
 instance FromRow Job where
   -- "Please do not depend on the FromRow instance of the Job type. It does not handle optional features, such as job-results and job-workflows. Depending upon your scenario, use 'jobRowParserSimple' or 'jobRowParserWithWorkflow' directly." #-}
-  fromRow = jobRowParserSimple
+  fromRow = Job
+    <$> field -- jobId
+    <*> field -- createdAt
+    <*> field -- updatedAt
+    <*> field -- runAt
+    <*> field -- status
+    <*> field -- payload
+    <*> field -- lastError
+    <*> field -- attempts
+    <*> field -- lockedAt
+    <*> field -- lockedBy
+    <*> field -- job result
+    <*> field -- parentJobId
 
 -- TODO: Add a sum-type for return status which can signal the monitor about
 -- whether the job needs to be retried, marked successfull, or whether it has
@@ -440,7 +427,7 @@ data Config = Config
   -- via the `OddJobs.Migrations.createJobTableWithWorkflow' function, else your jobs table
   -- will not have the @results@ and @parent_job_id@ fields, and ALL your jobs will start
   -- failing at runtime.
-  , cfgEnableWorkflows :: Bool
+  -- , cfgEnableWorkflows :: Bool
   }
 
diff --git a/test/Test.hs b/test/Test.hs
index 62a462d..ad89768 100644
--- a/test/Test.hs
+++ b/test/Test.hs
@@ -380,7 +380,7 @@ testJobScheduling appPool jobPool = testCase "job scheduling" $ do
     job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0 Nothing) (addUTCTime (fromIntegral (3600 :: Integer)) t)
     delaySeconds $ Seconds 2
     assertJobIdStatus conn tname logRef "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId
-    _ <- Job.saveJobIO False conn tname job{jobRunAt = addUTCTime (fromIntegral (-1 :: Integer)) t}
+    _ <- Job.saveJobIO conn tname job{jobRunAt = addUTCTime (fromIntegral (-1 :: Integer)) t}
     delaySeconds (Job.defaultPollingInterval + Seconds 2)
     assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId
 
@@ -434,7 +434,7 @@ testPushFailedJobEndQueue jobPool = testCase "testPushFailedJobEndQueue" $ do
   Pool.withResource jobPool $ \conn -> do
     job1 <- Job.createJob conn tname (PayloadAlwaysFail 0)
     job2 <- Job.createJob conn tname (PayloadAlwaysFail 0)
-    _ <- Job.saveJobIO False conn tname (job1 {jobAttempts = 1})
+    _ <- Job.saveJobIO conn tname (job1 {jobAttempts = 1})
     [Only resId] <- Job.jobPollingIO conn "testPushFailedJobEndQueue" tname 5
     assertEqual
       "Expecting the current job to be 2 since job 1 has been modified"
@@ -506,7 +506,7 @@ testRetryBackoff appPool jobPool = testCase "retry backoff" $ do
 
       -- Run it for another attempt and check the backoff scales
      job <- ensureJobId conn tname jobId
-      _ <- Job.saveJobIO False conn tname (job {jobRunAt = jobQueueTime})
+      _ <- Job.saveJobIO conn tname (job {jobRunAt = jobQueueTime})
      delaySeconds (2 * Job.defaultPollingInterval)
      assertBackedOff
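
Reviewer note (not part of the patch): with this change, saveJobIO drops the leading Bool that used to select workflow mode, and the result and parent_id columns are always read back via concatJobDbColumns. The sketch below is a minimal illustration, assuming an existing Connection and job TableName; the helper name rescheduleNow is hypothetical, not part of the library. Building with cabal build -f jobresult additionally defines the new JOB_RESULT CPP macro introduced in the cabal file.

import Data.Time (getCurrentTime)
import Database.PostgreSQL.Simple (Connection)
import OddJobs.Job (saveJobIO)
import OddJobs.Types (Job (..), TableName)

-- Push an existing job's run_at into the present so the poller picks it up,
-- mirroring the updated call sites in test/Test.hs.
rescheduleNow :: Connection -> TableName -> Job -> IO Job
rescheduleNow conn tname job = do
  now <- getCurrentTime
  -- before this patch: saveJobIO False conn tname job { jobRunAt = now }
  saveJobIO conn tname job { jobRunAt = now }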