diff --git a/test/Test.hs b/test/Test.hs index 656b949..17799f9 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -48,6 +48,7 @@ import qualified OddJobs.ConfigBuilder as Job import UnliftIO import Control.Exception (ArithException) import Data.Bifunctor(first) +import System.Environment (lookupEnv) $(Aeson.deriveJSON Aeson.defaultOptions ''Seconds) @@ -57,30 +58,36 @@ main = do bracket createJobPool destroyAllResources $ \jobPool -> do defaultMain $ tests appPool jobPool where - connInfo = ConnectInfo - { connectHost = "localhost" - , connectPort = fromIntegral (5432 :: Int) - , connectUser = "jobs_test" - , connectPassword = "jobs_test" - , connectDatabase = "jobs_test" - } - - createAppPool = Pool.newPool $ Pool.defaultPoolConfig - (PGS.connect connInfo) -- create a new resource - PGS.close -- destroy resource - (fromRational 10) -- number of seconds unused resources are kept around - 45 - createJobPool = Pool.newPool $ Pool.defaultPoolConfig - (PGS.connect connInfo) -- create a new resource - PGS.close -- destroy resource - (fromRational 10) -- number of seconds unused resources are kept around - 45 + getConnInfo = do + connectHost <- (fromMaybe "localhost") <$> lookupEnv "PGHOST" + connectUser <- (fromMaybe "jobs_test") <$> lookupEnv "PGUSER" + connectPassword <- (fromMaybe "jobs_test") <$> lookupEnv "PGPASSWORD" + connectDatabase <- (fromMaybe "jobs_test") <$> lookupEnv "PGDATABASE" + connectPort <- (maybe (fromIntegral (5432 :: Int)) read) <$> lookupEnv "PGPORT" + pure ConnectInfo{..} + + createAppPool = do + connInfo <- getConnInfo + Pool.newPool $ Pool.defaultPoolConfig + (PGS.connect connInfo) -- create a new resource + PGS.close -- destroy resource + (fromRational 10) -- number of seconds unused resources are kept around + 45 + + createJobPool = do + connInfo <- getConnInfo + Pool.newPool $ Pool.defaultPoolConfig + (PGS.connect connInfo) -- create a new resource + PGS.close -- destroy resource + (fromRational 10) -- number of seconds unused resources are kept around + 45 tests appPool jobPool = testGroup "All tests" [ testGroup "simple tests" [ testJobCreation appPool jobPool , testJobScheduling appPool jobPool , testJobFailure appPool jobPool + , testJobResults appPool jobPool , testEnsureShutdown appPool jobPool , testGracefulShutdown appPool jobPool , testPushFailedJobEndQueue jobPool @@ -134,20 +141,20 @@ testPayload :: Value testPayload = toJSON (10 :: Int) jobRunner :: Job.Job -> IO (Maybe Aeson.Value) -jobRunner Job{jobPayload, jobAttempts} = (Nothing <$) $ case fromJSON jobPayload of +jobRunner Job{jobPayload, jobAttempts} = case fromJSON jobPayload of Aeson.Error e -> error e Success (j :: JobPayload) -> let recur pload idx = case pload of PayloadAlwaysFail delay -> delaySeconds delay >> error ("Forced error after " <> show delay <> " seconds") - PayloadSucceed delay -> delaySeconds delay + PayloadSucceed delay jresult -> delaySeconds delay >> pure (fmap Aeson.toJSON jresult) PayloadFail delay innerpload -> if idx> error ("Forced error after " <> show delay <> " seconds. step=" <> show idx) PayloadThrowStringException s -> throwString s - PayloadThrowDivideByZero -> seq (1 `div` 0 :: Integer) (pure ()) + PayloadThrowDivideByZero -> seq (1 `div` 0 :: Integer) (pure Nothing) in recur j 0 -data JobPayload = PayloadSucceed Seconds +data JobPayload = PayloadSucceed Seconds (Maybe String) | PayloadFail Seconds JobPayload | PayloadAlwaysFail Seconds | PayloadThrowStringException String @@ -321,13 +328,15 @@ payloadGen :: MonadGen m => m JobPayload payloadGen = Gen.recursive Gen.choice nonRecursive recursive where nonRecursive = [ PayloadAlwaysFail <$> Gen.element [1, 2, 3] - , PayloadSucceed <$> Gen.element [1, 2, 3]] + , PayloadSucceed <$> Gen.element [1, 2, 3] <*> (Gen.maybe $ Gen.string (Range.singleton 10) Gen.alphaNum) + + ] recursive = [ PayloadFail <$> Gen.element [1, 2, 3] <*> payloadGen ] testJobCreation appPool jobPool = testCase "job creation" $ do withNewJobMonitor jobPool $ \tname logRef -> do Pool.withResource appPool $ \conn -> do - Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0) + Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0 Nothing) delaySeconds $ Seconds 6 assertJobIdStatus conn tname logRef "Expecting job to be successful by now" Job.Success jobId @@ -341,7 +350,7 @@ testEnsureShutdown appPool jobPool = testCase "ensure shutdown" $ do scheduleJob tname = withNamedJobMonitor tname jobPool Prelude.id $ \logRef -> do t <- getCurrentTime Pool.withResource appPool $ \conn -> do - Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral (2 * unSeconds Job.defaultPollingInterval)) t) + Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0 Nothing) (addUTCTime (fromIntegral (2 * unSeconds Job.defaultPollingInterval)) t) assertJobIdStatus conn tname logRef "Job is scheduled in future, should still be queueud" Job.Queued jobId pure (jobId, logRef) @@ -351,9 +360,9 @@ testGracefulShutdown appPool jobPool = withRandomTable jobPool $ \tname -> do waitTillJobStart <- newEmptyMVar Pool.withResource appPool $ \conn -> do - j1 <- Job.createJob conn tname (PayloadSucceed $ 4 * Job.defaultPollingInterval) + j1 <- Job.createJob conn tname (PayloadSucceed (4 * Job.defaultPollingInterval) Nothing) time<- getCurrentTime - j2 <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral $ unSeconds $ Job.defaultPollingInterval * 2) time) + j2 <- Job.scheduleJob conn tname (PayloadSucceed 0 Nothing) (addUTCTime (fromIntegral $ unSeconds $ Job.defaultPollingInterval * 2) time) logRef <- withNamedJobMonitor tname jobPool (\cfg -> cfg { Job.cfgOnJobStart = \ _ -> do putMVar waitTillJobStart () } ) $ \logRef -> do @@ -366,13 +375,24 @@ testJobScheduling appPool jobPool = testCase "job scheduling" $ do withNewJobMonitor jobPool $ \tname logRef -> do Pool.withResource appPool $ \conn -> do t <- getCurrentTime - job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral (3600 :: Integer)) t) + job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0 Nothing) (addUTCTime (fromIntegral (3600 :: Integer)) t) delaySeconds $ Seconds 2 assertJobIdStatus conn tname logRef "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId _ <- Job.saveJobIO conn tname job{jobRunAt = addUTCTime (fromIntegral (-1 :: Integer)) t} delaySeconds (Job.defaultPollingInterval + Seconds 2) assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId +testJobResults appPool jobPool = testCase "job results" $ do + withRandomTable jobPool $ \tname -> do + withNamedJobMonitor tname jobPool (\cfg -> cfg{Job.cfgDeleteSuccessfulJobs=False}) $ \logRef -> do + Pool.withResource appPool $ \conn -> do + Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0 (Just "abcdef")) + delaySeconds Job.defaultPollingInterval + delaySeconds $ calculateRetryDelay testMaxAttempts + Seconds 3 + Job{jobAttempts, jobStatus, jobResult} <- ensureJobId conn tname jobId + assertEqual "Exepcting job to be in Success status" Job.Success jobStatus + assertEqual "Expecting job to have the correct result" (Just (Aeson.String "abcdef")) jobResult + testJobFailure appPool jobPool = testCase "job failure" $ do withNewJobMonitor jobPool $ \tname _logRef -> do Pool.withResource appPool $ \conn -> do @@ -418,7 +438,7 @@ testOnJobStart appPool jobPool = testCase "onJobStart" $ do let callback = const $ modifyIORef' callbackRef (const True) withNamedJobMonitor tname jobPool (\cfg -> cfg{Job.cfgOnJobStart=callback}) $ \_logRef -> do Pool.withResource appPool $ \conn -> do - _ <- Job.createJob conn tname (PayloadSucceed 0) + _ <- Job.createJob conn tname (PayloadSucceed 0 Nothing) delaySeconds (2 * Job.defaultPollingInterval) readIORef callbackRef >>= assertBool "It seems that onJobStart callback has not been called" @@ -428,7 +448,7 @@ testOnJobSuccess appPool jobPool = testCase "onJobSuccess" $ do let callback = const $ modifyIORef' callbackRef (const True) withNamedJobMonitor tname jobPool (\cfg -> cfg{Job.cfgOnJobSuccess=callback}) $ \_logRef -> do Pool.withResource appPool $ \conn -> do - _ <- Job.createJob conn tname (PayloadSucceed 0) + _ <- Job.createJob conn tname (PayloadSucceed 0 Nothing) delaySeconds (2 * Job.defaultPollingInterval) readIORef callbackRef >>= assertBool "It seems that onJobSuccess callback has not been called" @@ -438,7 +458,7 @@ testOnJobTimeout appPool jobPool = testCase "onJobTimeout" $ do let callback = const $ modifyIORef' callbackRef (const True) withNamedJobMonitor tname jobPool (\cfg -> cfg{Job.cfgOnJobTimeout=callback, Job.cfgDefaultJobTimeout=Seconds 2}) $ \_logRef -> do Pool.withResource appPool $ \conn -> do - _ <- Job.createJob conn tname (PayloadSucceed 10) + _ <- Job.createJob conn tname (PayloadSucceed 10 Nothing) delaySeconds (2 * Job.defaultPollingInterval) readIORef callbackRef >>= assertBool "It seems that onJobTimeout callback has not been called" @@ -476,9 +496,9 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control let firstResource = [(Job.ResourceId "first", 1)] secondResource = [(Job.ResourceId "second", 1)] - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource - Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource - Job{jobId = thirdJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) secondResource + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource + Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource + Job{jobId = thirdJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) secondResource _jone <- runSingleJobFromQueue cfg _jtwo <- runSingleJobFromQueue cfg @@ -489,8 +509,8 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control assertJobIdStatus conn tname logRef "Job uses different resources - it should be running" Job.Locked thirdJob , testCase "resource-less jobs run normally" $ setup jobPool $ \tname resCfg logRef conn cfg -> do - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) [] - Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) [] + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) [] + Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) [] _jone <- runSingleJobFromQueue cfg _jtwo <- runSingleJobFromQueue cfg @@ -502,9 +522,9 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control let firstResource = [(Job.ResourceId "first", 1)] secondResource = [(Job.ResourceId "second", 1)] - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) (firstResource <> secondResource) - Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource - Job{jobId = thirdJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) secondResource + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) (firstResource <> secondResource) + Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource + Job{jobId = thirdJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) secondResource jone <- runSingleJobFromQueue cfg jtwo <- runSingleJobFromQueue cfg @@ -520,8 +540,8 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control , testCase "resources are freed when jobs succeed" $ setup jobPool $ \tname resCfg logRef conn cfg -> do let firstResource = [(Job.ResourceId "first", 1)] - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource - Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource + Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource jone <- runSingleJobFromQueue cfg _jtwo <- runSingleJobFromQueue cfg @@ -540,7 +560,7 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control let firstResource = [(Job.ResourceId "first", 1)] Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadAlwaysFail 10) firstResource - Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource + Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource jone <- runSingleJobFromQueue cfg _jtwo <- runSingleJobFromQueue cfg @@ -560,8 +580,8 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control let firstResource = [(Job.ResourceId "first", 1)] resCfg' = resCfg { Job.resCfgDefaultLimit = 2 } - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10) firstResource - Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10) firstResource + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10 Nothing) firstResource + Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10 Nothing) firstResource _jone <- runSingleJobFromQueue cfg _jtwo <- runSingleJobFromQueue cfg @@ -573,9 +593,9 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control let firstResource n = [(Job.ResourceId "first", n)] resCfg' = resCfg { Job.resCfgDefaultLimit = 5 } - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10) (firstResource 4) - Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10) (firstResource 3) - Job{jobId = thirdJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10) (firstResource 2) + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10 Nothing) (firstResource 4) + Job{jobId = secondJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10 Nothing) (firstResource 3) + Job{jobId = thirdJob} <- Job.createJobWithResources conn tname resCfg' (PayloadSucceed 10 Nothing) (firstResource 2) jone <- runSingleJobFromQueue cfg _jtwo <- runSingleJobFromQueue cfg @@ -598,7 +618,7 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control , testCase "resource limits permanently block too-big jobs" $ setup jobPool $ \tname resCfg logRef conn cfg -> do let firstResource = [(Job.ResourceId "first", 2)] - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource void $ runSingleJobFromQueueBlock cfg @@ -607,7 +627,7 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control , testCase "resources with 0 limit block jobs" $ setup' jobPool 0 $ \tname resCfg logRef conn cfg -> do let firstResource = [(Job.ResourceId "first", 1)] - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource void $ runSingleJobFromQueueBlock cfg @@ -616,7 +636,7 @@ testResourceLimitedScheduling _appPool jobPool = testGroup "concurrency control , testCase "resources with negative limit block jobs" $ setup' jobPool (-5) $ \tname resCfg logRef conn cfg -> do let firstResource = [(Job.ResourceId "first", 1)] - Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10) firstResource + Job{jobId = firstJob} <- Job.createJobWithResources conn tname resCfg (PayloadSucceed 10 Nothing) firstResource void $ runSingleJobFromQueueBlock cfg @@ -641,7 +661,7 @@ setup' jobPool defaultLimit action = testKillJob appPool jobPool = testCase "killing a ongoing job" $ do withRandomTable jobPool $ \tname -> withNamedJobMonitor tname jobPool Prelude.id $ \logRef -> do Pool.withResource appPool $ \conn -> do - Job{jobId = jid} <- Job.createJob conn tname (PayloadSucceed 60) + Job{jobId = jid} <- Job.createJob conn tname (PayloadSucceed 60 Nothing) delaySeconds $ Job.defaultPollingInterval + Seconds 2 assertJobIdStatus conn tname logRef "Job should be running" Job.Locked jid @@ -758,7 +778,7 @@ payloadDelay jobPollingInterval = payloadDelay_ (Seconds 0) let defaultDelay x = total + x + jobPollingInterval in case p of PayloadAlwaysFail x -> defaultDelay x - PayloadSucceed x -> defaultDelay x + PayloadSucceed x _ -> defaultDelay x PayloadFail x ip -> payloadDelay_ (defaultDelay x) ip PayloadThrowStringException _ -> defaultDelay 0 PayloadThrowDivideByZero -> defaultDelay 0