Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
saurabhnanda committed Nov 19, 2023
1 parent cbce4e6 commit 5811ee1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 57 deletions.
43 changes: 21 additions & 22 deletions src/OddJobs/Job/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,27 @@ concatJobDbColumnsInternal ys = go ys ""
go [col] x = x <> col
go (col:cols) x = go cols (x <> col <> ", ")

-- | TODO
{-# INLINE concatJobDbColumnsWorkflow #-}
concatJobDbColumnsWorkflow :: (IsString s, Semigroup s) => s
concatJobDbColumnsWorkflow = concatJobDbColumnsInternal jobDbColumnsWorkflow
where

-- | TODO
jobDbColumnsWorkflow :: (IsString s, Semigroup s) => [s]
jobDbColumnsWorkflow =
[ "id"
, "created_at"
, "updated_at"
, "run_at"
, "status"
, "payload"
, "last_error"
, "attempts"
, "locked_at"
, "locked_by"
, "result"
, "parent_job_id"
]
-- -- | TODO
-- {-# INLINE concatJobDbColumnsWorkflow #-}
-- concatJobDbColumnsWorkflow :: (IsString s, Semigroup s) => s
-- concatJobDbColumnsWorkflow = concatJobDbColumnsInternal jobDbColumnsWorkflow

-- -- | TODO
-- jobDbColumnsWorkflow :: (IsString s, Semigroup s) => [s]
-- jobDbColumnsWorkflow =
-- [ "id"
-- , "created_at"
-- , "updated_at"
-- , "run_at"
-- , "status"
-- , "payload"
-- , "last_error"
-- , "attempts"
-- , "locked_at"
-- , "locked_by"
-- , "result"
-- , "parent_job_id"
-- ]

saveJobQuery :: Query
saveJobQuery =
Expand Down
64 changes: 30 additions & 34 deletions src/OddJobs/Migrations.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@ import Database.PostgreSQL.Simple.ToRow as PGS
import Data.Functor (void)
import OddJobs.Types

createJobTableQuery :: Query
createJobTableQuery = createJobTableQueryInternal False
-- createJobTableQuery :: Query
-- createJobTableQuery = createJobTableQueryInternal False

createJobTable :: Connection -> TableName -> IO ()
createJobTable = createJobTableInternal False
-- createJobTable :: Connection -> TableName -> IO ()
-- createJobTable = createJobTableInternal False

createJobTableWithWorkflow :: Connection -> TableName -> IO ()
createJobTableWithWorkflow = createJobTableInternal True
-- createJobTableWithWorkflow :: Connection -> TableName -> IO ()
-- createJobTableWithWorkflow = createJobTableInternal True

createJobTableQueryInternal ::
Bool ->
-- ^ whether to enable job-results and job-workflow features
createJobTableQuery ::
Query
createJobTableQueryInternal enableWorkflows = "CREATE TABLE IF NOT EXISTS ?" <>
createJobTableQuery = "CREATE TABLE IF NOT EXISTS ?" <>
"( id serial primary key" <>
", created_at timestamp with time zone default now() not null" <>
", updated_at timestamp with time zone default now() not null" <>
Expand All @@ -35,7 +33,7 @@ createJobTableQueryInternal enableWorkflows = "CREATE TABLE IF NOT EXISTS ?" <>
", attempts int not null default 0" <>
", locked_at timestamp with time zone null" <>
", locked_by text null" <>
if enableWorkflows then ", result jsonb, parent_job_id int references ?(id)" else "" <>
", result jsonb, parent_job_id int references ?(id)" <>
", constraint incorrect_locking_info CHECK (" <>
"(locked_at is null and locked_by is null and status <> 'locked') or " <>
"(locked_at is not null and locked_by is not null and (status = 'locked' or status = 'cancelled')))" <>
Expand All @@ -46,7 +44,7 @@ createJobTableQueryInternal enableWorkflows = "CREATE TABLE IF NOT EXISTS ?" <>
"create index if not exists ? on ?(locked_by);" <>
"create index if not exists ? on ?(status);" <>
"create index if not exists ? on ?(run_at);" <>
if enableWorkflows then "create index if not exists ? on ?(parent_job_id);" else ""
"create index if not exists ? on ?(parent_job_id);"

createNotificationTrigger :: Query
createNotificationTrigger = "create or replace function ?() returns trigger as $$" <>
Expand All @@ -59,32 +57,30 @@ createNotificationTrigger = "create or replace function ?() returns trigger as $
"drop trigger if exists ? on ?;" <>
"create trigger ? after insert on ? for each row execute procedure ?();"

createJobTableInternal ::
Bool ->
-- ^ whether to enable job-results and job-workflow features
createJobTable ::
Connection ->
TableName ->
IO ()
createJobTableInternal enableWorkflows conn tname = void $ do
createJobTable conn tname = void $ do
let tnameTxt = getTnameTxt tname
let a1 = ( PGS.Identifier $ "idx_" <> tnameTxt <> "_created_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_updated_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_locked_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_locked_by"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_status"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_run_at"
, tname
)
finalFields = if enableWorkflows
then PGS.toRow $ (tname, tname) PGS.:. a1 PGS.:. (tname, PGS.Identifier $ "idx_" <> tnameTxt <> "_parent_job_id")
else PGS.toRow $ (Only tname) PGS.:. a1

_ <- PGS.execute conn (createJobTableQueryInternal enableWorkflows) finalFields
args = ( tname
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_created_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_updated_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_locked_at"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_locked_by"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_status"
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_run_at"
, tname
, tname
, PGS.Identifier $ "idx_" <> tnameTxt <> "_parent_job_id"
)
_ <- PGS.execute conn createJobTableQuery args
PGS.execute conn createNotificationTrigger
( fnName
, pgEventName tname
Expand Down
2 changes: 1 addition & 1 deletion test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ testJobDeletion appPool jobPool = testCase "job immediae deletion" $ do

testJobResults appPool jobPool = testCase "job results" $ do
withRandomTable jobPool $ \tname -> do
withNamedJobMonitor tname jobPool (\cfg -> cfg{Job.cfgImmediateJobDeletion=(const $ pure False)}) $ \logRef -> do
withNamedJobMonitor tname jobPool (\cfg -> cfg{Job.cfgImmediateJobDeletion=const $ pure False}) $ \logRef -> do
Pool.withResource appPool $ \conn -> do
Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0 (Just "abcdef"))
delaySeconds Job.defaultPollingInterval
Expand Down

0 comments on commit 5811ee1

Please sign in to comment.