Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow immediate & delayed job deletion #106

Merged
merged 7 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion src/OddJobs/ConfigBuilder.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import GHC.Exts (toList)
import qualified Data.ByteString as BS
import UnliftIO (MonadUnliftIO, withRunInIO, bracket, liftIO)
import qualified System.Log.FastLogger as FLogger
import Data.Int (Int64)
import Database.PostgreSQL.Simple.Types as PGS (Identifier(..))

# if MIN_VERSION_aeson(2, 0, 0)
import qualified Data.Aeson.KeyMap as HM
Expand Down Expand Up @@ -78,7 +80,8 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn =
, cfgConcurrencyControl = ccControl
, cfgJobType = defaultJobType
, cfgDefaultJobTimeout = Seconds 600
, cfgDeleteSuccessfulJobs = True
, cfgImmediateJobDeletion = defaultImmediateJobDeletion
, cfgDelayedJobDeletion = Nothing
, cfgDefaultRetryBackoff = \attempts -> pure $ Seconds $ 2 ^ attempts
}
in cfg
Expand Down Expand Up @@ -150,6 +153,8 @@ defaultLogStr jobTypeFn logLevel logEvent =
"Kill Job Failed | " <> jobToLogStr j <> "(the job might have completed or timed out)"
LogPoll ->
"Polling jobs table"
LogDeletionPoll n ->
"Job deletion polled and deleted " <> toLogStr n <> " jobs"
LogWebUIRequest ->
"WebUIRequest (TODO: Log the actual request)"
LogText t ->
Expand Down Expand Up @@ -333,6 +338,8 @@ defaultJsonLogEvent logEvent =
, "contents" Aeson..= defaultJsonJob job ]
LogPoll ->
Aeson.object [ "tag" Aeson..= ("LogJobPoll" :: Text)]
LogDeletionPoll n ->
Aeson.object [ "tag" Aeson..= ("LogDeletionPoll" :: Text), "contents" Aeson..= n ]
LogWebUIRequest ->
Aeson.object [ "tag" Aeson..= ("LogWebUIRequest" :: Text)]
LogText t ->
Expand All @@ -344,3 +351,47 @@ defaultJsonJob = genericToJSON Aeson.defaultOptions

defaultJsonFailureMode :: FailureMode -> Aeson.Value
defaultJsonFailureMode = genericToJSON Aeson.defaultOptions

defaultImmediateJobDeletion :: Job -> IO Bool
defaultImmediateJobDeletion Job{jobStatus} =
if jobStatus == OddJobs.Types.Success
then pure True
else pure False

-- | Use this function to get a sensible default implementation for the 'cfgDelayedJobDeletion'.
-- You would typically use it as such:
--
-- @
-- let tname = TableName "jobs"
-- loggingFn _ _ = _todo
-- dbPool = _todo
-- myJobRunner = _todo
-- cfg = mkConfig loggingFn tname (MaxConcurrentJobs 10) dbPool jobRunner $ \x ->
-- x { cfgDelayedJobDeletion = Just (defaultDelayedJobDeletion tname "7 days") }
-- @
defaultDelayedJobDeletion ::
TableName ->
-- ^ DB table which holds your jobs. Ref: 'cfgTableName'
String ->
-- ^ Time interval after which successful, failed, and cancelled jobs
-- should be deleted from the table. __NOTE:__ This needs to be expressed
-- as an actual PostgreSQL interval, such as @"7 days"@ or @"12 hours"@
PGS.Connection ->
-- ^ the postgres connection that will be provided to this function,
-- to be able to execute the @DELETE@ statement.
IO Int64
-- ^ number of rows\/jobs deleted
defaultDelayedJobDeletion tname d conn =
PGS.execute conn qry (tname, PGS.In statusList, d)
where
-- this function has been deliberately written like this to ensure that
-- whenever a new Status is added/removed one is forced to update this
-- list and decide what is to be done about the new Status
statusList = flip DL.filter ([minBound..maxBound] :: [OddJobs.Types.Status]) $ \case
Success -> True
Queued -> False
Failed -> True
Cancelled -> True
Retry -> False
Locked -> False
qry = "DELETE FROM ? WHERE status in ? AND run_at < current_timestamp - ? :: interval"
34 changes: 24 additions & 10 deletions src/OddJobs/Job.hs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ import qualified Data.Aeson.Types as Aeson (Parser, parseMaybe)
import Data.String.Conv (StringConv(..), toS)
import Data.Functor ((<&>), void)
import Control.Monad (forever, forM_, join)
import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe)
import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe, maybeToList)
import Data.Either (either)
import Control.Monad.Reader
import GHC.Generics
Expand All @@ -130,6 +130,7 @@ import GHC.Exts (toList)
import Database.PostgreSQL.Simple.Types as PGS (Identifier(..))
import Database.PostgreSQL.Simple.ToField as PGS (toField)
import OddJobs.Job.Query
import Data.Int (Int64)
#if MIN_VERSION_aeson(2,2,0)
import Data.Aeson.Types
#else
Expand All @@ -148,7 +149,8 @@ import Data.Aeson.Internal (iparse, IResult(..), formatError)
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
getPollingInterval :: m Seconds
onJobSuccess :: Job -> m ()
deleteSuccessfulJobs :: m Bool
immediateJobDeletion :: m (Job -> IO Bool)
delayedJobDeletion :: m (Maybe (PGS.Connection -> IO Int64))
onJobFailed :: m [JobErrHandler]
getJobRunner :: m (Job -> IO ())
getDbPool :: m (Pool Connection)
Expand Down Expand Up @@ -191,7 +193,8 @@ instance HasJobRunner RunnerM where
onJobSuccess job = do
fn <- asks (cfgOnJobSuccess . envConfig)
logCallbackErrors (jobId job) "onJobSuccess" $ liftIO $ fn job
deleteSuccessfulJobs = asks (cfgDeleteSuccessfulJobs . envConfig)
immediateJobDeletion = asks (cfgImmediateJobDeletion . envConfig)
delayedJobDeletion = asks (cfgDelayedJobDeletion . envConfig)

getJobRunner = asks (cfgJobRunner . envConfig)
getDbPool = asks (cfgDbPool . envConfig)
Expand Down Expand Up @@ -225,13 +228,11 @@ instance HasJobRunner RunnerM where
-- standalone daemon.
startJobRunner :: Config -> IO ()
startJobRunner jm = do
traceM "startJobRunner top"
r <- newIORef DM.empty
let monitorEnv = RunnerEnv
{ envConfig = jm
, envJobThreadsRef = r
}
traceM "before runReaderT"
runReaderT jobMonitor monitorEnv

jobWorkerName :: IO String
Expand Down Expand Up @@ -398,8 +399,8 @@ runJob jid = do
onJobStart job
runJobWithTimeout lockTimeout job
endTime <- liftIO getCurrentTime
shouldDeleteJob <- deleteSuccessfulJobs
let newJob = job{jobStatus=OddJobs.Types.Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime}
shouldDeleteJob <- immediateJobDeletion >>= (\fn -> liftIO $ fn newJob)
if shouldDeleteJob
then deleteJob jid
else void $ saveJob newJob
Expand Down Expand Up @@ -460,7 +461,6 @@ killJob jid = do
-- TODO: This might have a resource leak.
restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m ()
restartUponCrash name_ action = do
traceM "restartUponCrash top"
a <- async action
finally (waitCatch a >>= fn) $ do
log LevelInfo $ LogText $ "Received shutdown: " <> toS name_
Expand All @@ -479,16 +479,19 @@ restartUponCrash name_ action = do
-- executed to finish execution before exiting the main thread.
jobMonitor :: forall m . (HasJobRunner m) => m ()
jobMonitor = do
traceM "jobMonitor top"
a1 <- async $ restartUponCrash "Job poller" jobPoller
a2 <- async $ restartUponCrash "Job event listener" jobEventListener
a3 <- async $ restartUponCrash "Job Kill poller" killJobPoller
traceM "jobMonitor after async"
finally (void $ waitAnyCatch [a1, a2, a3]) $ do
a4 <- delayedJobDeletion >>= \case
Nothing -> pure Nothing
Just deletionFn -> fmap Just $ async $ restartUponCrash "job deletion poller" (jobDeletionPoller deletionFn)
let asyncThreads = [a1, a2, a3] <> maybeToList a4
finally (void $ waitAnyCatch asyncThreads) $ do
log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.")
cancel a3
cancel a2
cancel a1
maybe (pure ()) cancel a4
log LevelInfo (LogText "Waiting for jobs to complete.")
waitForJobs

Expand Down Expand Up @@ -722,6 +725,17 @@ jobEventListener = do
jid <- o .: "id"
pure (jid, runAt_, mLockedAt_)


jobDeletionPoller :: (HasJobRunner m) => (Connection -> IO Int64) -> m ()
jobDeletionPoller deletionFn = do
i <- getPollingInterval
dbPool <- getDbPool
withDbConnection $ \conn -> do
forever $ do
n <- liftIO $ deletionFn conn
log LevelDebug $ LogDeletionPoll n
delaySeconds i

-- $createJobs
--
-- Ideally you'd want to create wrappers for 'createJob' and 'scheduleJob' in
Expand Down
22 changes: 20 additions & 2 deletions src/OddJobs/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import Data.String.Conv
import Lucid (Html)
import Data.Pool (Pool)
import Control.Monad.Logger (LogLevel)
import Data.Int (Int64)

-- | An alias for 'QualifiedIdentifier' type. It is used for the job table name.
-- Since this type has an instance of 'IsString',
Expand Down Expand Up @@ -79,6 +80,8 @@ data LogEvent
| LogKillJobFailed !Job
-- | Emitted whenever 'OddJobs.Job.jobPoller' polls the DB table
| LogPoll
-- | Emitted whenever 'OddJobs.Job.jobPoller' polls the DB table
| LogDeletionPoll !Int64
-- | TODO
| LogWebUIRequest
-- | Emitted whenever any other event occurs
Expand Down Expand Up @@ -380,8 +383,23 @@ data Config = Config
-- picked up for execution again
, cfgDefaultJobTimeout :: Seconds

-- | Should successful jobs be deleted from the queue to save on table space?
, cfgDeleteSuccessfulJobs :: Bool
-- | After a job attempt, should it be immediately deleted to save table space? The default
-- behaviour, as defined by 'OddJobs.ConfigBuilder.defaultImmediateJobDeletion' is to delete
-- successful jobs immediately (and retain everything else). If you are providing your
-- own implementation here, __be careful__ to check for the job's status before deciding
-- whether to delete it, or not.
--
-- A /possible/ use-case for non-successful jobs could be check the 'jobResult' for a failed job
-- and depending up on the 'jobResult' decide if there is no use retrying it, and if it should be
-- immediately deleted.
, cfgImmediateJobDeletion :: Job -> IO Bool

-- | A funciton which will be run every 'cfgPollingInterval' seconds to delete
-- old jobs that may be hanging around in the @jobs@ table (eg. failed jobs, cancelled jobs, or even
-- successful jobs whose deletion has been delayed via a custom 'cfgImmediateJobDeletion' function).
--
-- Ref: 'OddJobs.ConfigBuilder.defaultDelayedJobDeletionSql'
, cfgDelayedJobDeletion :: Maybe (PGS.Connection -> IO Int64)

-- | How far into the future should jobs which can be retried be queued for?
--
Expand Down
29 changes: 28 additions & 1 deletion test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ tests appPool jobPool = testGroup "All tests"
, testGracefulShutdown appPool jobPool
, testPushFailedJobEndQueue jobPool
, testRetryBackoff appPool jobPool
, testJobDeletion appPool jobPool
, testGroup "callback tests"
[ testOnJobFailed appPool jobPool
, testOnJobStart appPool jobPool
Expand Down Expand Up @@ -176,6 +177,7 @@ logEventToJob le = case le of
Job.LogKillJobSuccess j -> Just j
Job.LogKillJobFailed j -> Just j
Job.LogPoll -> Nothing
Job.LogDeletionPoll _ -> Nothing
Job.LogWebUIRequest -> Nothing
Job.LogText _ -> Nothing

Expand Down Expand Up @@ -209,6 +211,7 @@ assertJobIdStatus conn tname logRef msg st jid = do
Job.LogWebUIRequest -> False
Job.LogText _ -> False
Job.LogPoll -> False
Job.LogDeletionPoll _ -> False

Job.Failed ->
assertBool (msg <> ": Failed event not found in job-logs for JobId=" <> show jid) $
Expand Down Expand Up @@ -379,6 +382,30 @@ testJobScheduling appPool jobPool = testCase "job scheduling" $ do
delaySeconds (Job.defaultPollingInterval + Seconds 2)
assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId

testJobDeletion appPool jobPool = testCase "job immediae deletion" $ do
withRandomTable jobPool $ \tname -> do
withNamedJobMonitor tname jobPool (\cfg -> cfg { Job.cfgDelayedJobDeletion = Just $ Job.defaultDelayedJobDeletion tname pinterval, Job.cfgDefaultMaxAttempts = 2 }) $ \_logRef -> do
Pool.withResource appPool $ \conn -> do

let assertDeletedJob msg jid =
Job.findJobByIdIO conn tname jid >>= \case
Nothing -> pure ()
Just _ -> assertFailure msg

successJob <- Job.createJob conn tname (PayloadSucceed 0)
failJob <- Job.createJob conn tname (PayloadAlwaysFail 0)
delaySeconds (Job.defaultPollingInterval * 3)

assertDeletedJob "Expecting successful job to be immediately deleted" (jobId successJob)

j <- ensureJobId conn tname (jobId failJob)
assertEqual "Exepcting job to be in Failed status" Job.Failed (jobStatus j)

delaySeconds (Job.defaultPollingInterval * 4)
assertDeletedJob "Expecting failed job to be deleted after adequate delay" (jobId failJob)
where
pinterval = show (Job.unSeconds Job.defaultPollingInterval) <> " seconds"

testJobFailure appPool jobPool = testCase "job failure" $ do
withNewJobMonitor jobPool $ \tname _logRef -> do
Pool.withResource appPool $ \conn -> do
Expand Down Expand Up @@ -641,7 +668,7 @@ setup' jobPool defaultLimit action =
cfgFn resCfg cfg = cfg
{ Job.cfgDefaultMaxAttempts = 1 -- Simplifies some tests where we have a failing job
, Job.cfgConcurrencyControl = Job.ResourceLimits resCfg
, Job.cfgDeleteSuccessfulJobs = False
, Job.cfgImmediateJobDeletion = const $ pure False
}

testKillJob appPool jobPool = testCase "killing a ongoing job" $ do
Expand Down
Loading