Skip to content

Commit

Permalink
WIP: Get rid of channel as an explicit param for NotificationSubsyste…
Browse files Browse the repository at this point in the history
…m actions
  • Loading branch information
akshaymankar committed Oct 2, 2024
1 parent 41c6cf5 commit 842158f
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 29 deletions.
3 changes: 1 addition & 2 deletions libs/wire-subsystems/src/Wire/NotificationSubsystem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import Data.Id
import Data.List.NonEmpty (NonEmpty ((:|)))
import Gundeck.Types hiding (Push (..), Recipient, newPush)
import Imports
import Network.AMQP
import Polysemy
import Wire.Arbitrary

Expand Down Expand Up @@ -50,7 +49,7 @@ data NotificationSubsystem m a where
CleanupUser :: UserId -> NotificationSubsystem m ()
UnregisterPushClient :: UserId -> ClientId -> NotificationSubsystem m ()
GetPushTokens :: UserId -> NotificationSubsystem m [PushToken]
SetUpUserNotificationQueues :: Channel -> UserId -> ClientId -> NotificationSubsystem m ()
SetupConsumableNotifications :: UserId -> ClientId -> NotificationSubsystem m ()

makeSem ''NotificationSubsystem

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ runNotificationSubsystemGundeck ::
Member Delay r,
Member (Final IO) r,
Member P.TinyLog r,
Member (Embed IO) r
Member (Embed IO) r,
Member (Input Channel) r
) =>
NotificationSubsystemConfig ->
Sem (NotificationSubsystem : r) a ->
Expand All @@ -53,7 +54,9 @@ runNotificationSubsystemGundeck cfg = interpret $ \case
CleanupUser uid -> GundeckAPIAccess.userDeleted uid
UnregisterPushClient uid cid -> GundeckAPIAccess.unregisterPushClient uid cid
GetPushTokens uid -> GundeckAPIAccess.getPushTokens uid
SetUpUserNotificationQueues chan uid cid -> void $ liftIO $ setUpUserNotificationQueuesImpl chan uid cid
SetupConsumableNotifications uid cid -> do
chan <- input
void $ liftIO $ setupConsumableNotificationsImpl chan uid cid

data NotificationSubsystemConfig = NotificationSubsystemConfig
{ fanoutLimit :: Range 1 HardTruncationLimit Int32,
Expand Down Expand Up @@ -177,12 +180,12 @@ pushSlowlyImpl ps =
delay =<< inputs (diffTimeToFullMicroseconds . slowPushDelay)
pushImpl [p]

setUpUserNotificationQueuesImpl ::
setupConsumableNotificationsImpl ::
Channel ->
UserId ->
ClientId ->
IO Text
setUpUserNotificationQueuesImpl chan uid cid = do
setupConsumableNotificationsImpl chan uid cid = do
let qName = "user-notifications." <> idToText uid <> "." <> clientToText cid
-- TODO: Do this using policies: https://www.rabbitmq.com/docs/parameters#policies
let headers =
Expand Down
18 changes: 5 additions & 13 deletions services/brig/src/Brig/API/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ import Data.Set qualified as Set
import Data.Text.Encoding qualified as T
import Data.Text.Encoding.Error
import Imports hiding ((\\))
import Network.AMQP (Channel)
import Network.HTTP.Types.Method (StdMethod)
import Network.Wai.Utilities
import Polysemy
import Polysemy.Input (Input, input)
import Servant (Link, ToHttpApiData (toUrlPiece))
import System.Logger.Class (field, msg, val, (~~))
import System.Logger.Class qualified as Log
Expand Down Expand Up @@ -171,8 +169,7 @@ addClient ::
Member DeleteQueue r,
Member EmailSubsystem r,
Member VerificationCodeSubsystem r,
Member Events r,
Member (Input Channel) r
Member Events r
) =>
Local UserId ->
Maybe ConnId ->
Expand All @@ -190,8 +187,7 @@ addClientWithReAuthPolicy ::
Member EmailSubsystem r,
Member Events r,
Member UserSubsystem r,
Member VerificationCodeSubsystem r,
Member (Input Channel) r
Member VerificationCodeSubsystem r
) =>
Data.ReAuthPolicy ->
Local UserId ->
Expand All @@ -217,8 +213,7 @@ addClientWithReAuthPolicy policy luid@(tUnqualified -> u) con new = do
!>> ClientDataError
let clt = clt0 {clientMLSPublicKeys = newClientMLSPublicKeys new}
when (ClientSupportsConsumableNotifications `Set.member` (foldMap fromClientCapabilityList mCaps)) $ lift $ liftSem $ do
chanMVar <- input
setUpUserNotificationQueues chanMVar u clt.clientId
setupConsumableNotifications u clt.clientId
lift $ do
for_ old $ execDelete u con
liftSem $ GalleyAPIAccess.newClient u clt.clientId
Expand All @@ -245,9 +240,7 @@ addClientWithReAuthPolicy policy luid@(tUnqualified -> u) con new = do
VerificationCodeNoEmail -> throwE ClientCodeAuthenticationFailed

updateClient ::
( Member NotificationSubsystem r,
Member (Input Channel) r
) =>
(Member NotificationSubsystem r) =>
UserId ->
ClientId ->
UpdateClient ->
Expand All @@ -261,8 +254,7 @@ updateClient uid cid req = do
-- first set up the notification queues then save the data is more robust than the other way around
let addedCapabilities = caps.fromClientCapabilityList \\ client.clientCapabilities.fromClientCapabilityList
when (ClientSupportsConsumableNotifications `Set.member` addedCapabilities) $ lift $ liftSem $ do
chanMVar <- input
setUpUserNotificationQueues chanMVar uid cid
setupConsumableNotifications uid cid
wrapClientE $ lift . Data.updateClientCapabilities uid cid . Just $ caps
else throwE $ clientError ClientCapabilitiesCannotBeRemoved
let lk = maybeToList (unpackLastPrekey <$> req.updateClientLastKey)
Expand Down
10 changes: 3 additions & 7 deletions services/brig/src/Brig/API/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ import Data.Set qualified as Set
import Data.Text qualified as T
import Data.Time.Clock.System
import Imports hiding (head)
import Network.AMQP (Channel)
import Network.Wai.Utilities as Utilities
import Polysemy
import Polysemy.Input (Input, input)
Expand Down Expand Up @@ -142,8 +141,7 @@ servantSitemap ::
Member Events r,
Member PasswordResetCodeStore r,
Member PropertySubsystem r,
Member (Input TeamTemplates) r,
Member (Input Channel) r
Member (Input TeamTemplates) r
) =>
ServerT BrigIRoutes.API (Handler r)
servantSitemap =
Expand Down Expand Up @@ -195,8 +193,7 @@ accountAPI ::
Member PropertySubsystem r,
Member Events r,
Member PasswordResetCodeStore r,
Member InvitationCodeStore r,
Member (Input Channel) r
Member InvitationCodeStore r
) =>
ServerT BrigIRoutes.AccountAPI (Handler r)
accountAPI =
Expand Down Expand Up @@ -424,8 +421,7 @@ addClientInternalH ::
Member EmailSubsystem r,
Member Events r,
Member UserSubsystem r,
Member VerificationCodeSubsystem r,
Member (Input Channel) r
Member VerificationCodeSubsystem r
) =>
UserId ->
Maybe Bool ->
Expand Down
3 changes: 1 addition & 2 deletions services/brig/src/Brig/API/Public.hs
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,7 @@ addClient ::
Member EmailSubsystem r,
Member VerificationCodeSubsystem r,
Member Events r,
Member UserSubsystem r,
Member (Input Channel) r
Member UserSubsystem r
) =>
Local UserId ->
ConnId ->
Expand Down
1 change: 1 addition & 0 deletions services/brig/src/Brig/CanonicalInterpreter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import Data.Qualified (Local, toLocalUnsafe)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Imports
import Network.AMQP
import Network.AMQP qualified as Q
import Polysemy
import Polysemy.Async
import Polysemy.Conc
Expand Down
2 changes: 1 addition & 1 deletion services/cannon/src/Cannon/RabbitMqConsumerApp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do
withConnection e.logg e.rabbitmq $ \conn -> do
chan <- liftIO $ Amqp.openChannel conn
-- TODO: Don't use the interpreter
qName <- setUpUserNotificationQueuesImpl chan uid cid
qName <- setupConsumableNotificationsImpl chan uid cid
let cleanup :: (Exception e, MonadThrow m, MonadIO m) => e -> m ()
cleanup err = do
Log.err e.logg $ Log.msg (Log.val "Pushing to WS failed") . Log.field "error" (displayException err)
Expand Down

0 comments on commit 842158f

Please sign in to comment.