diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 695618f..9484d69 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -23,6 +23,8 @@ import GHC.Exts (toList) import qualified Data.ByteString as BS import UnliftIO (MonadUnliftIO, withRunInIO, bracket, liftIO) import qualified System.Log.FastLogger as FLogger +import Data.Int (Int64) +import Database.PostgreSQL.Simple.Types as PGS (Identifier(..)) # if MIN_VERSION_aeson(2, 0, 0) import qualified Data.Aeson.KeyMap as HM @@ -78,7 +80,8 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn = , cfgConcurrencyControl = ccControl , cfgJobType = defaultJobType , cfgDefaultJobTimeout = Seconds 600 - , cfgDeleteSuccessfulJobs = True + , cfgImmediateJobDeletion = defaultImmediateJobDeletion + , cfgDelayedJobDeletion = Nothing , cfgDefaultRetryBackoff = \attempts -> pure $ Seconds $ 2 ^ attempts } in cfg @@ -150,6 +153,8 @@ defaultLogStr jobTypeFn logLevel logEvent = "Kill Job Failed | " <> jobToLogStr j <> "(the job might have completed or timed out)" LogPoll -> "Polling jobs table" + LogDeletionPoll n -> + "Job deletion poller deleted " <> toLogStr n <> " jobs" LogWebUIRequest -> "WebUIRequest (TODO: Log the actual request)" LogText t -> @@ -333,6 +338,8 @@ defaultJsonLogEvent logEvent = , "contents" Aeson..= defaultJsonJob job ] LogPoll -> Aeson.object [ "tag" Aeson..= ("LogJobPoll" :: Text)] + LogDeletionPoll n -> + Aeson.object [ "tag" Aeson..= ("LogDeletionPoll" :: Text), "contents" Aeson..= n ] LogWebUIRequest -> Aeson.object [ "tag" Aeson..= ("LogWebUIRequest" :: Text)] LogText t -> @@ -344,3 +351,47 @@ defaultJsonJob = genericToJSON Aeson.defaultOptions defaultJsonFailureMode :: FailureMode -> Aeson.Value defaultJsonFailureMode = genericToJSON Aeson.defaultOptions + +defaultImmediateJobDeletion :: Job -> IO Bool +defaultImmediateJobDeletion Job{jobStatus} = + if jobStatus == OddJobs.Types.Success + then pure True + else pure False + +-- | Use this function to get a sensible default implementation for 'cfgDelayedJobDeletion'. +-- You would typically use it like this: +-- +-- @ +-- let tname = "jobs" :: TableName +-- loggingFn _ _ = _todo +-- dbPool = _todo +-- myJobRunner = _todo +-- cfg = mkConfig loggingFn tname dbPool (MaxConcurrentJobs 10) myJobRunner $ \x -> +-- x { cfgDelayedJobDeletion = Just (defaultDelayedJobDeletion tname "7 days") } +-- @ +defaultDelayedJobDeletion :: + TableName -> + -- ^ DB table which holds your jobs. Ref: 'cfgTableName' + String -> + -- ^ Time interval after which successful, failed, and cancelled jobs + -- should be deleted from the table. __NOTE:__ This needs to be expressed + -- as an actual PostgreSQL interval, such as @"7 days"@ or @"12 hours"@ + PGS.Connection -> + -- ^ The PostgreSQL connection that will be provided to this function, + -- in order to execute the @DELETE@ statement. + IO Int64 + -- ^ number of rows\/jobs deleted +defaultDelayedJobDeletion tname d conn = + PGS.execute conn qry (tname, PGS.In statusList, d) + where + -- this function has been deliberately written like this to ensure that + -- whenever a new Status is added or removed, one is forced to update this + -- list and decide what is to be done about the new Status + statusList = flip DL.filter ([minBound..maxBound] :: [OddJobs.Types.Status]) $ \case + Success -> True + Queued -> False + Failed -> True + Cancelled -> True + Retry -> False + Locked -> False + qry = "DELETE FROM ? WHERE status in ? AND run_at < current_timestamp - ? 
:: interval" \ No newline at end of file diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index f7b5cf8..9e42dfe 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -113,7 +113,7 @@ import qualified Data.Aeson.Types as Aeson (Parser, parseMaybe) import Data.String.Conv (StringConv(..), toS) import Data.Functor ((<&>), void) import Control.Monad (forever, forM_, join) -import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe) +import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe, maybeToList) import Data.Either (either) import Control.Monad.Reader import GHC.Generics @@ -130,6 +130,7 @@ import GHC.Exts (toList) import Database.PostgreSQL.Simple.Types as PGS (Identifier(..)) import Database.PostgreSQL.Simple.ToField as PGS (toField) import OddJobs.Job.Query +import Data.Int (Int64) #if MIN_VERSION_aeson(2,2,0) import Data.Aeson.Types #else @@ -148,7 +149,8 @@ import Data.Aeson.Internal (iparse, IResult(..), formatError) class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where getPollingInterval :: m Seconds onJobSuccess :: Job -> m () - deleteSuccessfulJobs :: m Bool + immediateJobDeletion :: m (Job -> IO Bool) + delayedJobDeletion :: m (Maybe (PGS.Connection -> IO Int64)) onJobFailed :: m [JobErrHandler] getJobRunner :: m (Job -> IO ()) getDbPool :: m (Pool Connection) @@ -191,7 +193,8 @@ instance HasJobRunner RunnerM where onJobSuccess job = do fn <- asks (cfgOnJobSuccess . envConfig) logCallbackErrors (jobId job) "onJobSuccess" $ liftIO $ fn job - deleteSuccessfulJobs = asks (cfgDeleteSuccessfulJobs . envConfig) + immediateJobDeletion = asks (cfgImmediateJobDeletion . envConfig) + delayedJobDeletion = asks (cfgDelayedJobDeletion . envConfig) getJobRunner = asks (cfgJobRunner . envConfig) getDbPool = asks (cfgDbPool . envConfig) @@ -225,13 +228,11 @@ instance HasJobRunner RunnerM where -- standalone daemon. startJobRunner :: Config -> IO () startJobRunner jm = do - traceM "startJobRunner top" r <- newIORef DM.empty let monitorEnv = RunnerEnv { envConfig = jm , envJobThreadsRef = r } - traceM "before runReaderT" runReaderT jobMonitor monitorEnv jobWorkerName :: IO String @@ -398,8 +399,8 @@ runJob jid = do onJobStart job runJobWithTimeout lockTimeout job endTime <- liftIO getCurrentTime - shouldDeleteJob <- deleteSuccessfulJobs let newJob = job{jobStatus=OddJobs.Types.Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime} + shouldDeleteJob <- immediateJobDeletion >>= (\fn -> liftIO $ fn newJob) if shouldDeleteJob then deleteJob jid else void $ saveJob newJob @@ -460,7 +461,6 @@ killJob jid = do -- TODO: This might have a resource leak. restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m () restartUponCrash name_ action = do - traceM "restartUponCrash top" a <- async action finally (waitCatch a >>= fn) $ do log LevelInfo $ LogText $ "Received shutdown: " <> toS name_ @@ -479,16 +479,19 @@ restartUponCrash name_ action = do -- executed to finish execution before exiting the main thread. jobMonitor :: forall m . 
(HasJobRunner m) => m () jobMonitor = do - traceM "jobMonitor top" a1 <- async $ restartUponCrash "Job poller" jobPoller a2 <- async $ restartUponCrash "Job event listener" jobEventListener a3 <- async $ restartUponCrash "Job Kill poller" killJobPoller - traceM "jobMonitor after async" - finally (void $ waitAnyCatch [a1, a2, a3]) $ do + a4 <- delayedJobDeletion >>= \case + Nothing -> pure Nothing + Just deletionFn -> fmap Just $ async $ restartUponCrash "Job deletion poller" (jobDeletionPoller deletionFn) + let asyncThreads = [a1, a2, a3] <> maybeToList a4 + finally (void $ waitAnyCatch asyncThreads) $ do log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.") cancel a3 cancel a2 cancel a1 + maybe (pure ()) cancel a4 log LevelInfo (LogText "Waiting for jobs to complete.") waitForJobs @@ -722,6 +725,16 @@ jobEventListener = do jid <- o .: "id" pure (jid, runAt_, mLockedAt_) + +jobDeletionPoller :: (HasJobRunner m) => (Connection -> IO Int64) -> m () +jobDeletionPoller deletionFn = do + i <- getPollingInterval + withDbConnection $ \conn -> do + forever $ do + n <- liftIO $ deletionFn conn + log LevelDebug $ LogDeletionPoll n + delaySeconds i + -- $createJobs -- -- Ideally you'd want to create wrappers for 'createJob' and 'scheduleJob' in diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index 02e80d4..177cf13 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -23,6 +23,7 @@ import Data.String.Conv import Lucid (Html) import Data.Pool (Pool) import Control.Monad.Logger (LogLevel) +import Data.Int (Int64) -- | An alias for 'QualifiedIdentifier' type. It is used for the job table name. -- Since this type has an instance of 'IsString', @@ -79,6 +80,8 @@ data LogEvent | LogKillJobFailed !Job -- | Emitted whenever 'OddJobs.Job.jobPoller' polls the DB table | LogPoll + -- | Emitted whenever the delayed job-deletion poller (see 'cfgDelayedJobDeletion') runs, with the number of jobs it deleted + | LogDeletionPoll !Int64 -- | TODO | LogWebUIRequest -- | Emitted whenever any other event occurs @@ -380,8 +383,23 @@ data Config = Config -- picked up for execution again , cfgDefaultJobTimeout :: Seconds - -- | Should successful jobs be deleted from the queue to save on table space? - , cfgDeleteSuccessfulJobs :: Bool + -- | After a job attempt, should it be immediately deleted to save table space? The default + -- behaviour, as defined by 'OddJobs.ConfigBuilder.defaultImmediateJobDeletion', is to delete + -- successful jobs immediately (and retain everything else). If you are providing your + -- own implementation here, __be careful__ to check for the job's status before deciding + -- whether to delete it or not. + -- + -- A /possible/ use-case for non-successful jobs could be to check the 'jobResult' of a failed job + -- and, depending upon it, decide whether there is any point in retrying it, or whether it should be + -- immediately deleted. + , cfgImmediateJobDeletion :: Job -> IO Bool + + -- | A function that will be run every 'cfgPollingInterval' seconds to delete + -- old jobs that may be hanging around in the @jobs@ table (e.g. failed jobs, cancelled jobs, or even + -- successful jobs whose deletion has been delayed via a custom 'cfgImmediateJobDeletion' function). + -- + -- Ref: 'OddJobs.ConfigBuilder.defaultDelayedJobDeletion' + , cfgDelayedJobDeletion :: Maybe (PGS.Connection -> IO Int64) -- | How far into the future should jobs which can be retried be queued for? 
-- diff --git a/test/Test.hs b/test/Test.hs index 4e5261b..aa7535c 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -91,6 +91,7 @@ tests appPool jobPool = testGroup "All tests" , testGracefulShutdown appPool jobPool , testPushFailedJobEndQueue jobPool , testRetryBackoff appPool jobPool + , testJobDeletion appPool jobPool , testGroup "callback tests" [ testOnJobFailed appPool jobPool , testOnJobStart appPool jobPool @@ -176,6 +177,7 @@ logEventToJob le = case le of Job.LogKillJobSuccess j -> Just j Job.LogKillJobFailed j -> Just j Job.LogPoll -> Nothing + Job.LogDeletionPoll _ -> Nothing Job.LogWebUIRequest -> Nothing Job.LogText _ -> Nothing @@ -209,6 +211,7 @@ assertJobIdStatus conn tname logRef msg st jid = do Job.LogWebUIRequest -> False Job.LogText _ -> False Job.LogPoll -> False + Job.LogDeletionPoll _ -> False Job.Failed -> assertBool (msg <> ": Failed event not found in job-logs for JobId=" <> show jid) $ @@ -379,6 +382,30 @@ testJobScheduling appPool jobPool = testCase "job scheduling" $ do delaySeconds (Job.defaultPollingInterval + Seconds 2) assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId +testJobDeletion appPool jobPool = testCase "job immediate and delayed deletion" $ do + withRandomTable jobPool $ \tname -> do + withNamedJobMonitor tname jobPool (\cfg -> cfg { Job.cfgDelayedJobDeletion = Just $ Job.defaultDelayedJobDeletion tname pinterval, Job.cfgDefaultMaxAttempts = 2 }) $ \_logRef -> do + Pool.withResource appPool $ \conn -> do + + let assertDeletedJob msg jid = + Job.findJobByIdIO conn tname jid >>= \case + Nothing -> pure () + Just _ -> assertFailure msg + + successJob <- Job.createJob conn tname (PayloadSucceed 0) + failJob <- Job.createJob conn tname (PayloadAlwaysFail 0) + delaySeconds (Job.defaultPollingInterval * 3) + + assertDeletedJob "Expecting successful job to be immediately deleted" (jobId successJob) + + j <- ensureJobId conn tname (jobId failJob) + assertEqual "Expecting job to be in Failed status" Job.Failed (jobStatus j) + + delaySeconds (Job.defaultPollingInterval * 4) + assertDeletedJob "Expecting failed job to be deleted after adequate delay" (jobId failJob) + where + pinterval = show (Job.unSeconds Job.defaultPollingInterval) <> " seconds" + testJobFailure appPool jobPool = testCase "job failure" $ do withNewJobMonitor jobPool $ \tname _logRef -> do Pool.withResource appPool $ \conn -> do @@ -641,7 +668,7 @@ setup' jobPool defaultLimit action = cfgFn resCfg cfg = cfg { Job.cfgDefaultMaxAttempts = 1 -- Simplifies some tests where we have a failing job , Job.cfgConcurrencyControl = Job.ResourceLimits resCfg - , Job.cfgDeleteSuccessfulJobs = False + , Job.cfgImmediateJobDeletion = const $ pure False } testKillJob appPool jobPool = testCase "killing a ongoing job" $ do
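Usage note: for anyone wiring these new hooks into an application, below is a minimal sketch showing how cfgImmediateJobDeletion and cfgDelayedJobDeletion could be combined, assuming the names introduced in this diff (mkConfig, defaultDelayedJobDeletion, startJobRunner, the new Config fields). The import locations, the connection pool, the logging function, and the job runner are placeholders and may need adjusting to match how your odd-jobs version exports these names.

{-# LANGUAGE OverloadedStrings #-}
module Main where

-- NOTE: import locations below are assumptions about the odd-jobs module
-- layout; adjust them to wherever your version exports these names.
import OddJobs.ConfigBuilder (mkConfig, defaultDelayedJobDeletion)
import qualified OddJobs.Job as Job
import OddJobs.Types (ConcurrencyControl (..), Job (..), Status (..), TableName)
import Data.Pool (Pool)
import Database.PostgreSQL.Simple (Connection)

main :: IO ()
main = do
  -- Placeholder: build a real connection pool here.
  dbPool <- error "todo: create a Pool Connection" :: IO (Pool Connection)
  let tname = "jobs" :: TableName
      -- Placeholders: plug in real logging and a real job runner.
      logger _logLevel _logEvent = pure ()
      runner _job = pure ()
      cfg = mkConfig logger tname dbPool (MaxConcurrentJobs 10) runner $ \x ->
        x { -- Delete successful and cancelled jobs as soon as they finish...
            Job.cfgImmediateJobDeletion = \job ->
              pure (jobStatus job `elem` [Success, Cancelled])
            -- ...and let the deletion poller sweep the remaining deletable
            -- rows (e.g. failed jobs) once they are older than 7 days.
          , Job.cfgDelayedJobDeletion =
              Just (defaultDelayedJobDeletion tname "7 days")
          }
  Job.startJobRunner cfg

Note that defaultDelayedJobDeletion only targets Success, Failed, and Cancelled rows, so queued, retrying, and locked jobs are never swept, and its interval argument must be a literal PostgreSQL interval such as "7 days" or "12 hours".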