diff --git a/examples/OddJobsCliExample.lhs b/examples/OddJobsCliExample.lhs index eef5735..59d457f 100644 --- a/examples/OddJobsCliExample.lhs +++ b/examples/OddJobsCliExample.lhs @@ -69,7 +69,7 @@ data MyJob In this example, the core job-runner function is in the `IO` monad. In all probability, you application's code will be in a custom monad, and not IO. Pleae refer to TODO, on how to work with custom monads. \begin{code} -myJobRunner :: Job -> IO () +myJobRunner :: Job -> IO (Maybe Aeson.Value) myJobRunner job = do throwParsePayload job >>= \case SendWelcomeEmail userId -> do @@ -77,11 +77,14 @@ myJobRunner job = do "\nWe are purposely waiting 60 seconds before completing this job so that graceful shutdown can be demonstrated." delaySeconds (Seconds 60) putStrLn $ "SendWelcomeEmail to user: " <> show userId <> " complete (60 second wait is now over...)" - SendPasswordResetEmail _tkn -> + pure Nothing + SendPasswordResetEmail _tkn -> do putStrLn "This should call the function that actually sends the password-reset email" + pure Nothing SetupSampleData _userId -> do _ <- Prelude.error "User onboarding is incomplete" putStrLn "This should call the function that actually sets up sample data in a newly registered user's account" + pure Nothing \end{code} === 5. Write the main function using `OddJobs.Cli` diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 695618f..1b61c78 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -49,7 +49,7 @@ mkConfig :: (LogLevel -> LogEvent -> IO ()) -- ^ DB connection-pool to be used by job-runner. Ref: 'cfgDbPool' -> ConcurrencyControl -- ^ Concurrency configuration. Ref: 'cfgConcurrencyControl' - -> (Job -> IO ()) + -> (Job -> IO (Maybe Aeson.Value)) -- ^ The actual "job runner" which contains your application code. Ref: 'cfgJobRunner' -> (Config -> Config) -- ^ A function that allows you to modify the \"interim config\". The diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index b887ac3..ae1a624 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -145,7 +145,7 @@ class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where onJobSuccess :: Job -> m () deleteSuccessfulJobs :: m Bool onJobFailed :: m [JobErrHandler] - getJobRunner :: m (Job -> IO ()) + getJobRunner :: m (Job -> IO (Maybe Aeson.Value)) getDbPool :: m (Pool Connection) getTableName :: m TableName onJobStart :: Job -> m () @@ -220,13 +220,11 @@ instance HasJobRunner RunnerM where -- standalone daemon. startJobRunner :: Config -> IO () startJobRunner jm = do - traceM "startJobRunner top" r <- newIORef DM.empty let monitorEnv = RunnerEnv { envConfig = jm , envJobThreadsRef = r } - traceM "before runReaderT" runReaderT jobMonitor monitorEnv jobWorkerName :: IO String @@ -253,6 +251,8 @@ jobDbColumns = , "attempts" , "locked_at" , "locked_by" + , "parent_job_id" + , "result" ] -- | All 'jobDbColumns' joined together with commas. Useful for constructing SQL @@ -309,7 +309,7 @@ findJobByIdIO conn tname jid = PGS.query conn findJobByIdQuery (tname, jid) >>= saveJobQuery :: PGS.Query -saveJobQuery = "UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ? WHERE id = ? RETURNING " <> concatJobDbColumns +saveJobQuery = "UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?, result = ? WHERE id = ? RETURNING " <> concatJobDbColumns deleteJobQuery :: PGS.Query deleteJobQuery = "DELETE FROM ? WHERE id = ?" @@ -320,7 +320,7 @@ saveJob j = do withDbConnection $ \conn -> liftIO $ saveJobIO conn tname j saveJobIO :: Connection -> TableName -> Job -> IO Job -saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobId} = do +saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResult, jobId} = do rs <- PGS.query conn saveJobQuery ( tname , jobRunAt @@ -330,6 +330,7 @@ saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttem , jobAttempts , jobLockedAt , jobLockedBy + , jobResult , jobId ) case rs of @@ -389,29 +390,30 @@ instance Exception TimeoutException runJobWithTimeout :: (HasJobRunner m) => Seconds -> Job - -> m () + -> m (Maybe Aeson.Value) runJobWithTimeout timeoutSec job@Job{jobId} = do threadsRef <- envJobThreadsRef <$> getRunnerEnv jobRunner_ <- getJobRunner a <- async $ liftIO $ jobRunner_ job - _x <- atomicModifyIORef' threadsRef $ \threads -> - ( DM.insert jobId a threads - , DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads - ) + atomicModifyIORef' threadsRef $ \threads -> ( DM.insert jobId (void a) threads, () ) + -- ( DM.insert jobId a threads + -- , DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads + -- ) -- liftIO $ putStrLn $ "Threads: " <> show x log LevelDebug $ LogText $ toS $ "Spawned job in " <> show (asyncThreadId a) - t <- async $ do - delaySeconds timeoutSec - throwIO TimeoutException + t <- async $ delaySeconds timeoutSec - void $ finally - (waitEitherCancel a t) + + finally + (waitEitherCancel t a >>= either (const $ throwIO TimeoutException) pure) (atomicModifyIORef' threadsRef $ \threads -> (DM.delete jobId threads, ())) + + -- | runs a job, blocks for as long as it's in progress runJob :: (HasJobRunner m) => JobId -> m () @@ -424,13 +426,17 @@ runJob jid = do log LevelInfo $ LogJobStart job flip catch (exceptionHandler job startTime) $ do onJobStart job - runJobWithTimeout lockTimeout job + jresult <- runJobWithTimeout lockTimeout job endTime <- liftIO getCurrentTime shouldDeleteJob <- deleteSuccessfulJobs - let newJob = job{jobStatus=Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime} + let newJob = job{jobStatus=Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime, jobResult = jresult} if shouldDeleteJob then deleteJob jid else void $ saveJob newJob + -- case jobParentJobId job of + -- Nothing -> do + -- Just _ -> do + log LevelInfo $ LogJobSuccess newJob (diffUTCTime endTime startTime) onJobSuccess newJob pure () @@ -488,7 +494,6 @@ killJob jid = do -- TODO: This might have a resource leak. restartUponCrash :: (HasJobRunner m, Show a) => Text -> m a -> m () restartUponCrash name_ action = do - traceM "restartUponCrash top" a <- async action finally (waitCatch a >>= fn) $ do log LevelInfo $ LogText $ "Received shutdown: " <> toS name_ @@ -498,7 +503,6 @@ restartUponCrash name_ action = do case x of Left (e :: SomeException) -> log LevelError $ LogText $ name_ <> " seems to have exited with an error. Restarting: " <> toS (show e) Right r -> log LevelError $ LogText $ name_ <> " seems to have exited with the folloing result: " <> toS (show r) <> ". Restaring." - traceM "CRASH OCCURRED" restartUponCrash name_ action -- | Spawns 'jobPoller' and 'jobEventListener' in separate threads and restarts @@ -507,11 +511,9 @@ restartUponCrash name_ action = do -- executed to finish execution before exiting the main thread. jobMonitor :: forall m . (HasJobRunner m) => m () jobMonitor = do - traceM "jobMonitor top" a1 <- async $ restartUponCrash "Job poller" jobPoller a2 <- async $ restartUponCrash "Job event listener" jobEventListener a3 <- async $ restartUponCrash "Job Kill poller" killJobPoller - traceM "jobMonitor after async" finally (void $ waitAnyCatch [a1, a2, a3]) $ do log LevelInfo (LogText "Stopping jobPoller and jobEventListener threads.") cancel a3 diff --git a/src/OddJobs/Migrations.hs b/src/OddJobs/Migrations.hs index 4b125e0..1bfb31b 100644 --- a/src/OddJobs/Migrations.hs +++ b/src/OddJobs/Migrations.hs @@ -22,6 +22,8 @@ createJobTableQuery = "CREATE TABLE IF NOT EXISTS ?" <> ", attempts int not null default 0" <> ", locked_at timestamp with time zone null" <> ", locked_by text null" <> + ", result jsonb" <> + ", parent_job_id integer references ?(id)" <> ", constraint incorrect_locking_info CHECK (" <> "(locked_at is null and locked_by is null and status <> 'locked') or " <> "(locked_at is not null and locked_by is not null and (status = 'locked' or status = 'cancelled')))" <> @@ -31,7 +33,8 @@ createJobTableQuery = "CREATE TABLE IF NOT EXISTS ?" <> "create index if not exists ? on ?(locked_at);" <> "create index if not exists ? on ?(locked_by);" <> "create index if not exists ? on ?(status);" <> - "create index if not exists ? on ?(run_at);" + "create index if not exists ? on ?(run_at);" <> + "create index if not exists ? on ?(parent_job_id);" createNotificationTrigger :: Query createNotificationTrigger = "create or replace function ?() returns trigger as $$" <> @@ -49,6 +52,7 @@ createJobTable conn tname = void $ do let tnameTxt = getTnameTxt tname _ <- PGS.execute conn createJobTableQuery ( tname + , tname , PGS.Identifier $ "idx_" <> tnameTxt <> "_created_at" , tname , PGS.Identifier $ "idx_" <> tnameTxt <> "_updated_at" @@ -61,6 +65,8 @@ createJobTable conn tname = void $ do , tname , PGS.Identifier $ "idx_" <> tnameTxt <> "_run_at" , tname + , PGS.Identifier $ "idx_" <> tnameTxt <> "_parent_job_id" + , tname ) PGS.execute conn createNotificationTrigger ( fnName diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index 02e80d4..102a5da 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -227,6 +227,8 @@ data Job = Job , jobAttempts :: Int , jobLockedAt :: Maybe UTCTime , jobLockedBy :: Maybe JobRunnerName + , jobParentJobId :: Maybe JobId + , jobResult :: Maybe Aeson.Value } deriving (Eq, Show, Generic) instance ToText Status where @@ -269,6 +271,8 @@ instance FromRow Job where <*> field -- attempts <*> field -- lockedAt <*> field -- lockedBy + <*> field -- parentJobId + <*> field -- job result -- TODO: Add a sum-type for return status which can signal the monitor about -- whether the job needs to be retried, marked successfull, or whether it has @@ -293,6 +297,17 @@ data AllJobTypes -- | A custom 'IO' action for fetching the list of job-types. | AJTCustom (IO [Text]) + +-- | TODO: documentation +-- data JobResult = forall out . ToJSON out => JobResult (Maybe out) + +-- instance ToField JobResult where +-- toField (JobResult Nothing) = ToField.Plain "null" +-- toField (JobResult (Just x)) = toField (toJSON x) + +-- voidJob :: Functor f => f a -> f JobResult +-- voidJob x = JobResult (Nothing :: Maybe Int) <$ x + -- | While odd-jobs is highly configurable and the 'Config' data-type might seem -- daunting at first, it is not necessary to tweak every single configuration -- parameter by hand. @@ -309,7 +324,7 @@ data Config = Config -- throws a runtime exception, the job will be retried -- 'cfgDefaultMaxAttempts' times. Please look at the examples/tutorials if -- your applicaton's code is not in the @IO@ monad. - , cfgJobRunner :: Job -> IO () + , cfgJobRunner :: Job -> IO (Maybe Aeson.Value) -- | The number of times a failing job is retried before it is considered is -- "permanently failed" and ignored by the job-runner. This config parameter diff --git a/stack.yaml b/stack.yaml index 0e1c474..11820d8 100644 --- a/stack.yaml +++ b/stack.yaml @@ -1,6 +1,8 @@ -resolver: lts-20.26 +resolver: lts-18.28 packages: - . extra-deps: - timing-convenience-0.1@sha256:7ff807a9a9e5596f2b18d45c5a01aefb91d4a98f6a1008d183b5c550f68f7cb7,2092 - resource-pool-0.4.0.0@sha256:9c1e448a159875e21a7e68697feee2b61a4e584720974fa465a2fa1bc0776c73,1342 + - servant-lucid-0.9.0.6 + - servant-static-th-1.0.0.0 diff --git a/test/Test.hs b/test/Test.hs index de27912..6acd9fe 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -58,11 +58,11 @@ main = do defaultMain $ tests appPool jobPool where connInfo = ConnectInfo - { connectHost = "localhost" + { connectHost = "vlpostgres" , connectPort = fromIntegral (5432 :: Int) - , connectUser = "jobs_test" - , connectPassword = "jobs_test" - , connectDatabase = "jobs_test" + , connectUser = "oddjobs" + , connectPassword = "oddjobs" + , connectDatabase = "oddjobs" } createAppPool = Pool.newPool $ Pool.defaultPoolConfig @@ -133,11 +133,11 @@ type TestM = ReaderT Env IO testPayload :: Value testPayload = toJSON (10 :: Int) -jobRunner :: Job.Job -> IO () -jobRunner Job{jobPayload, jobAttempts} = case fromJSON jobPayload of +jobRunner :: Job.Job -> IO (Maybe Aeson.Value) +jobRunner Job{jobPayload, jobAttempts} = (Nothing <$) $ case fromJSON jobPayload of Aeson.Error e -> error e Success (j :: JobPayload) -> - let recur pload idx = case pload of + let recur pload idx = case pload of PayloadAlwaysFail delay -> delaySeconds delay >> error ("Forced error after " <> show delay <> " seconds") PayloadSucceed delay -> delaySeconds delay PayloadFail delay innerpload -> if idx IO (Maybe (Async ())) runSingleJobFromQueue config' = do r <- liftIO $ newIORef mempty waitTillJobStart <- newEmptyMVar - let config = config' { - Job.cfgPollingInterval = 0 - , Job.cfgJobRunner = \job -> do - putMVar waitTillJobStart () - jobRunner job - } + let config = config' + { Job.cfgPollingInterval = 0 + , Job.cfgJobRunner = \job -> do + putMVar waitTillJobStart () + jobRunner job + } let monitorEnv = Job.RunnerEnv { Job.envConfig = config , Job.envJobThreadsRef = r