Skip to content

Commit

Permalink
changes to support saving of job results
Browse files Browse the repository at this point in the history
  • Loading branch information
saurabhnanda committed Oct 5, 2023
1 parent 51c7443 commit 2402d57
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 40 deletions.
7 changes: 5 additions & 2 deletions examples/OddJobsCliExample.lhs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,22 @@ data MyJob
In this example, the core job-runner function is in the `IO` monad. In all probability, you application's code will be in a custom monad, and not IO. Pleae refer to TODO, on how to work with custom monads.
\begin{code}
myJobRunner :: Job -> IO ()
myJobRunner :: Job -> IO (Maybe Aeson.Value)
myJobRunner job = do
throwParsePayload job >>= \case
SendWelcomeEmail userId -> do
putStrLn $ "This should call the function that actually sends the welcome email. " <>
"\nWe are purposely waiting 60 seconds before completing this job so that graceful shutdown can be demonstrated."
delaySeconds (Seconds 60)
putStrLn $ "SendWelcomeEmail to user: " <> show userId <> " complete (60 second wait is now over...)"
SendPasswordResetEmail _tkn ->
pure Nothing
SendPasswordResetEmail _tkn -> do
putStrLn "This should call the function that actually sends the password-reset email"
pure Nothing
SetupSampleData _userId -> do
_ <- Prelude.error "User onboarding is incomplete"
putStrLn "This should call the function that actually sets up sample data in a newly registered user's account"
pure Nothing
\end{code}
=== 5. Write the main function using `OddJobs.Cli`
Expand Down
2 changes: 1 addition & 1 deletion src/OddJobs/ConfigBuilder.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ mkConfig :: (LogLevel -> LogEvent -> IO ())
-- ^ DB connection-pool to be used by job-runner. Ref: 'cfgDbPool'
-> ConcurrencyControl
-- ^ Concurrency configuration. Ref: 'cfgConcurrencyControl'
-> (Job -> IO ())
-> (Job -> IO (Maybe Aeson.Value))
-- ^ The actual "job runner" which contains your application code. Ref: 'cfgJobRunner'
-> (Config -> Config)
-- ^ A function that allows you to modify the \"interim config\". The
Expand Down
44 changes: 23 additions & 21 deletions src/OddJobs/Job.hs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
onJobSuccess :: Job -> m ()
deleteSuccessfulJobs :: m Bool
onJobFailed :: m [JobErrHandler]
getJobRunner :: m (Job -> IO ())
getJobRunner :: m (Job -> IO (Maybe Aeson.Value))
getDbPool :: m (Pool Connection)
getTableName :: m TableName
onJobStart :: Job -> m ()
Expand Down Expand Up @@ -220,13 +220,11 @@ instance HasJobRunner RunnerM where
-- standalone daemon.
startJobRunner :: Config -> IO ()
startJobRunner jm = do
traceM "startJobRunner top"
r <- newIORef DM.empty
let monitorEnv = RunnerEnv
{ envConfig = jm
, envJobThreadsRef = r
}
traceM "before runReaderT"
runReaderT jobMonitor monitorEnv

jobWorkerName :: IO String
Expand All @@ -253,6 +251,8 @@ jobDbColumns =
, "attempts"
, "locked_at"
, "locked_by"
, "parent_job_id"
, "result"
]

-- | All 'jobDbColumns' joined together with commas. Useful for constructing SQL
Expand Down Expand Up @@ -309,7 +309,7 @@ findJobByIdIO conn tname jid = PGS.query conn findJobByIdQuery (tname, jid) >>=


saveJobQuery :: PGS.Query
saveJobQuery = "UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ? WHERE id = ? RETURNING " <> concatJobDbColumns
saveJobQuery = "UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?, result = ? WHERE id = ? RETURNING " <> concatJobDbColumns

deleteJobQuery :: PGS.Query
deleteJobQuery = "DELETE FROM ? WHERE id = ?"
Expand All @@ -320,7 +320,7 @@ saveJob j = do
withDbConnection $ \conn -> liftIO $ saveJobIO conn tname j

saveJobIO :: Connection -> TableName -> Job -> IO Job
saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobId} = do
saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResult, jobId} = do
rs <- PGS.query conn saveJobQuery
( tname
, jobRunAt
Expand All @@ -330,6 +330,7 @@ saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttem
, jobAttempts
, jobLockedAt
, jobLockedBy
, jobResult
, jobId
)
case rs of
Expand Down Expand Up @@ -389,29 +390,30 @@ instance Exception TimeoutException
runJobWithTimeout :: (HasJobRunner m)
=> Seconds
-> Job
-> m ()
-> m (Maybe Aeson.Value)
runJobWithTimeout timeoutSec job@Job{jobId} = do
threadsRef <- envJobThreadsRef <$> getRunnerEnv
jobRunner_ <- getJobRunner

a <- async $ liftIO $ jobRunner_ job

_x <- atomicModifyIORef' threadsRef $ \threads ->
( DM.insert jobId a threads
, DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads
)
atomicModifyIORef' threadsRef $ \threads -> ( DM.insert jobId (void a) threads, () )
-- ( DM.insert jobId a threads
-- , DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads
-- )

-- liftIO $ putStrLn $ "Threads: " <> show x
log LevelDebug $ LogText $ toS $ "Spawned job in " <> show (asyncThreadId a)

t <- async $ do
delaySeconds timeoutSec
throwIO TimeoutException
t <- async $ delaySeconds timeoutSec

void $ finally
(waitEitherCancel a t)

finally
(waitEitherCancel t a >>= either (const $ throwIO TimeoutException) pure)
(atomicModifyIORef' threadsRef $ \threads -> (DM.delete jobId threads, ()))




-- | runs a job, blocks for as long as it's in progress
runJob :: (HasJobRunner m) => JobId -> m ()
Expand All @@ -424,13 +426,17 @@ runJob jid = do
log LevelInfo $ LogJobStart job
flip catch (exceptionHandler job startTime) $ do
onJobStart job
runJobWithTimeout lockTimeout job
jresult <- runJobWithTimeout lockTimeout job
endTime <- liftIO getCurrentTime
shouldDeleteJob <- deleteSuccessfulJobs
let newJob = job{jobStatus=Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime}
let newJob = job{jobStatus=Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime, jobResult = jresult}
if shouldDeleteJob
then deleteJob jid
else void $ saveJob newJob
-- case jobParentJobId job of
-- Nothing -> do
-- Just _ -> do

log LevelInfo $ LogJobSuccess newJob (diffUTCTime endTime startTime)
onJobSuccess newJob
pure ()
Expand Down Expand Up @@ -488,7 +494,6 @@ killJob jid = do
-- TODO: This might have a resource leak.
restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m ()
restartUponCrash name_ action = do
traceM "restartUponCrash top"
a <- async action
finally (waitCatch a >>= fn) $ do
log LevelInfo $ LogText $ "Received shutdown: " <> toS name_
Expand All @@ -498,7 +503,6 @@ restartUponCrash name_ action = do
case x of
Left (e :: SomeException) -> log LevelError $ LogText $ name_ <> " seems to have exited with an error. Restarting: " <> toS (show e)
Right r -> log LevelError $ LogText $ name_ <> " seems to have exited with the folloing result: " <> toS (show r) <> ". Restaring."
traceM "CRASH OCCURRED"
restartUponCrash name_ action

-- | Spawns 'jobPoller' and 'jobEventListener' in separate threads and restarts
Expand All @@ -507,11 +511,9 @@ restartUponCrash name_ action = do
-- executed to finish execution before exiting the main thread.
jobMonitor :: forall m . (HasJobRunner m) => m ()
jobMonitor = do
traceM "jobMonitor top"
a1 <- async $ restartUponCrash "Job poller" jobPoller
a2 <- async $ restartUponCrash "Job event listener" jobEventListener
a3 <- async $ restartUponCrash "Job Kill poller" killJobPoller
traceM "jobMonitor after async"
finally (void $ waitAnyCatch [a1, a2, a3]) $ do
log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.")
cancel a3
Expand Down
8 changes: 7 additions & 1 deletion src/OddJobs/Migrations.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ createJobTableQuery = "CREATE TABLE IF NOT EXISTS ?" <>
", attempts int not null default 0" <>
", locked_at timestamp with time zone null" <>
", locked_by text null" <>
", result jsonb" <>
", parent_job_id integer references ?(id)" <>
", constraint incorrect_locking_info CHECK (" <>
"(locked_at is null and locked_by is null and status <> 'locked') or " <>
"(locked_at is not null and locked_by is not null and (status = 'locked' or status = 'cancelled')))" <>
Expand All @@ -31,7 +33,8 @@ createJobTableQuery = "CREATE TABLE IF NOT EXISTS ?" <>
"create index if not exists ? on ?(locked_at);" <>
"create index if not exists ? on ?(locked_by);" <>
"create index if not exists ? on ?(status);" <>
"create index if not exists ? on ?(run_at);"
"create index if not exists ? on ?(run_at);" <>
"create index if not exists ? on ?(parent_job_id);"

createNotificationTrigger :: Query
createNotificationTrigger = "create or replace function ?() returns trigger as $$" <>
Expand All @@ -49,6 +52,7 @@ createJobTable conn tname = void $ do
let tnameTxt = getTnameTxt tname
_ <- PGS.execute conn createJobTableQuery
( tname
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_created_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_updated_at"
Expand All @@ -61,6 +65,8 @@ createJobTable conn tname = void $ do
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_run_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_parent_job_id"
, tname
)
PGS.execute conn createNotificationTrigger
( fnName
Expand Down
17 changes: 16 additions & 1 deletion src/OddJobs/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ data Job = Job
, jobAttempts :: Int
, jobLockedAt :: Maybe UTCTime
, jobLockedBy :: Maybe JobRunnerName
, jobParentJobId :: Maybe JobId
, jobResult :: Maybe Aeson.Value
} deriving (Eq, Show, Generic)

instance ToText Status where
Expand Down Expand Up @@ -269,6 +271,8 @@ instance FromRow Job where
<*> field -- attempts
<*> field -- lockedAt
<*> field -- lockedBy
<*> field -- parentJobId
<*> field -- job result

-- TODO: Add a sum-type for return status which can signal the monitor about
-- whether the job needs to be retried, marked successfull, or whether it has
Expand All @@ -293,6 +297,17 @@ data AllJobTypes
-- | A custom 'IO' action for fetching the list of job-types.
| AJTCustom (IO [Text])


-- | TODO: documentation
-- data JobResult = forall out . ToJSON out => JobResult (Maybe out)

-- instance ToField JobResult where
-- toField (JobResult Nothing) = ToField.Plain "null"
-- toField (JobResult (Just x)) = toField (toJSON x)

-- voidJob :: Functor f => f a -> f JobResult
-- voidJob x = JobResult (Nothing :: Maybe Int) <$ x

-- | While odd-jobs is highly configurable and the 'Config' data-type might seem
-- daunting at first, it is not necessary to tweak every single configuration
-- parameter by hand.
Expand All @@ -309,7 +324,7 @@ data Config = Config
-- throws a runtime exception, the job will be retried
-- 'cfgDefaultMaxAttempts' times. Please look at the examples/tutorials if
-- your applicaton's code is not in the @IO@ monad.
, cfgJobRunner :: Job -> IO ()
, cfgJobRunner :: Job -> IO (Maybe Aeson.Value)

-- | The number of times a failing job is retried before it is considered is
-- "permanently failed" and ignored by the job-runner. This config parameter
Expand Down
4 changes: 3 additions & 1 deletion stack.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
resolver: lts-20.26
resolver: lts-18.28
packages:
- .
extra-deps:
- timing-convenience-0.1@sha256:7ff807a9a9e5596f2b18d45c5a01aefb91d4a98f6a1008d183b5c550f68f7cb7,2092
- resource-pool-0.4.0.0@sha256:9c1e448a159875e21a7e68697feee2b61a4e584720974fa465a2fa1bc0776c73,1342
- servant-lucid-0.9.0.6
- servant-static-th-1.0.0.0
26 changes: 13 additions & 13 deletions test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ main = do
defaultMain $ tests appPool jobPool
where
connInfo = ConnectInfo
{ connectHost = "localhost"
{ connectHost = "vlpostgres"
, connectPort = fromIntegral (5432 :: Int)
, connectUser = "jobs_test"
, connectPassword = "jobs_test"
, connectDatabase = "jobs_test"
, connectUser = "oddjobs"
, connectPassword = "oddjobs"
, connectDatabase = "oddjobs"
}

createAppPool = Pool.newPool $ Pool.defaultPoolConfig
Expand Down Expand Up @@ -133,11 +133,11 @@ type TestM = ReaderT Env IO
testPayload :: Value
testPayload = toJSON (10 :: Int)

jobRunner :: Job.Job -> IO ()
jobRunner Job{jobPayload, jobAttempts} = case fromJSON jobPayload of
jobRunner :: Job.Job -> IO (Maybe Aeson.Value)
jobRunner Job{jobPayload, jobAttempts} = (Nothing <$) $ case fromJSON jobPayload of
Aeson.Error e -> error e
Success (j :: JobPayload) ->
let recur pload idx = case pload of
let recur pload idx = case pload of
PayloadAlwaysFail delay -> delaySeconds delay >> error ("Forced error after " <> show delay <> " seconds")
PayloadSucceed delay -> delaySeconds delay
PayloadFail delay innerpload -> if idx<jobAttempts
Expand Down Expand Up @@ -256,12 +256,12 @@ runSingleJobFromQueue :: Job.Config -> IO (Maybe (Async ()))
runSingleJobFromQueue config' = do
r <- liftIO $ newIORef mempty
waitTillJobStart <- newEmptyMVar
let config = config' {
Job.cfgPollingInterval = 0
, Job.cfgJobRunner = \job -> do
putMVar waitTillJobStart ()
jobRunner job
}
let config = config'
{ Job.cfgPollingInterval = 0
, Job.cfgJobRunner = \job -> do
putMVar waitTillJobStart ()
jobRunner job
}
let monitorEnv = Job.RunnerEnv
{ Job.envConfig = config
, Job.envJobThreadsRef = r
Expand Down

0 comments on commit 2402d57

Please sign in to comment.