From c47ebcd400493617f98251307265cb5b5caf2f1b Mon Sep 17 00:00:00 2001 From: Pavan Varma Date: Mon, 2 Aug 2021 18:35:31 +0530 Subject: [PATCH 1/3] Add ability to kill ongoing job This adds the ability to kill a running job from the web UI, by adding an action to the menu for running jobs. When the "kill" button is clicked, the job is marked for cancellation and an asynchronous poller is responsible for reaping it later. This adds a new status "Cancelled", which represents a job that has been manually killed and is either waiting to be reaped, or has been reaped and can be queued again. To allow for cancelling job threads by ID, the internal runner now tracks job IDs alongside the spawned threads. Co-authored-by: Isaac van Bakel --- examples/OddJobsCliExample.lhs | 4 +- odd-jobs.cabal | 4 ++ package.yaml | 2 +- src/OddJobs/ConfigBuilder.hs | 10 ++++ src/OddJobs/Endpoints.hs | 12 ++++ src/OddJobs/Job.hs | 101 +++++++++++++++++++++++++++++---- src/OddJobs/Migrations.hs | 4 +- src/OddJobs/Types.hs | 14 ++++- src/OddJobs/Web.hs | 26 ++++++++- test/Test.hs | 24 ++++++++ 10 files changed, 182 insertions(+), 19 deletions(-) diff --git a/examples/OddJobsCliExample.lhs b/examples/OddJobsCliExample.lhs index dc8b6f6..eef5735 100644 --- a/examples/OddJobsCliExample.lhs +++ b/examples/OddJobsCliExample.lhs @@ -72,11 +72,11 @@ In this example, the core job-runner function is in the `IO` monad. In all proba myJobRunner :: Job -> IO () myJobRunner job = do throwParsePayload job >>= \case - SendWelcomeEmail _userId -> do + SendWelcomeEmail userId -> do putStrLn $ "This should call the function that actually sends the welcome email. " <> "\nWe are purposely waiting 60 seconds before completing this job so that graceful shutdown can be demonstrated." delaySeconds (Seconds 60) - putStrLn "60 second wait is now over..." 
+ putStrLn $ "SendWelcomeEmail to user: " <> show userId <> " complete (60 second wait is now over...)" SendPasswordResetEmail _tkn -> putStrLn "This should call the function that actually sends the password-reset email" SetupSampleData _userId -> do diff --git a/odd-jobs.cabal b/odd-jobs.cabal index 94af975..ae8bc0a 100644 --- a/odd-jobs.cabal +++ b/odd-jobs.cabal @@ -75,6 +75,8 @@ library , async ==2.2.4 , base >=4.7 && <5 , bytestring + , containers + , daemons , directory , either , fast-logger @@ -137,6 +139,8 @@ executable devel , async ==2.2.4 , base >=4.7 && <5 , bytestring + , containers + , daemons , directory , either , fast-logger diff --git a/package.yaml b/package.yaml index f8580c7..55d2a0b 100644 --- a/package.yaml +++ b/package.yaml @@ -85,6 +85,7 @@ dependencies: - servant-server - servant-lucid - warp + - containers - unordered-containers - optparse-applicative - filepath @@ -149,4 +150,3 @@ tests: - mmorph - lifted-base - lifted-async - - containers diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 9d5ed22..695618f 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -144,6 +144,10 @@ defaultLogStr jobTypeFn logLevel logEvent = LogJobTimeout j@Job{jobLockedAt, jobLockedBy} -> "Timeout | " <> jobToLogStr j <> " | lockedBy=" <> toLogStr (maybe "unknown" unJobRunnerName jobLockedBy) <> " lockedAt=" <> toLogStr (maybe "unknown" show jobLockedAt) + LogKillJobSuccess j -> + "Kill Job Success | " <> jobToLogStr j + LogKillJobFailed j -> + "Kill Job Failed | " <> jobToLogStr j <> "(the job might have completed or timed out)" LogPoll -> "Polling jobs table" LogWebUIRequest -> @@ -321,6 +325,12 @@ defaultJsonLogEvent logEvent = LogJobTimeout job -> Aeson.object [ "tag" Aeson..= ("LogJobTimeout" :: Text) , "contents" Aeson..= defaultJsonJob job ] + LogKillJobSuccess job -> + Aeson.object [ "tag" Aeson..= ("LogKillJobSuccess" :: Text) + , "contents" Aeson..= defaultJsonJob job ] + LogKillJobFailed job 
-> + Aeson.object [ "tag" Aeson..= ("LogKillJobFailed" :: Text) + , "contents" Aeson..= defaultJsonJob job ] LogPoll -> Aeson.object [ "tag" Aeson..= ("LogJobPoll" :: Text)] LogWebUIRequest -> diff --git a/src/OddJobs/Endpoints.hs b/src/OddJobs/Endpoints.hs index f2edcc0..1b652cf 100644 --- a/src/OddJobs/Endpoints.hs +++ b/src/OddJobs/Endpoints.hs @@ -60,6 +60,7 @@ data Routes route = Routes , rEnqueue :: route :- "enqueue" :> Capture "jobId" JobId :> Post '[HTML] NoContent , rRunNow :: route :- "run" :> Capture "jobId" JobId :> Post '[HTML] NoContent , rCancel :: route :- "cancel" :> Capture "jobId" JobId :> Post '[HTML] NoContent + , rKill :: route :- "kill" :> Capture "jobId" JobId :> Post '[HTML] NoContent , rRefreshJobTypes :: route :- "refresh-job-types" :> Post '[HTML] NoContent , rRefreshJobRunners :: route :- "refresh-job-runners" :> Post '[HTML] NoContent } deriving (Generic) @@ -114,6 +115,7 @@ server cfg env nt = { rFilterResults = nt . filterResults cfg env , rEnqueue = nt . enqueueJob cfg env , rCancel = nt . cancelJob cfg env + , rKill = nt . killJob cfg env , rRunNow = nt . 
runJobNow cfg env , rRefreshJobTypes = nt $ refreshJobTypes cfg env , rRefreshJobRunners = nt $ refreshJobRunners cfg env @@ -126,6 +128,7 @@ server2 cfg env = Routes { rFilterResults = filterResults cfg env , rEnqueue = enqueueJob cfg env , rCancel = cancelJob cfg env + , rKill = killJob cfg env , rRunNow = runJobNow cfg env , rRefreshJobTypes = refreshJobTypes cfg env , rRefreshJobRunners = refreshJobRunners cfg env @@ -156,6 +159,14 @@ cancelJob UIConfig{..} env jid = do liftIO $ withResource uicfgDbPool $ \conn -> void $ cancelJobIO conn uicfgTableName jid redirectToHome env +killJob :: UIConfig + -> Env + -> JobId + -> Handler NoContent +killJob UIConfig{..} env jid = do + liftIO $ withResource uicfgDbPool $ \conn -> void $ killJobIO conn uicfgTableName jid + redirectToHome env + runJobNow :: UIConfig -> Env -> JobId @@ -201,6 +212,7 @@ routes linkFn = Web.Routes , Web.rEnqueue = rEnqueue , Web.rRunNow = rRunNow , Web.rCancel = rCancel + , Web.rKill = rKill , Web.rRefreshJobTypes = rRefreshJobTypes , Web.rRefreshJobRunners = rRefreshJobRunners , Web.rStaticAsset = linkFn diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index f93f78a..b887ac3 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -65,6 +65,7 @@ module OddJobs.Job , runJobNowIO , unlockJobIO , cancelJobIO + , killJobIO , jobDbColumns , jobPollingIO , concatJobDbColumns @@ -110,14 +111,16 @@ import Data.Aeson hiding (Success) import qualified Data.Aeson as Aeson import qualified Data.Aeson.Types as Aeson (Parser, parseMaybe) import Data.String.Conv (StringConv(..), toS) -import Data.Functor (void) +import Data.Functor ((<&>), void) import Control.Monad (forever, forM_, join) import Data.Maybe (isNothing, maybe, fromMaybe, listToMaybe, mapMaybe) import Data.Either (either) import Control.Monad.Reader import GHC.Generics +import Data.Map (Map) import qualified Data.HashMap.Strict as HM import qualified Data.List as DL +import qualified Data.Map as DM import qualified Data.ByteString as BS 
import qualified Data.ByteString.Lazy as BSL import System.FilePath (FilePath) @@ -169,7 +172,7 @@ class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where data RunnerEnv = RunnerEnv { envConfig :: !Config - , envJobThreadsRef :: !(IORef [Async ()]) + , envJobThreadsRef :: !(IORef (Map JobId (Async ()))) } type RunnerM = ReaderT RunnerEnv IO @@ -218,7 +221,7 @@ instance HasJobRunner RunnerM where startJobRunner :: Config -> IO () startJobRunner jm = do traceM "startJobRunner top" - r <- newIORef [] + r <- newIORef DM.empty let monitorEnv = RunnerEnv { envConfig = jm , envJobThreadsRef = r @@ -346,7 +349,7 @@ deleteJobIO conn tname jid = do runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job) runJobNowIO conn tname jid = do t <- getCurrentTime - updateJobHelper tname conn (Queued, [Queued, Retry, Failed], Just t, jid) + updateJobHelper tname conn (Queued, [Queued, Retry, Failed, Cancelled], Just t, jid) -- | TODO: First check in all job-runners if this job is still running, or not, -- and somehow send an uninterruptibleCancel to that thread. 
@@ -358,7 +361,11 @@ unlockJobIO conn tname jid = do cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) cancelJobIO conn tname jid = - updateJobHelper tname conn (Failed, [Queued, Retry], Nothing, jid) + updateJobHelper tname conn (Cancelled, [Queued, Retry], Nothing, jid) + +killJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) +killJobIO conn tname jid = + updateJobHelper tname conn (Cancelled, [Locked], Nothing, jid) updateJobHelper :: TableName -> Connection @@ -383,13 +390,17 @@ runJobWithTimeout :: (HasJobRunner m) => Seconds -> Job -> m () -runJobWithTimeout timeoutSec job = do +runJobWithTimeout timeoutSec job@Job{jobId} = do threadsRef <- envJobThreadsRef <$> getRunnerEnv jobRunner_ <- getJobRunner a <- async $ liftIO $ jobRunner_ job - _x <- atomicModifyIORef' threadsRef $ \threads -> (a:threads, DL.map asyncThreadId (a:threads)) + _x <- atomicModifyIORef' threadsRef $ \threads -> + ( DM.insert jobId a threads + , DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads + ) + -- liftIO $ putStrLn $ "Threads: " <> show x log LevelDebug $ LogText $ toS $ "Spawned job in " <> show (asyncThreadId a) @@ -399,7 +410,7 @@ runJobWithTimeout timeoutSec job = do void $ finally (waitEitherCancel a t) - (atomicModifyIORef' threadsRef $ \threads -> (DL.delete a threads, ())) + (atomicModifyIORef' threadsRef $ \threads -> (DM.delete jobId threads, ())) -- | runs a job, blocks for as long as it's in progress @@ -455,6 +466,25 @@ runJob jid = do pure () +killJob :: (HasJobRunner m) => JobId -> m () +killJob jid = do + threadsRef <- envJobThreadsRef <$> getRunnerEnv + threads <- liftIO $ readIORef threadsRef + mJob <- findJobById jid + + case (mJob, jid `DM.lookup` threads) of + (Just job, Just thread) -> do + log LevelInfo $ LogKillJobSuccess job + void $ finally + (uninterruptibleCancel thread) + (atomicModifyIORef' threadsRef $ \threads' -> (DM.delete jid threads', ())) + + (Just job, Nothing) -> do + log LevelInfo $ LogKillJobFailed job + + 
(Nothing, _) -> + log LevelError $ LogText $ "Unable to find job in db to kill, jobId = " <> toS (show jid) + -- TODO: This might have a resource leak. restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m () restartUponCrash name_ action = do @@ -480,9 +510,11 @@ jobMonitor = do traceM "jobMonitor top" a1 <- async $ restartUponCrash "Job poller" jobPoller a2 <- async $ restartUponCrash "Job event listener" jobEventListener + a3 <- async $ restartUponCrash "Job Kill poller" killJobPoller traceM "jobMonitor after async" - finally (void $ waitAnyCatch [a1, a2]) $ do + finally (void $ waitAnyCatch [a1, a2, a3]) $ do log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.") + cancel a3 cancel a2 cancel a1 log LevelInfo (LogText "Waiting for jobs to complete.") @@ -503,11 +535,19 @@ jobPollingWithResourceSql = \ ORDER BY attempts ASC, run_at ASC LIMIT 1) \ \ RETURNING id" +-- | Ref: 'killJobPoller' +killJobPollingSql :: Query +killJobPollingSql = + "UPDATE ? SET locked_at = NULL, locked_by = NULL \ + \ WHERE id IN (SELECT id FROM ? WHERE status = ? AND locked_by = ? AND locked_at <= ? \ + \ ORDER BY locked_at ASC LIMIT 1 FOR UPDATE \ + \ ) RETURNING id" + waitForJobs :: (HasJobRunner m) => m () waitForJobs = do - threadsRef <- envJobThreadsRef <$> getRunnerEnv - readIORef threadsRef >>= \case + curJobs <- getRunnerEnv >>= (readIORef . envJobThreadsRef) <&> DM.elems + case curJobs of [] -> log LevelInfo $ LogText "All job-threads exited" as -> do tid <- myThreadId @@ -526,7 +566,7 @@ getConcurrencyControlFn :: (HasJobRunner m) getConcurrencyControlFn = getConcurrencyControl >>= \case UnlimitedConcurrentJobs -> pure $ const $ pure PollAny MaxConcurrentJobs maxJobs -> pure $ const $ do - curJobs <- getRunnerEnv >>= (readIORef . envJobThreadsRef) + curJobs <- getRunnerEnv >>= (readIORef . 
envJobThreadsRef) <&> DM.elems pure $ pollIf $ DL.length curJobs < maxJobs ResourceLimits resCfg -> pure $ const $ pure $ PollWithResources resCfg DynamicConcurrency fn -> pure $ const $ pollIf <$> liftIO fn @@ -630,6 +670,43 @@ pollRunJob processName mResCfg = do delayAction = delaySeconds =<< getPollingInterval noDelayAction = pure () +-- | Executes 'killJobPollingSql' every 'cfgPollingInterval' seconds to pick up jobs +-- that are cancelled and need to be killed. Uses @UPDATE@ along with @SELECT... +-- ..FOR UPDATE@ to efficiently find a job that matches /all/ of the following +-- conditions: +-- +-- * 'jobStatus' should be 'Cancelled' +-- * 'jobLockedAt' should be in the past +-- * 'jobLockedBy' should be the current job worker name + +killJobPoller :: (HasJobRunner m) => m () +killJobPoller = do + processName <- liftIO jobWorkerName + pool <- getDbPool + tname <- getTableName + + let pollJobToKill conn = join $ mask_ $ do + currentTime <- liftIO getCurrentTime + result <- liftIO $ PGS.query conn killJobPollingSql + (tname, tname, Cancelled, processName, currentTime) + + case result of + [] -> + pure delayAction + + [Only (jobId :: JobId)] -> do + void $ async $ killJob jobId + pure noDelayAction + + x -> + error $ "I was supposed to get only a single row, but got: " ++ show x + + withResource pool (forever . pollJobToKill) + + where + delayAction = delaySeconds =<< getPollingInterval + noDelayAction = pure () + -- | Uses PostgreSQL's LISTEN/NOTIFY to be immediately notified of newly created -- jobs. jobEventListener :: (HasJobRunner m) diff --git a/src/OddJobs/Migrations.hs b/src/OddJobs/Migrations.hs index a675c51..4b125e0 100644 --- a/src/OddJobs/Migrations.hs +++ b/src/OddJobs/Migrations.hs @@ -22,7 +22,9 @@ createJobTableQuery = "CREATE TABLE IF NOT EXISTS ?"
<> ", attempts int not null default 0" <> ", locked_at timestamp with time zone null" <> ", locked_by text null" <> - ", constraint incorrect_locking_info CHECK ((status <> 'locked' and locked_at is null and locked_by is null) or (status = 'locked' and locked_at is not null and locked_by is not null))" <> + ", constraint incorrect_locking_info CHECK (" <> + "(locked_at is null and locked_by is null and status <> 'locked') or " <> + "(locked_at is not null and locked_by is not null and (status = 'locked' or status = 'cancelled')))" <> ");" <> "create index if not exists ? on ?(created_at);" <> "create index if not exists ? on ?(updated_at);" <> diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index cb46c56..02e80d4 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -35,7 +35,7 @@ import Control.Monad.Logger (LogLevel) -- myJobsTable :: TableName -- myJobsTable = "my_jobs" -- @ --- +-- -- This should also work for table names qualified by the schema name. For example: -- -- @ @@ -72,6 +72,11 @@ data LogEvent | LogJobFailed !Job !SomeException !FailureMode !NominalDiffTime -- | Emitted when a job times out and is picked-up again for execution | LogJobTimeout !Job + -- | Emitted when user kills a job and the job thread is successfully cancelled thereafter. + | LogKillJobSuccess !Job + -- | Emitted when user kills a job and the job thread is not found in the threadRef + -- (most likely the job has either completed or timed out). + | LogKillJobFailed !Job -- | Emitted whenever 'OddJobs.Job.jobPoller' polls the DB table | LogPoll -- | TODO @@ -182,8 +187,11 @@ data Status -- | Jobs in 'Queued' status /may/ be picked up by the job-runner on the basis -- of the 'jobRunAt' field. | Queued - -- | Jobs in 'Failed' status will will not be retried by the job-runner. + -- | Jobs in 'Failed' status will not be retried by the job-runner.
| Failed + -- | Jobs with 'Cancelled' status are cancelled by the user and will not be + -- retried by the job-runner + | Cancelled -- | Jobs in 'Retry' status will be retried by the job-runner on the basis of -- the 'jobRunAt' field. | Retry @@ -227,6 +235,7 @@ instance ToText Status where Queued -> "queued" Retry -> "retry" Failed -> "failed" + Cancelled -> "cancelled" Locked -> "locked" instance (StringConv Text a) => FromText (Either a Status) where @@ -234,6 +243,7 @@ instance (StringConv Text a) => FromText (Either a Status) where "success" -> Right Success "queued" -> Right Queued "failed" -> Right Failed + "cancelled" -> Right Cancelled "retry" -> Right Retry "locked" -> Right Locked x -> Left $ toS $ "Unknown job status: " <> x diff --git a/src/OddJobs/Web.hs b/src/OddJobs/Web.hs index 5725195..31bb57b 100644 --- a/src/OddJobs/Web.hs +++ b/src/OddJobs/Web.hs @@ -116,6 +116,7 @@ instance ToHttpApiData Filter where -- , rEnqueue :: route :- "enqueue" :> Capture "jobId" JobId :> Post '[HTML] NoContent -- , rRunNow :: route :- "run" :> Capture "jobId" JobId :> Post '[HTML] NoContent -- , rCancel :: route :- "cancel" :> Capture "jobId" JobId :> Post '[HTML] NoContent +-- , rKill :: route :- "kill" :> Capture "jobId" JobId :> Post '[HTML] NoContent -- , rRefreshJobTypes :: route :- "refresh-job-types" :> Post '[HTML] NoContent -- , rRefreshJobRunners :: route :- "refresh-job-runners" :> Post '[HTML] NoContent -- } deriving (Generic) @@ -126,6 +127,7 @@ data Routes = Routes , rEnqueue :: JobId -> Text , rRunNow :: JobId -> Text , rCancel :: JobId -> Text + , rKill :: JobId -> Text , rRefreshJobTypes :: Text , rRefreshJobRunners :: Text , rStaticAsset :: Text -> Text @@ -382,6 +384,7 @@ jobRow routes t (job@Job{..}, jobHtml) = do let statusFn = case jobStatus of Job.Success -> statusSuccess Job.Failed -> statusFailed + Job.Cancelled -> statusCancelled Job.Queued -> if jobRunAt > t then statusFuture else statusWaiting @@ -394,11 +397,12 @@ jobRow routes t 
(job@Job{..}, jobHtml) = do let actionsFn = case jobStatus of Job.Success -> const mempty Job.Failed -> actionsFailed + Job.Cancelled -> actionsCancelled Job.Queued -> if jobRunAt > t then actionsFuture else actionsWaiting Job.Retry -> actionsRetry - Job.Locked -> const mempty + Job.Locked -> actionsLocked actionsFn routes job @@ -407,6 +411,15 @@ actionsFailed Routes{..} Job{..} = do form_ [ action_ (rEnqueue jobId), method_ "post" ] $ do button_ [ class_ "btn btn-secondary", type_ "submit" ] "Enqueue again" +actionsCancelled :: Routes -> Job -> Html () +actionsCancelled Routes{..} Job{..} = case (jobLockedAt, jobLockedBy) of + (Just _, Just _) -> do + span_ [ class_ "badge badge-light" ] "Killing job" + + _ -> do + form_ [ action_ (rEnqueue jobId), method_ "post" ] $ do + button_ [ class_ "btn btn-secondary", type_ "submit" ] "Enqueue again" + actionsRetry :: Routes -> Job -> Html () actionsRetry Routes{..} Job{..} = do form_ [ action_ (rRunNow jobId), method_ "post" ] $ do @@ -422,6 +435,11 @@ actionsWaiting Routes{..} Job{..} = do form_ [ action_ (rCancel jobId), method_ "post" ] $ do button_ [ class_ "btn btn-danger", type_ "submit" ] "Cancel" +actionsLocked :: Routes -> Job -> Html () +actionsLocked Routes{..} Job{..} = do + form_ [ action_ (rKill jobId), method_ "post" ] $ do + button_ [ class_ "btn btn-warning", type_ "submit" ] "Kill" + statusSuccess :: UTCTime -> Job -> Html () statusSuccess t Job{..} = do span_ [ class_ "badge badge-success" ] "Success" @@ -436,6 +454,12 @@ statusFailed t Job{..} = do span_ [ class_ "job-run-time" ] $ do abbr_ [ title_ (showText jobUpdatedAt) ] $ toHtml $ "Failed " <> humanReadableTime' t jobUpdatedAt <> " after " <> show jobAttempts <> " attempts" +statusCancelled :: UTCTime -> Job -> Html () +statusCancelled t Job{..} = do + span_ [ class_ "badge badge-danger" ] "Cancelled" + span_ [ class_ "job-run-time" ] $ do + abbr_ [ title_ (showText jobUpdatedAt) ] $ toHtml $ "Cancelled " <> humanReadableTime' t jobUpdatedAt + 
statusFuture :: UTCTime -> Job -> Html () statusFuture t Job{..} = do span_ [ class_ "badge badge-secondary" ] "Future" diff --git a/test/Test.hs b/test/Test.hs index 341d5a7..de27912 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -92,6 +92,7 @@ tests appPool jobPool = testGroup "All tests" , testOnJobTimeout appPool jobPool ] , testResourceLimitedScheduling appPool jobPool + , testKillJob appPool jobPool ] -- , testGroup "property tests" [ testEverything appPool jobPool -- -- , propFilterJobs appPool jobPool @@ -166,6 +167,8 @@ logEventToJob le = case le of Job.LogJobSuccess j _ -> Just j Job.LogJobFailed j _ _ _ -> Just j Job.LogJobTimeout j -> Just j + Job.LogKillJobSuccess j -> Just j + Job.LogKillJobFailed j -> Just j Job.LogPoll -> Nothing Job.LogWebUIRequest -> Nothing Job.LogText _ -> Nothing @@ -195,6 +198,8 @@ assertJobIdStatus conn tname logRef msg st jid = do Job.LogJobSuccess j _ -> jid == Job.jobId j Job.LogJobFailed j _ _ _ -> jid == Job.jobId j Job.LogJobTimeout j -> jid == Job.jobId j + Job.LogKillJobSuccess _ -> False + Job.LogKillJobFailed _ -> False Job.LogWebUIRequest -> False Job.LogText _ -> False Job.LogPoll -> False @@ -205,6 +210,12 @@ assertJobIdStatus conn tname logRef msg st jid = do Job.LogJobFailed j _ _ _ -> jid == Job.jobId j _ -> False + Job.Cancelled -> + assertBool (msg <> ": Job killed event not found in job-logs for JobId=" <> show jid) $ + flip DL.any logs $ \case + Job.LogKillJobSuccess j -> jid == Job.jobId j + _ -> False + Job.Retry -> assertBool (msg <> ": Failed event not found in job-logs for JobId=" <> show jid) $ flip DL.any logs $ \case @@ -627,6 +638,19 @@ setup' jobPool defaultLimit action = , Job.cfgDeleteSuccessfulJobs = False } +testKillJob appPool jobPool = testCase "killing a ongoing job" $ do + withRandomTable jobPool $ \tname -> withNamedJobMonitor tname jobPool Prelude.id $ \logRef -> do + Pool.withResource appPool $ \conn -> do + Job{jobId = jid} <- Job.createJob conn tname (PayloadSucceed 60) + 
delaySeconds $ Job.defaultPollingInterval + Seconds 2 + + assertJobIdStatus conn tname logRef "Job should be running" Job.Locked jid + + _ <- Job.killJobIO conn tname jid + delaySeconds $ Job.defaultPollingInterval + Seconds 2 + + assertJobIdStatus conn tname logRef "Job is cancelled and the job thread should be killed" Job.Cancelled jid + data JobEvent = JobStart | JobRetry | JobSuccess From 47ca817380fdef2709ce88fced0641ee1a67dd7d Mon Sep 17 00:00:00 2001 From: Jappie Klooster Date: Wed, 5 Jul 2023 14:53:10 -0400 Subject: [PATCH 2/3] undo weird merge --- odd-jobs.cabal | 2 -- 1 file changed, 2 deletions(-) diff --git a/odd-jobs.cabal b/odd-jobs.cabal index ae8bc0a..8104388 100644 --- a/odd-jobs.cabal +++ b/odd-jobs.cabal @@ -76,7 +76,6 @@ library , base >=4.7 && <5 , bytestring , containers - , daemons , directory , either , fast-logger @@ -140,7 +139,6 @@ executable devel , base >=4.7 && <5 , bytestring , containers - , daemons , directory , either , fast-logger From d730c0091c0de89620f7f7df2957bf363c320206 Mon Sep 17 00:00:00 2001 From: Jappie Klooster Date: Wed, 5 Jul 2023 15:10:44 -0400 Subject: [PATCH 3/3] add note on killing jobs --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e287e09..cae0cec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 0.2.3 + Upgrade resource pool ++ Add ability to kill jobs ## 0.2.2