From 842158f799d65f4bd2381a010068cf95c3bf0d24 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Tue, 1 Oct 2024 16:42:42 +0200 Subject: [PATCH] WIP: Get rid of channel as an explicit param for NotificationSubsystem actions --- .../src/Wire/NotificationSubsystem.hs | 3 +-- .../Wire/NotificationSubsystem/Interpreter.hs | 11 +++++++---- services/brig/src/Brig/API/Client.hs | 18 +++++------------- services/brig/src/Brig/API/Internal.hs | 10 +++------- services/brig/src/Brig/API/Public.hs | 3 +-- services/brig/src/Brig/CanonicalInterpreter.hs | 1 + .../cannon/src/Cannon/RabbitMqConsumerApp.hs | 2 +- 7 files changed, 19 insertions(+), 29 deletions(-) diff --git a/libs/wire-subsystems/src/Wire/NotificationSubsystem.hs b/libs/wire-subsystems/src/Wire/NotificationSubsystem.hs index c80c34d1755..ab110254cb2 100644 --- a/libs/wire-subsystems/src/Wire/NotificationSubsystem.hs +++ b/libs/wire-subsystems/src/Wire/NotificationSubsystem.hs @@ -9,7 +9,6 @@ import Data.Id import Data.List.NonEmpty (NonEmpty ((:|))) import Gundeck.Types hiding (Push (..), Recipient, newPush) import Imports -import Network.AMQP import Polysemy import Wire.Arbitrary @@ -50,7 +49,7 @@ data NotificationSubsystem m a where CleanupUser :: UserId -> NotificationSubsystem m () UnregisterPushClient :: UserId -> ClientId -> NotificationSubsystem m () GetPushTokens :: UserId -> NotificationSubsystem m [PushToken] - SetUpUserNotificationQueues :: Channel -> UserId -> ClientId -> NotificationSubsystem m () + SetupConsumableNotifications :: UserId -> ClientId -> NotificationSubsystem m () makeSem ''NotificationSubsystem diff --git a/libs/wire-subsystems/src/Wire/NotificationSubsystem/Interpreter.hs b/libs/wire-subsystems/src/Wire/NotificationSubsystem/Interpreter.hs index a0174bb9bc5..b35dfdc64e3 100644 --- a/libs/wire-subsystems/src/Wire/NotificationSubsystem/Interpreter.hs +++ b/libs/wire-subsystems/src/Wire/NotificationSubsystem/Interpreter.hs @@ -41,7 +41,8 @@ runNotificationSubsystemGundeck :: Member Delay r, Member (Final IO) r, Member P.TinyLog r, - Member (Embed IO) r + Member (Embed IO) r, + Member (Input Channel) r ) => NotificationSubsystemConfig -> Sem (NotificationSubsystem : r) a -> @@ -53,7 +54,9 @@ runNotificationSubsystemGundeck cfg = interpret $ \case CleanupUser uid -> GundeckAPIAccess.userDeleted uid UnregisterPushClient uid cid -> GundeckAPIAccess.unregisterPushClient uid cid GetPushTokens uid -> GundeckAPIAccess.getPushTokens uid - SetUpUserNotificationQueues chan uid cid -> void $ liftIO $ setUpUserNotificationQueuesImpl chan uid cid + SetupConsumableNotifications uid cid -> do + chan <- input + void $ liftIO $ setupConsumableNotificationsImpl chan uid cid data NotificationSubsystemConfig = NotificationSubsystemConfig { fanoutLimit :: Range 1 HardTruncationLimit Int32, @@ -177,12 +180,12 @@ pushSlowlyImpl ps = delay =<< inputs (diffTimeToFullMicroseconds . slowPushDelay) pushImpl [p] -setUpUserNotificationQueuesImpl :: +setupConsumableNotificationsImpl :: Channel -> UserId -> ClientId -> IO Text -setUpUserNotificationQueuesImpl chan uid cid = do +setupConsumableNotificationsImpl chan uid cid = do let qName = "user-notifications." <> idToText uid <> "." <> clientToText cid -- TODO: Do this using policies: https://www.rabbitmq.com/docs/parameters#policies let headers = diff --git a/services/brig/src/Brig/API/Client.hs b/services/brig/src/Brig/API/Client.hs index ff43f760c4e..dbf2a1fa238 100644 --- a/services/brig/src/Brig/API/Client.hs +++ b/services/brig/src/Brig/API/Client.hs @@ -86,11 +86,9 @@ import Data.Set qualified as Set import Data.Text.Encoding qualified as T import Data.Text.Encoding.Error import Imports hiding ((\\)) -import Network.AMQP (Channel) import Network.HTTP.Types.Method (StdMethod) import Network.Wai.Utilities import Polysemy -import Polysemy.Input (Input, input) import Servant (Link, ToHttpApiData (toUrlPiece)) import System.Logger.Class (field, msg, val, (~~)) import System.Logger.Class qualified as Log @@ -171,8 +169,7 @@ addClient :: Member DeleteQueue r, Member EmailSubsystem r, Member VerificationCodeSubsystem r, - Member Events r, - Member (Input Channel) r + Member Events r ) => Local UserId -> Maybe ConnId -> @@ -190,8 +187,7 @@ addClientWithReAuthPolicy :: Member EmailSubsystem r, Member Events r, Member UserSubsystem r, - Member VerificationCodeSubsystem r, - Member (Input Channel) r + Member VerificationCodeSubsystem r ) => Data.ReAuthPolicy -> Local UserId -> @@ -217,8 +213,7 @@ addClientWithReAuthPolicy policy luid@(tUnqualified -> u) con new = do !>> ClientDataError let clt = clt0 {clientMLSPublicKeys = newClientMLSPublicKeys new} when (ClientSupportsConsumableNotifications `Set.member` (foldMap fromClientCapabilityList mCaps)) $ lift $ liftSem $ do - chanMVar <- input - setUpUserNotificationQueues chanMVar u clt.clientId + setupConsumableNotifications u clt.clientId lift $ do for_ old $ execDelete u con liftSem $ GalleyAPIAccess.newClient u clt.clientId @@ -245,9 +240,7 @@ addClientWithReAuthPolicy policy luid@(tUnqualified -> u) con new = do VerificationCodeNoEmail -> throwE ClientCodeAuthenticationFailed updateClient :: - ( Member NotificationSubsystem r, - Member (Input Channel) r - ) => + (Member NotificationSubsystem r) => UserId -> ClientId -> UpdateClient -> @@ -261,8 +254,7 @@ updateClient uid cid req = do -- first set up the notification queues then save the data is more robust than the other way around let addedCapabilities = caps.fromClientCapabilityList \\ client.clientCapabilities.fromClientCapabilityList when (ClientSupportsConsumableNotifications `Set.member` addedCapabilities) $ lift $ liftSem $ do - chanMVar <- input - setUpUserNotificationQueues chanMVar uid cid + setupConsumableNotifications uid cid wrapClientE $ lift . Data.updateClientCapabilities uid cid . Just $ caps else throwE $ clientError ClientCapabilitiesCannotBeRemoved let lk = maybeToList (unpackLastPrekey <$> req.updateClientLastKey) diff --git a/services/brig/src/Brig/API/Internal.hs b/services/brig/src/Brig/API/Internal.hs index 8321f185489..28ae7f50973 100644 --- a/services/brig/src/Brig/API/Internal.hs +++ b/services/brig/src/Brig/API/Internal.hs @@ -64,7 +64,6 @@ import Data.Set qualified as Set import Data.Text qualified as T import Data.Time.Clock.System import Imports hiding (head) -import Network.AMQP (Channel) import Network.Wai.Utilities as Utilities import Polysemy import Polysemy.Input (Input, input) @@ -142,8 +141,7 @@ servantSitemap :: Member Events r, Member PasswordResetCodeStore r, Member PropertySubsystem r, - Member (Input TeamTemplates) r, - Member (Input Channel) r + Member (Input TeamTemplates) r ) => ServerT BrigIRoutes.API (Handler r) servantSitemap = @@ -195,8 +193,7 @@ accountAPI :: Member PropertySubsystem r, Member Events r, Member PasswordResetCodeStore r, - Member InvitationCodeStore r, - Member (Input Channel) r + Member InvitationCodeStore r ) => ServerT BrigIRoutes.AccountAPI (Handler r) accountAPI = @@ -424,8 +421,7 @@ addClientInternalH :: Member EmailSubsystem r, Member Events r, Member UserSubsystem r, - Member VerificationCodeSubsystem r, - Member (Input Channel) r + Member VerificationCodeSubsystem r ) => UserId -> Maybe Bool -> diff --git a/services/brig/src/Brig/API/Public.hs b/services/brig/src/Brig/API/Public.hs index 6be3d7ab4a1..d4934632c0c 100644 --- a/services/brig/src/Brig/API/Public.hs +++ b/services/brig/src/Brig/API/Public.hs @@ -586,8 +586,7 @@ addClient :: Member EmailSubsystem r, Member VerificationCodeSubsystem r, Member Events r, - Member UserSubsystem r, - Member (Input Channel) r + Member UserSubsystem r ) => Local UserId -> ConnId -> diff --git a/services/brig/src/Brig/CanonicalInterpreter.hs b/services/brig/src/Brig/CanonicalInterpreter.hs index 837ec36c871..5f71b533af6 100644 --- a/services/brig/src/Brig/CanonicalInterpreter.hs +++ b/services/brig/src/Brig/CanonicalInterpreter.hs @@ -23,6 +23,7 @@ import Data.Qualified (Local, toLocalUnsafe) import Data.Time.Clock (UTCTime, getCurrentTime) import Imports import Network.AMQP +import Network.AMQP qualified as Q import Polysemy import Polysemy.Async import Polysemy.Conc diff --git a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs index 2149295484a..02ba59d1bbe 100644 --- a/services/cannon/src/Cannon/RabbitMqConsumerApp.hs +++ b/services/cannon/src/Cannon/RabbitMqConsumerApp.hs @@ -22,7 +22,7 @@ rabbitMQWebSocketApp uid cid e pendingConn = do withConnection e.logg e.rabbitmq $ \conn -> do chan <- liftIO $ Amqp.openChannel conn -- TODO: Don't use the interpreter - qName <- setUpUserNotificationQueuesImpl chan uid cid + qName <- setupConsumableNotificationsImpl chan uid cid let cleanup :: (Exception e, MonadThrow m, MonadIO m) => e -> m () cleanup err = do Log.err e.logg $ Log.msg (Log.val "Pushing to WS failed") . Log.field "error" (displayException err)