Skip to content

Commit

Permalink
Pull index migration tests
Browse files Browse the repository at this point in the history
... from #4346

These tests are supposed to fail as they show an incompatibility with
OpenSearch (the bulk API of OS does not support "_version" tricks.)
However, we need to get them green (or switch to ElasticSearch 7) to be
able to use index migration jobs.
  • Loading branch information
supersven committed Nov 22, 2024
1 parent e147a42 commit 0b12c1d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 22 deletions.
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Index/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ module Brig.Index.Options
commandParser,
mkCreateIndexSettings,
toESServer,
ReindexFromAnotherIndexSettings,
ReindexFromAnotherIndexSettings(..),
reindexDestIndex,
reindexTimeoutSeconds,
reindexEsConnection,
Expand Down
113 changes: 92 additions & 21 deletions services/brig/test/integration/API/Search.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
-- 'putMapping' is incorrectly deprecated in bloodhound
{-# OPTIONS_GHC -Wno-deprecations #-}
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}
Expand Down Expand Up @@ -35,12 +36,17 @@ import API.User.Util
import Bilge
import Bilge.Assert
import Brig.App (initHttpManagerWithTLSConfig)
import Brig.Index.Eval (runCommand)
import Brig.Index.Options
import Brig.Index.Options qualified as IndexOpts
import Brig.Options (ElasticSearchOpts)
import Brig.Options qualified as Opt
import Brig.Options qualified as Opts
import Cassandra qualified as C
import Cassandra.Options qualified as CassOpts
import Control.Lens ((.~), (?~), (^.))
import Control.Monad.Catch (MonadCatch, MonadThrow)
import Control.Retry
import Data.Aeson (FromJSON, Value, decode)
import Control.Monad.Catch (MonadCatch)
import Data.Aeson (Value, decode)
import Data.Aeson qualified as Aeson
import Data.Domain (Domain (Domain))
import Data.Handle (fromHandle)
Expand All @@ -61,13 +67,15 @@ import Network.Wai qualified as Wai
import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Test qualified as WaiTest
import Safe (headMay)
import System.Logger qualified as Log
import Test.QuickCheck (Arbitrary (arbitrary), generate)
import Test.Tasty
import Test.Tasty.HUnit
import Text.RawString.QQ (r)
import URI.ByteString qualified as URI
import UnliftIO (Concurrently (..), async, bracket, cancel, runConcurrently)
import Util
import Util.Options (Endpoint)
import Wire.API.Federation.API.Brig (SearchResponse (SearchResponse))
import Wire.API.Team.Feature
import Wire.API.Team.SearchVisibility
Expand All @@ -93,7 +101,11 @@ tests opts mgr galley brig = do
testWithBothIndices opts mgr "Non ascii names" $ testSearchNonAsciiNames brig,
testWithBothIndices opts mgr "user with umlaut" $ testSearchWithUmlaut brig,
testWithBothIndices opts mgr "user with japanese name" $ testSearchCJK brig,
test mgr "migration to new index" $ testMigrationToNewIndex opts brig,
testGroup "index migration" $
[ test mgr "migration to new index from existing index" $ testMigrationToNewIndex opts brig runReindexFromAnotherIndex,
test mgr "migration to new index from database" $ testMigrationToNewIndex opts brig (runReindexFromDatabase Reindex),
test mgr "migration to new index from database (force sync)" $ testMigrationToNewIndex opts brig (runReindexFromDatabase ReindexSameOrNewer)
],
testGroup "team A: SearchVisibilityStandard (= unrestricted outbound search)" $
[ testGroup "team A: SearchableByOwnTeam (= restricted inbound search)" $
[ testWithBothIndices opts mgr " I. non-team user cannot find team A member by display name" $ testSearchTeamMemberAsNonMemberDisplayName mgr brig galley FeatureStatusDisabled,
Expand Down Expand Up @@ -611,8 +623,13 @@ testSearchOtherDomain opts brig = do
-- cluster. This test spins up a proxy server to pass requests to our only ES
-- server. The proxy server ensures that only requests to the 'old' index go
-- through.
testMigrationToNewIndex :: (TestConstraints m, MonadUnliftIO m) => Opt.Opts -> Brig -> m ()
testMigrationToNewIndex opts brig = do
testMigrationToNewIndex ::
(TestConstraints m, MonadUnliftIO m) =>
Opt.Opts ->
Brig ->
(Log.Logger -> Opt.Opts -> ES.IndexName -> IO ()) ->
m ()
testMigrationToNewIndex opts brig reindexCommand = do
withOldESProxy opts $ \oldESUrl oldESIndex -> do
oldIndexName :: ES.IndexName <- either (\v -> fail ("Invalid index name" ++ Text.unpack v)) pure $ ES.mkIndexName oldESIndex
let optsOldIndex =
Expand Down Expand Up @@ -656,9 +673,10 @@ testMigrationToNewIndex opts brig = do
assertCanFindByName brig phase1TeamUser1 phase2TeamUser

-- Run Migrations
let newIndexName = opts ^. Opt.elasticsearchLens . Opt.indexLens
taskNodeId <- assertRight =<< runBH opts (ES.reindexAsync $ ES.mkReindexRequest oldIndexName newIndexName)
void $ runBH opts $ waitForTaskToComplete @ES.ReindexResponse taskNodeId
logger <- Log.create Log.StdOut
liftIO $ do
createCommand logger opts oldIndexName
reindexCommand logger opts oldIndexName

-- Phase 3: Using old index for search, writing to both indices, migrations have run
refreshIndex brig
Expand Down Expand Up @@ -692,6 +710,71 @@ testMigrationToNewIndex opts brig = do
assertCanFindByName brig phase1TeamUser1 phase3NonTeamUser
assertCanFindByName brig phase1TeamUser1 phase3TeamUser

createCommand :: Log.Logger -> Opt.Opts -> ES.IndexName -> IO ()
createCommand logger opts oldIndexName =
let newIndexName = opts ^. Opt.elasticsearchLens . Opt.indexLens
esOldOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens) & (Opt.indexLens .~ oldIndexName)
esOldConnectionSettings :: ESConnectionSettings = toESConnectionSettings esOldOpts
esNewConnectionSettings = esOldConnectionSettings {esIndex = newIndexName}
replicas = 2
shards = 2
refreshInterval = 5
esSettings =
IndexOpts.localElasticSettings
& IndexOpts.esConnection .~ esNewConnectionSettings
& IndexOpts.esIndexReplicas .~ ES.ReplicaCount replicas
& IndexOpts.esIndexShardCount .~ shards
& IndexOpts.esIndexRefreshInterval .~ refreshInterval
in runCommand logger $ Create esSettings opts.galley

runReindexFromAnotherIndex :: Log.Logger -> Opt.Opts -> ES.IndexName -> IO ()
runReindexFromAnotherIndex logger opts oldIndexName =
let newIndexName = opts ^. Opt.elasticsearchLens . Opt.indexLens
esOldOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens) & (Opt.indexLens .~ oldIndexName)
esOldConnectionSettings :: ESConnectionSettings = toESConnectionSettings esOldOpts
reindexSettings = ReindexFromAnotherIndexSettings esOldConnectionSettings newIndexName 5
in runCommand logger $ ReindexFromAnotherIndex reindexSettings

runReindexFromDatabase ::
(ElasticSettings -> CassandraSettings -> Endpoint -> Command) ->
Log.Logger ->
Opt.Opts ->
ES.IndexName ->
IO ()
runReindexFromDatabase syncCommand logger opts oldIndexName =
let newIndexName = opts ^. Opt.elasticsearchLens . Opt.indexLens
esOldOpts :: Opt.ElasticSearchOpts = (opts ^. Opt.elasticsearchLens) & (Opt.indexLens .~ oldIndexName)
esOldConnectionSettings :: ESConnectionSettings = toESConnectionSettings esOldOpts
esNewConnectionSettings = esOldConnectionSettings {esIndex = newIndexName}
replicas = 2
shards = 2
refreshInterval = 5
elasticSettings :: ElasticSettings =
IndexOpts.localElasticSettings
& IndexOpts.esConnection .~ esNewConnectionSettings
& IndexOpts.esIndexReplicas .~ ES.ReplicaCount replicas
& IndexOpts.esIndexShardCount .~ shards
& IndexOpts.esIndexRefreshInterval .~ refreshInterval
cassandraSettings :: CassandraSettings =
( localCassandraSettings
& IndexOpts.cHost .~ (Text.unpack opts.cassandra.endpoint.host)
& IndexOpts.cPort .~ (opts.cassandra.endpoint.port)
& IndexOpts.cKeyspace .~ (C.Keyspace opts.cassandra.keyspace)
)

endpoint :: Endpoint = opts.galley
in runCommand logger $ syncCommand elasticSettings cassandraSettings endpoint

toESConnectionSettings :: ElasticSearchOpts -> ESConnectionSettings
toESConnectionSettings opts = ESConnectionSettings {..}
where
toText (ES.Server url) = url
esServer = (fromRight undefined . URI.parseURI URI.strictURIParserOptions . Text.encodeUtf8 . toText) opts.url
esIndex = opts.index
esCaCert = opts.caCert
esInsecureSkipVerifyTls = opts.insecureSkipVerifyTls
esCredentials = opts.credentials

withOldESProxy :: (TestConstraints m, MonadUnliftIO m, HasCallStack) => Opt.Opts -> (Text -> Text -> m a) -> m a
withOldESProxy opts f = do
indexNameText <- randomHandle
Expand All @@ -717,18 +800,6 @@ indexProxyServer idx opts mgr =
else Wai.WPRResponse (Wai.responseLBS HTTP.status400 [] $ "Refusing to proxy to path=" <> cs (Wai.rawPathInfo req))
in waiProxyTo proxyApp Wai.defaultOnExc mgr

waitForTaskToComplete :: forall a m. (ES.MonadBH m, MonadThrow m, FromJSON a) => ES.TaskNodeId -> m ()
waitForTaskToComplete taskNodeId = do
let policy = constantDelay 100000 <> limitRetries 30
let retryCondition _ = fmap not . isTaskComplete
task <- retrying policy retryCondition (const $ ES.tryPerformBHRequest $ ESR.getTask @a taskNodeId)
taskCompleted <- isTaskComplete task
liftIO $ assertBool "Timed out waiting for task" taskCompleted
where
isTaskComplete :: Either ES.EsError (ES.TaskResponse a) -> m Bool
isTaskComplete (Left e) = liftIO $ assertFailure $ "Expected Right, got Left: " <> show e
isTaskComplete (Right taskRes) = pure $ ES.taskResponseCompleted taskRes

testWithBothIndices :: Opt.Opts -> Manager -> TestName -> WaiTest.Session a -> TestTree
testWithBothIndices opts mgr name f = do
testGroup
Expand Down

0 comments on commit 0b12c1d

Please sign in to comment.