From db02f43fb8e337bd224a01b6c423050c37f1f6b3 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Mon, 9 Oct 2023 11:02:39 +0000 Subject: [PATCH 1/7] first cut of job deletion poller --- src/OddJobs/ConfigBuilder.hs | 26 +++++++++++++++++++++++++- src/OddJobs/Job.hs | 25 +++++++++++++++++++------ src/OddJobs/Types.hs | 20 ++++++++++++++++++-- 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 695618f..bf5d9e8 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -23,6 +23,8 @@ import GHC.Exts (toList) import qualified Data.ByteString as BS import UnliftIO (MonadUnliftIO, withRunInIO, bracket, liftIO) import qualified System.Log.FastLogger as FLogger +import GHC.Int (Int64) +import Database.PostgreSQL.Simple.Types as PGS (Identifier(..)) # if MIN_VERSION_aeson(2, 0, 0) import qualified Data.Aeson.KeyMap as HM @@ -78,7 +80,8 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn = , cfgConcurrencyControl = ccControl , cfgJobType = defaultJobType , cfgDefaultJobTimeout = Seconds 600 - , cfgDeleteSuccessfulJobs = True + , cfgImmediateJobDeletion = defaultImmediateJobDeletion + , cfgDelayedJobDeletion = Nothing , cfgDefaultRetryBackoff = \attempts -> pure $ Seconds $ 2 ^ attempts } in cfg @@ -344,3 +347,24 @@ defaultJsonJob = genericToJSON Aeson.defaultOptions defaultJsonFailureMode :: FailureMode -> Aeson.Value defaultJsonFailureMode = genericToJSON Aeson.defaultOptions + +defaultImmediateJobDeletion :: Job -> IO Bool +defaultImmediateJobDeletion Job{jobStatus} = + if jobStatus == OddJobs.Types.Success + then pure True + else pure False + +defaultDelayedJobDeletionSql :: TableName -> Int -> PGS.Connection -> IO Int64 +defaultDelayedJobDeletionSql tname d conn = + PGS.execute conn qry (tname, PGS.In statusList, show d <> " days") + where + -- this function has been deliberately written like this to ensure that whenever a new Status is added/removed + -- one is forced to update this list and decide what is to be done about the new Status + statusList = (flip DL.filter) ([minBound..maxBound] :: [OddJobs.Types.Status]) $ \st -> case st of + Success -> True + Queued -> False + Failed -> True + Cancelled -> True + Retry -> False + Locked -> False + qry = "DELETE FROM ? WHERE status in ? AND run_at < current_timestamp - ? :: interval" \ No newline at end of file diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index f7b5cf8..ac478f1 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -79,6 +79,7 @@ module OddJobs.Job , throwParsePayload , eitherParsePayloadWith , throwParsePayloadWith + , noJobResult ) where @@ -113,7 +114,7 @@ import qualified Data.Aeson.Types as Aeson (Parser, parseMaybe) import Data.String.Conv (StringConv(..), toS) import Data.Functor ((<&>), void) import Control.Monad (forever, forM_, join) -import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe) +import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe, maybeToList) import Data.Either (either) import Control.Monad.Reader import GHC.Generics @@ -130,6 +131,7 @@ import GHC.Exts (toList) import Database.PostgreSQL.Simple.Types as PGS (Identifier(..)) import Database.PostgreSQL.Simple.ToField as PGS (toField) import OddJobs.Job.Query +import GHC.Int (Int64) #if MIN_VERSION_aeson(2,2,0) import Data.Aeson.Types #else @@ -148,7 +150,8 @@ import Data.Aeson.Internal (iparse, IResult(..), formatError) class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where getPollingInterval :: m Seconds onJobSuccess :: Job -> m () - deleteSuccessfulJobs :: m Bool + immediateJobDeletion :: m (Job -> IO Bool) + delayedJobDeletion :: m (Maybe (PGS.Connection -> IO Int64)) onJobFailed :: m [JobErrHandler] getJobRunner :: m (Job -> IO ()) getDbPool :: m (Pool Connection) @@ -191,7 +194,8 @@ instance HasJobRunner RunnerM where onJobSuccess job = do fn <- asks (cfgOnJobSuccess . envConfig) logCallbackErrors (jobId job) "onJobSuccess" $ liftIO $ fn job - deleteSuccessfulJobs = asks (cfgDeleteSuccessfulJobs . envConfig) + immediateJobDeletion = asks (cfgImmediateJobDeletion . envConfig) + delayedJobDeletion = asks (cfgDelayedJobDeletion . envConfig) getJobRunner = asks (cfgJobRunner . envConfig) getDbPool = asks (cfgDbPool . envConfig) @@ -398,7 +402,7 @@ runJob jid = do onJobStart job runJobWithTimeout lockTimeout job endTime <- liftIO getCurrentTime - shouldDeleteJob <- deleteSuccessfulJobs + shouldDeleteJob <- immediateJobDeletion >>= (\fn -> liftIO $ fn newJob) let newJob = job{jobStatus=OddJobs.Types.Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime} if shouldDeleteJob then deleteJob jid @@ -483,12 +487,16 @@ jobMonitor = do a1 <- async $ restartUponCrash "Job poller" jobPoller a2 <- async $ restartUponCrash "Job event listener" jobEventListener a3 <- async $ restartUponCrash "Job Kill poller" killJobPoller - traceM "jobMonitor after async" - finally (void $ waitAnyCatch [a1, a2, a3]) $ do + a4 <- delayedJobDeletion >>= \case + Nothing -> pure Nothing + Just _ -> fmap Just $ async $ restartUponCrash "job deletion poller" jobDeletionPoller + let asyncThreads = [a1, a2, a3] <> (maybeToList a4) + finally (void $ waitAnyCatch asyncThreads) $ do log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.") cancel a3 cancel a2 cancel a1 + maybe (pure ()) cancel a4 log LevelInfo (LogText "Waiting for jobs to complete.") waitForJobs @@ -722,6 +730,11 @@ jobEventListener = do jid <- o .: "id" pure (jid, runAt_, mLockedAt_) + +jobDeletionPoller :: (HasJobRunner m) => m () +jobDeletionPoller = forever $ (delaySeconds =<< getPollingInterval) + + -- $createJobs -- -- Ideally you'd want to create wrappers for 'createJob' and 'scheduleJob' in diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index 02e80d4..1a184b6 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -23,6 +23,7 @@ import Data.String.Conv import Lucid (Html) import Data.Pool (Pool) import Control.Monad.Logger (LogLevel) +import GHC.Int (Int64) -- | An alias for 'QualifiedIdentifier' type. It is used for the job table name. -- Since this type has an instance of 'IsString', @@ -380,8 +381,23 @@ data Config = Config -- picked up for execution again , cfgDefaultJobTimeout :: Seconds - -- | Should successful jobs be deleted from the queue to save on table space? - , cfgDeleteSuccessfulJobs :: Bool + -- | After a job attempt, should it be immediately deleted to save table space? The default + -- behaviour, as defined by 'OddJobs.ConfigBuilder.defaultImmediateJobDeletion' is to delete + -- successful jobs immediately (and retain everything else). If you are providing your + -- own implementation here, __be careful__ to check for the job's status before deciding + -- whether to delete it, or not. + -- + -- A /possible/ use-case for non-successful jobs could be check the 'jobResult' for a failed job + -- and depending up on the 'jobResult' decide if there is no use retrying it, and if it should be + -- immediately deleted. + , cfgImmediateJobDeletion :: Job -> IO Bool + + -- | A funciton which will be run every 'cfgPollingInterval' seconds to delete + -- old jobs that may be hanging around in the @jobs@ table (eg. failed jobs, cancelled jobs, or even + -- successful jobs whose deletion has been delayed via a custom 'cfgImmediateJobDeletion' function). + -- + -- Ref: 'OddJobs.ConfigBuilder.defaultDelayedJobDeletionSql' + , cfgDelayedJobDeletion :: Maybe (PGS.Connection -> IO Int64) -- | How far into the future should jobs which can be retried be queued for? -- From 4d130b5fe8748a0f2c941c24e9c780401d773791 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Mon, 9 Oct 2023 11:17:58 +0000 Subject: [PATCH 2/7] fixes --- src/OddJobs/ConfigBuilder.hs | 2 +- src/OddJobs/Job.hs | 4 ++-- test/Test.hs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index bf5d9e8..e9d1ede 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -360,7 +360,7 @@ defaultDelayedJobDeletionSql tname d conn = where -- this function has been deliberately written like this to ensure that whenever a new Status is added/removed -- one is forced to update this list and decide what is to be done about the new Status - statusList = (flip DL.filter) ([minBound..maxBound] :: [OddJobs.Types.Status]) $ \st -> case st of + statusList = flip DL.filter ([minBound..maxBound] :: [OddJobs.Types.Status]) $ \case Success -> True Queued -> False Failed -> True diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index ac478f1..366b863 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -490,7 +490,7 @@ jobMonitor = do a4 <- delayedJobDeletion >>= \case Nothing -> pure Nothing Just _ -> fmap Just $ async $ restartUponCrash "job deletion poller" jobDeletionPoller - let asyncThreads = [a1, a2, a3] <> (maybeToList a4) + let asyncThreads = [a1, a2, a3] <> maybeToList a4 finally (void $ waitAnyCatch asyncThreads) $ do log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.") cancel a3 @@ -732,7 +732,7 @@ jobEventListener = do jobDeletionPoller :: (HasJobRunner m) => m () -jobDeletionPoller = forever $ (delaySeconds =<< getPollingInterval) +jobDeletionPoller = forever $ delaySeconds =<< getPollingInterval -- $createJobs diff --git a/test/Test.hs b/test/Test.hs index 4e5261b..ea31d51 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -641,7 +641,7 @@ setup' jobPool defaultLimit action = cfgFn resCfg cfg = cfg { Job.cfgDefaultMaxAttempts = 1 -- Simplifies some tests where we have a failing job , Job.cfgConcurrencyControl = Job.ResourceLimits resCfg - , Job.cfgDeleteSuccessfulJobs = False + , Job.cfgImmediateJobDeletion = (const $ pure False) } testKillJob appPool jobPool = testCase "killing a ongoing job" $ do From e47880de1071520a26b3727a27c8cfc4aba733e3 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Mon, 9 Oct 2023 11:24:00 +0000 Subject: [PATCH 3/7] hlint --- test/Test.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Test.hs b/test/Test.hs index ea31d51..df9a357 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -641,7 +641,7 @@ setup' jobPool defaultLimit action = cfgFn resCfg cfg = cfg { Job.cfgDefaultMaxAttempts = 1 -- Simplifies some tests where we have a failing job , Job.cfgConcurrencyControl = Job.ResourceLimits resCfg - , Job.cfgImmediateJobDeletion = (const $ pure False) + , Job.cfgImmediateJobDeletion = const $ pure False } testKillJob appPool jobPool = testCase "killing a ongoing job" $ do From cd4a464050b30ebd7d274dfcc859455506073a92 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Tue, 10 Oct 2023 03:28:59 +0000 Subject: [PATCH 4/7] WIP commit --- src/OddJobs/Job.hs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index 366b863..d9937e2 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -79,7 +79,6 @@ module OddJobs.Job , throwParsePayload , eitherParsePayloadWith , throwParsePayloadWith - , noJobResult ) where @@ -402,8 +401,8 @@ runJob jid = do onJobStart job runJobWithTimeout lockTimeout job endTime <- liftIO getCurrentTime - shouldDeleteJob <- immediateJobDeletion >>= (\fn -> liftIO $ fn newJob) let newJob = job{jobStatus=OddJobs.Types.Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime} + shouldDeleteJob <- immediateJobDeletion >>= (\fn -> liftIO $ fn newJob) if shouldDeleteJob then deleteJob jid else void $ saveJob newJob From 6afe56d81ae738be3caab560dabfe6c50a16eaa7 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Tue, 10 Oct 2023 08:16:09 +0000 Subject: [PATCH 5/7] complete implemenation for delayed job deletion along with tests --- src/OddJobs/ConfigBuilder.hs | 30 ++++++++++++++++++++++++++++-- src/OddJobs/Job.hs | 18 ++++++++++-------- src/OddJobs/Types.hs | 2 ++ test/Test.hs | 27 +++++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 10 deletions(-) diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index e9d1ede..14c0208 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -153,6 +153,8 @@ defaultLogStr jobTypeFn logLevel logEvent = "Kill Job Failed | " <> jobToLogStr j <> "(the job might have completed or timed out)" LogPoll -> "Polling jobs table" + LogDeletionPoll n -> + "Job deletion polled and deleted " <> toLogStr n <> " jobs" LogWebUIRequest -> "WebUIRequest (TODO: Log the actual request)" LogText t -> @@ -336,6 +338,8 @@ defaultJsonLogEvent logEvent = , "contents" Aeson..= defaultJsonJob job ] LogPoll -> Aeson.object [ "tag" Aeson..= ("LogJobPoll" :: Text)] + LogDeletionPoll n -> + Aeson.object [ "tag" Aeson..= ("LogDeletionPoll" :: Text), "contents" Aeson..= n ] LogWebUIRequest -> Aeson.object [ "tag" Aeson..= ("LogWebUIRequest" :: Text)] LogText t -> @@ -354,8 +358,30 @@ defaultImmediateJobDeletion Job{jobStatus} = then pure True else pure False -defaultDelayedJobDeletionSql :: TableName -> Int -> PGS.Connection -> IO Int64 -defaultDelayedJobDeletionSql tname d conn = +-- | Use this function to get a sensible default implementation for the 'cfgDelayedJobDeletion'. +-- You would typically use it as such: +-- +-- @ +-- let tname = TableName "jobs" +-- loggingFn _ _ = _todo +-- dbPool = _todo +-- myJobRunner = _todo +-- cfg = mkConfig _loggingFnTODO tname (MaxConcurrentJobs 10) _dbPoolTODO _jobRunnerTODO $ \x -> +-- x { cfgDelayedJobDeletion = Just (defaultDelayedJobDeletion tname 7) } +-- @ +defaultDelayedJobDeletion :: + (Show d, Num d) => + TableName -> + -- ^ DB table which holds your jobs. Ref: 'cfgTableName' + d -> + -- ^ Number of days after which successful, failed, and cancelled jobs + -- should be deleted from the table + PGS.Connection -> + -- ^ the postgres connection that will be provided to this function, + -- to be able to execute the @DELETE@ statement. + IO Int64 + -- ^ number of rows\/jobs deleted +defaultDelayedJobDeletion tname d conn = PGS.execute conn qry (tname, PGS.In statusList, show d <> " days") where -- this function has been deliberately written like this to ensure that whenever a new Status is added/removed diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index d9937e2..d13aae6 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -228,13 +228,11 @@ instance HasJobRunner RunnerM where -- standalone daemon. startJobRunner :: Config -> IO () startJobRunner jm = do - traceM "startJobRunner top" r <- newIORef DM.empty let monitorEnv = RunnerEnv { envConfig = jm , envJobThreadsRef = r } - traceM "before runReaderT" runReaderT jobMonitor monitorEnv jobWorkerName :: IO String @@ -463,7 +461,6 @@ killJob jid = do -- TODO: This might have a resource leak. restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m () restartUponCrash name_ action = do - traceM "restartUponCrash top" a <- async action finally (waitCatch a >>= fn) $ do log LevelInfo $ LogText $ "Received shutdown: " <> toS name_ @@ -482,13 +479,12 @@ restartUponCrash name_ action = do -- executed to finish execution before exiting the main thread. jobMonitor :: forall m . (HasJobRunner m) => m () jobMonitor = do - traceM "jobMonitor top" a1 <- async $ restartUponCrash "Job poller" jobPoller a2 <- async $ restartUponCrash "Job event listener" jobEventListener a3 <- async $ restartUponCrash "Job Kill poller" killJobPoller a4 <- delayedJobDeletion >>= \case Nothing -> pure Nothing - Just _ -> fmap Just $ async $ restartUponCrash "job deletion poller" jobDeletionPoller + Just deletionFn -> fmap Just $ async $ restartUponCrash "job deletion poller" (jobDeletionPoller deletionFn) let asyncThreads = [a1, a2, a3] <> maybeToList a4 finally (void $ waitAnyCatch asyncThreads) $ do log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.") @@ -730,9 +726,15 @@ jobEventListener = do pure (jid, runAt_, mLockedAt_) -jobDeletionPoller :: (HasJobRunner m) => m () -jobDeletionPoller = forever $ delaySeconds =<< getPollingInterval - +jobDeletionPoller :: (HasJobRunner m) => (Connection -> IO Int64) -> m () +jobDeletionPoller deletionFn = do + i <- getPollingInterval + dbPool <- getDbPool + withDbConnection $ \conn -> do + forever $ do + n <- liftIO $ deletionFn conn + log LevelDebug $ LogDeletionPoll n + delaySeconds i -- $createJobs -- diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index 1a184b6..4e8229d 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -80,6 +80,8 @@ data LogEvent | LogKillJobFailed !Job -- | Emitted whenever 'OddJobs.Job.jobPoller' polls the DB table | LogPoll + -- | Emitted whenever 'OddJobs.Job.jobPoller' polls the DB table + | LogDeletionPoll !Int64 -- | TODO | LogWebUIRequest -- | Emitted whenever any other event occurs diff --git a/test/Test.hs b/test/Test.hs index df9a357..67d987d 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -91,6 +91,7 @@ tests appPool jobPool = testGroup "All tests" , testGracefulShutdown appPool jobPool , testPushFailedJobEndQueue jobPool , testRetryBackoff appPool jobPool + , testJobDeletion appPool jobPool , testGroup "callback tests" [ testOnJobFailed appPool jobPool , testOnJobStart appPool jobPool @@ -176,6 +177,7 @@ logEventToJob le = case le of Job.LogKillJobSuccess j -> Just j Job.LogKillJobFailed j -> Just j Job.LogPoll -> Nothing + Job.LogDeletionPoll _ -> Nothing Job.LogWebUIRequest -> Nothing Job.LogText _ -> Nothing @@ -209,6 +211,7 @@ assertJobIdStatus conn tname logRef msg st jid = do Job.LogWebUIRequest -> False Job.LogText _ -> False Job.LogPoll -> False + Job.LogDeletionPoll _ -> False Job.Failed -> assertBool (msg <> ": Failed event not found in job-logs for JobId=" <> show jid) $ @@ -379,6 +382,30 @@ testJobScheduling appPool jobPool = testCase "job scheduling" $ do delaySeconds (Job.defaultPollingInterval + Seconds 2) assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId +testJobDeletion appPool jobPool = testCase "job failure" $ do + withRandomTable jobPool $ \tname -> do + withNamedJobMonitor tname jobPool (\cfg -> cfg { Job.cfgDelayedJobDeletion = Just $ Job.defaultDelayedJobDeletion tname pinterval, Job.cfgDefaultMaxAttempts = 3 }) $ \_logRef -> do + Pool.withResource appPool $ \conn -> do + + let assertDeletedJob msg jid = + Job.findJobByIdIO conn tname jid >>= \case + Nothing -> pure () + Just _ -> assertFailure msg + + successJob <- Job.createJob conn tname (PayloadSucceed 0) + failJob <- Job.createJob conn tname (PayloadAlwaysFail 0) + delaySeconds Job.defaultPollingInterval + + assertDeletedJob "Expecting successful job to be immediately deleted" (jobId successJob) + + j <- ensureJobId conn tname (jobId failJob) + assertEqual "Exepcting job to be in Failed status" Job.Failed (jobStatus j) + + delaySeconds (Job.defaultPollingInterval * 3) + assertDeletedJob "Expecting failed job to be deleted after adequate delay" (jobId failJob) + where + pinterval = 3 * fromIntegral (Job.unSeconds Job.defaultPollingInterval) / (24*60*60 :: Float) + testJobFailure appPool jobPool = testCase "job failure" $ do withNewJobMonitor jobPool $ \tname _logRef -> do Pool.withResource appPool $ \conn -> do From feadd8bd26a4d2609c42325d6c478f60e5480a41 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Tue, 10 Oct 2023 09:12:52 +0000 Subject: [PATCH 6/7] fixed the tests --- src/OddJobs/ConfigBuilder.hs | 10 +++++----- test/Test.hs | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 14c0208..6d87184 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -370,19 +370,19 @@ defaultImmediateJobDeletion Job{jobStatus} = -- x { cfgDelayedJobDeletion = Just (defaultDelayedJobDeletion tname 7) } -- @ defaultDelayedJobDeletion :: - (Show d, Num d) => TableName -> -- ^ DB table which holds your jobs. Ref: 'cfgTableName' - d -> - -- ^ Number of days after which successful, failed, and cancelled jobs - -- should be deleted from the table + String -> + -- ^ Time intterval after which successful, failed, and cancelled jobs + -- should be deleted from the table. __NOTE:__ This needs to be expressed + -- as an actual PostgreSQL interval, such as @"7 days"@ or @"12 hours"@ PGS.Connection -> -- ^ the postgres connection that will be provided to this function, -- to be able to execute the @DELETE@ statement. IO Int64 -- ^ number of rows\/jobs deleted defaultDelayedJobDeletion tname d conn = - PGS.execute conn qry (tname, PGS.In statusList, show d <> " days") + PGS.execute conn qry (tname, PGS.In statusList, d) where -- this function has been deliberately written like this to ensure that whenever a new Status is added/removed -- one is forced to update this list and decide what is to be done about the new Status diff --git a/test/Test.hs b/test/Test.hs index 67d987d..aa7535c 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -382,9 +382,9 @@ testJobScheduling appPool jobPool = testCase "job scheduling" $ do delaySeconds (Job.defaultPollingInterval + Seconds 2) assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId -testJobDeletion appPool jobPool = testCase "job failure" $ do +testJobDeletion appPool jobPool = testCase "job immediae deletion" $ do withRandomTable jobPool $ \tname -> do - withNamedJobMonitor tname jobPool (\cfg -> cfg { Job.cfgDelayedJobDeletion = Just $ Job.defaultDelayedJobDeletion tname pinterval, Job.cfgDefaultMaxAttempts = 3 }) $ \_logRef -> do + withNamedJobMonitor tname jobPool (\cfg -> cfg { Job.cfgDelayedJobDeletion = Just $ Job.defaultDelayedJobDeletion tname pinterval, Job.cfgDefaultMaxAttempts = 2 }) $ \_logRef -> do Pool.withResource appPool $ \conn -> do let assertDeletedJob msg jid = @@ -394,17 +394,17 @@ testJobDeletion appPool jobPool = testCase "job failure" $ do successJob <- Job.createJob conn tname (PayloadSucceed 0) failJob <- Job.createJob conn tname (PayloadAlwaysFail 0) - delaySeconds Job.defaultPollingInterval + delaySeconds (Job.defaultPollingInterval * 3) assertDeletedJob "Expecting successful job to be immediately deleted" (jobId successJob) j <- ensureJobId conn tname (jobId failJob) assertEqual "Exepcting job to be in Failed status" Job.Failed (jobStatus j) - delaySeconds (Job.defaultPollingInterval * 3) + delaySeconds (Job.defaultPollingInterval * 4) assertDeletedJob "Expecting failed job to be deleted after adequate delay" (jobId failJob) where - pinterval = 3 * fromIntegral (Job.unSeconds Job.defaultPollingInterval) / (24*60*60 :: Float) + pinterval = show (Job.unSeconds Job.defaultPollingInterval) <> " seconds" testJobFailure appPool jobPool = testCase "job failure" $ do withNewJobMonitor jobPool $ \tname _logRef -> do From abc6ad24e2e3f12a049a676f609f2cacc9b582b3 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Wed, 11 Oct 2023 04:54:28 +0000 Subject: [PATCH 7/7] documentation and review fixes --- src/OddJobs/ConfigBuilder.hs | 13 +++++++------ src/OddJobs/Job.hs | 2 +- src/OddJobs/Types.hs | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 6d87184..9484d69 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -23,7 +23,7 @@ import GHC.Exts (toList) import qualified Data.ByteString as BS import UnliftIO (MonadUnliftIO, withRunInIO, bracket, liftIO) import qualified System.Log.FastLogger as FLogger -import GHC.Int (Int64) +import Data.Int (Int64) import Database.PostgreSQL.Simple.Types as PGS (Identifier(..)) # if MIN_VERSION_aeson(2, 0, 0) @@ -366,14 +366,14 @@ defaultImmediateJobDeletion Job{jobStatus} = -- loggingFn _ _ = _todo -- dbPool = _todo -- myJobRunner = _todo --- cfg = mkConfig _loggingFnTODO tname (MaxConcurrentJobs 10) _dbPoolTODO _jobRunnerTODO $ \x -> --- x { cfgDelayedJobDeletion = Just (defaultDelayedJobDeletion tname 7) } +-- cfg = mkConfig loggingFn tname (MaxConcurrentJobs 10) dbPool jobRunner $ \x -> +-- x { cfgDelayedJobDeletion = Just (defaultDelayedJobDeletion tname "7 days") } -- @ defaultDelayedJobDeletion :: TableName -> -- ^ DB table which holds your jobs. Ref: 'cfgTableName' String -> - -- ^ Time intterval after which successful, failed, and cancelled jobs + -- ^ Time interval after which successful, failed, and cancelled jobs -- should be deleted from the table. __NOTE:__ This needs to be expressed -- as an actual PostgreSQL interval, such as @"7 days"@ or @"12 hours"@ PGS.Connection -> @@ -384,8 +384,9 @@ defaultDelayedJobDeletion :: defaultDelayedJobDeletion tname d conn = PGS.execute conn qry (tname, PGS.In statusList, d) where - -- this function has been deliberately written like this to ensure that whenever a new Status is added/removed - -- one is forced to update this list and decide what is to be done about the new Status + -- this function has been deliberately written like this to ensure that + -- whenever a new Status is added/removed one is forced to update this + -- list and decide what is to be done about the new Status statusList = flip DL.filter ([minBound..maxBound] :: [OddJobs.Types.Status]) $ \case Success -> True Queued -> False diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index d13aae6..9e42dfe 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -130,7 +130,7 @@ import GHC.Exts (toList) import Database.PostgreSQL.Simple.Types as PGS (Identifier(..)) import Database.PostgreSQL.Simple.ToField as PGS (toField) import OddJobs.Job.Query -import GHC.Int (Int64) +import Data.Int (Int64) #if MIN_VERSION_aeson(2,2,0) import Data.Aeson.Types #else diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index 4e8229d..177cf13 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -23,7 +23,7 @@ import Data.String.Conv import Lucid (Html) import Data.Pool (Pool) import Control.Monad.Logger (LogLevel) -import GHC.Int (Int64) +import Data.Int (Int64) -- | An alias for 'QualifiedIdentifier' type. It is used for the job table name. -- Since this type has an instance of 'IsString',