From 3cff0cd0a68972356a43676eeca21107dffb7b1e Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Fri, 11 Oct 2024 11:08:49 +0530 Subject: [PATCH] first attempt to implement rescheduleJob safely --- src/OddJobs/Job.hs | 55 ++++++++++++++++++++++++++++++++++++++++ src/OddJobs/Job/Query.hs | 13 ++++++++-- src/OddJobs/Types.hs | 2 ++ 3 files changed, 68 insertions(+), 2 deletions(-) diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index 9e42dfe..e0ac39f 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -20,6 +20,7 @@ module OddJobs.Job -- $createJobs , createJob , scheduleJob + , rescheduleJob , createJobWithResources , scheduleJobWithResources @@ -39,6 +40,7 @@ module OddJobs.Job , ResourceId(..) , FunctionName , RunnerEnv(..) + , RescheduleError(..) -- ** Structured logging -- @@ -781,6 +783,59 @@ scheduleJob conn tname payload runAt = do [r] -> pure r _ -> Prelude.error . (<> "Not expecting multiple rows when creating a single job. Query=") <$> queryFormatter +fetchJobByIdForUpdate :: Connection -> TableName -> JobId -> IO (Maybe Job) +fetchJobByIdForUpdate conn tname jid = do + let args = (tname, jid) + queryFormatter = toS <$> PGS.formatQuery conn fetchJobByIdForUpdateSql args + PGS.query conn fetchJobByIdForUpdateSql args >>= \case + [] -> pure Nothing + [j] -> pure $ Just j + _ -> Prelude.error . (<> "Not expecting multiple rows when querying a job by Job ID. Query=") <$> queryFormatter + +-- | Reschedule a job (among other things) +-- +-- This function can be used to __safely__ change the following three values of a job: +-- +-- * Job's status (so, you can use it to pre-maturely cancel a job) +-- * Job's attempts (so, you can use it to reduce the number of attempts, thus effectively +-- /increasing/ the number of times the job will be attempted) +-- * Job's run-at (so, you can use it to /prepone/ or /postpone/ a job, especially useful +-- if your job is a result of some user-action, and you want to implement a /debounce/ +-- logic) +-- +-- In case the job is currently locked, or not found, this returns a 'RescheduleError' instead. +rescheduleJob :: Connection + -- ^DB connection to use. __Note:__ This should /ideally/ come out of your + -- application's DB pool, not the 'cfgDbPool' you used in the job-runner. + -> TableName + -- ^ DB table which holds your jobs + -> JobId + -- ^ the JobId which you want to reschedule + -> (Job -> IO (Status, Int, UTCTime)) + -- ^ a "rescheduling function" which will be passed the Job and will need to + -- return a 3-tuple of @(newStatus, newAttempts, newRunAt)@ + -- + -- __Note:__ This rescheduliung function is in @IO@ monad to allow you to + -- do interesting things, but please be __careful__; while the rescheduling + -- function is execute, a DB transaction is being kept open with this particular + -- Job in a LOCKED state (via @SELECT FOR UPDATE@) + -> IO (Either RescheduleError Job) + -- ^ Either the updated job is returned or a 'RescheduleError +rescheduleJob conn tname jid reschedulingFn = do + withTransaction conn $ do + fetchJobByIdForUpdate conn tname jid >>= \case + Nothing -> pure $ Left RescheduleJobNotFound + Just j -> case jobLockedAt j of + Just _ -> pure $ Left RescheduleJobLocked + Nothing -> do + (newStatus, newAttempts, newRunAt) <- reschedulingFn j + let args = (tname, newStatus, newAttempts, newRunAt, jid) + queryFormatter = toS <$> PGS.formatQuery conn rescheduleJobSql args + PGS.query conn rescheduleJobSql args >>= \case + [newjob] -> pure $ Right newjob + [] -> Prelude.error . (<> "Not expecting zero rows when updating a LOCKED job by Job ID. Query=") <$> queryFormatter + _ -> Prelude.error . (<> "Not expecting multiple rows when updating a LOCKED job by Job ID. Query=") <$> queryFormatter + type ResourceList = [(ResourceId, Int)] createJobWithResources diff --git a/src/OddJobs/Job/Query.hs b/src/OddJobs/Job/Query.hs index 8c3d94e..d886760 100644 --- a/src/OddJobs/Job/Query.hs +++ b/src/OddJobs/Job/Query.hs @@ -12,6 +12,8 @@ module OddJobs.Job.Query , registerResourceUsage , concatJobDbColumns , jobDbColumns + , rescheduleJobSql + , fetchJobByIdForUpdateSql ) where @@ -23,7 +25,7 @@ jobPollingSql :: Query jobPollingSql = "update ? set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1 \ \ WHERE id in (select id from ? where (run_at<=? AND ((status in ?) OR (status = ? and locked_at concatJobDbColumns @@ -68,6 +69,14 @@ concatJobDbColumns = concatJobDbColumns_ jobDbColumns "" concatJobDbColumns_ [col] x = x <> col concatJobDbColumns_ (col:cols) x = concatJobDbColumns_ cols (x <> col <> ", ") +-- | Ref: 'rescheduleJob' +fetchJobByIdForUpdateSql :: Query +fetchJobByIdForUpdateSql = "select " <> concatJobDbColumns <> " from ? where id = ? for update" + +-- | Ref: 'rescheduleJob' +rescheduleJobSql :: Query +rescheduleJobSql = "update ? set status = ?, attempts = ?, run_at = ? where id = ? returning " <> concatJobDbColumns + -- | If you are writing SQL queries where you want to return ALL columns from -- the jobs table it is __recommended__ that you do not issue a @SELECT *@ or -- @RETURNIG *@. List out specific DB columns using 'jobDbColumns' and diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index 177cf13..0859c90 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -117,6 +117,8 @@ data FailureMode -- by 'JobErrHandler' and 'cfgOnJobFailed'. data JobErrHandler = forall a e . (Exception e) => JobErrHandler (e -> Job -> FailureMode -> IO a) +data RescheduleError = RescheduleJobNotFound | RescheduleJobLocked deriving (Eq, Show) + type FunctionName = PGS.Identifier data ResourceCfg = ResourceCfg