Skip to content

Commit

Permalink
hstream-store: add listStreamPartitionsOrderedByName (#1755)
Browse files Browse the repository at this point in the history
* add listStreamPartitionsOrderedByName

* add a SortBench.hs

---------

Co-authored-by: YangKian <[email protected]>
  • Loading branch information
4eUeP and YangKian authored Feb 2, 2024
1 parent d2d41b6 commit 3bb684f
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 2 deletions.
19 changes: 18 additions & 1 deletion hstream-store/HStream/Store/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module HStream.Store.Stream
, doesStreamExist
, listStreamPartitions
, listStreamPartitionsOrdered
, listStreamPartitionsOrderedByName
, doesStreamPartitionExist
, doesStreamPartitionValExist
, getStreamExtraAttrs
Expand Down Expand Up @@ -167,7 +168,7 @@ module HStream.Store.Stream

import Control.Exception (catch, try)
import Control.Monad (filterM, forM, (<=<))
import Data.Bifunctor (bimap, second)
import Data.Bifunctor (bimap)
import Data.Bits (bit)
import Data.Default (def)
import Data.Foldable (foldrM)
Expand Down Expand Up @@ -545,6 +546,22 @@ listStreamPartitionsOrdered client streamid = do
V.sortBy (\e1 e2 -> compare (snd e1) (snd e2)) mvec
V.unsafeFreeze mvec

-- Sorted by log name
listStreamPartitionsOrderedByName
:: HasCallStack
=> FFI.LDClient
-> StreamId
-> IO (Vector (CBytes, FFI.C_LogID))
listStreamPartitionsOrderedByName client streamid = do
dir_path <- getStreamDirPath streamid
keys <- LD.logDirLogsNames =<< LD.getLogDirectory client dir_path
ps <- forM (V.fromList keys) $ \key -> do
logId <- getUnderlyingLogId client streamid (Just key)
pure (key, logId)
!mvec <- V.unsafeThaw ps
V.sortBy (\e1 e2 -> compare (fst e1) (fst e2)) mvec
V.unsafeFreeze mvec

doesStreamPartitionExist
:: HasCallStack
=> FFI.LDClient
Expand Down
45 changes: 45 additions & 0 deletions hstream-store/bench/SortBench.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{-# LANGUAGE BangPatterns #-}

module Main where

import Control.Monad
import Criterion.Main
import qualified Data.List as L
import qualified Data.Vector as V
import qualified Data.Vector.Algorithms.Intro as V
import System.Random (randomRIO)

sortList :: [Int] -> IO [Int]
sortList xs = pure $! L.sort xs

sortVectorIntro :: V.Vector Int -> IO (V.Vector Int)
sortVectorIntro xs = do
!mvec <- V.unsafeThaw xs
V.sort mvec
V.unsafeFreeze mvec

--ys ::

main :: IO ()
main = do
let n = 1000000
let !xs1 = [0..n]
!ys1 = V.fromList xs1
!xs2 = [n..0]
!ys2 = V.fromList xs2

-- n(1000000) is too many for generate random numbers, so we use 10000 instead.
!xs3 <- replicateM 10000 $ randomRIO (0, n :: Int)
let !ys3 = V.fromList xs3

defaultMain
[ bgroup "sort1" [ bench "List" $ nfIO (sortList xs1)
, bench "Vector.Algorithms.Intro" $ nfIO (sortVectorIntro ys1)
]
, bgroup "sort2" [ bench "List" $ nfIO (sortList xs2)
, bench "Vector.Algorithms.Intro" $ nfIO (sortVectorIntro ys2)
]
, bgroup "sort3" [ bench "List" $ nfIO (sortList xs3)
, bench "Vector.Algorithms.Intro" $ nfIO (sortVectorIntro ys3)
]
]
16 changes: 16 additions & 0 deletions hstream-store/hstream-store.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,19 @@ executable hstore-bench-writter

default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N

benchmark hstore-sort-bench
import: shared-properties
type: exitcode-stdio-1.0
main-is: SortBench.hs
hs-source-dirs: bench
build-depends:
, base >=4.11 && <5
, bytestring
, criterion
, random
, vector
, vector-algorithms

default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N
19 changes: 18 additions & 1 deletion hstream-store/test/HStream/Store/StreamSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
module HStream.Store.StreamSpec (spec) where

import Control.Concurrent (threadDelay)
import Control.Monad (replicateM, void)
import Control.Monad
import Data.Int
import Data.List (sort)
import qualified Data.Map as M
import qualified Data.Map.Strict as Map
import qualified Data.Vector as V
import Test.Hspec
import Z.Data.Vector.Base (Bytes)

Expand Down Expand Up @@ -142,6 +144,21 @@ base = describe "BaseSpec" $ do
S.removeStream client newStreamId
S.doesStreamExist client newStreamId `shouldReturn` False

it "listStreamPartitionsOrdered" $ do
streamid <- S.mkStreamId S.StreamTypeTopic <$> newRandomName 5
let attrs = S.def{ S.logReplicationFactor = S.defAttr1 1 }
S.createStream client streamid attrs
S.doesStreamExist client streamid `shouldReturn` True
let parts = ["10", "01", "11"]
logids <- forM parts $ \p -> do
S.createStreamPartition client streamid (Just p) Map.empty
vs1 <- S.listStreamPartitionsOrdered client streamid
V.map snd vs1 `shouldBe` V.fromList (sort logids)
vs2 <- S.listStreamPartitionsOrderedByName client streamid
V.map fst vs2 `shouldBe` V.fromList (sort parts)

S.removeStream client streamid

archiveStreamSpec :: Spec
archiveStreamSpec = describe "ArchiveStreamSpec" $ do
streamId <- S.mkStreamId S.StreamTypeStream <$> runIO (newRandomName 5)
Expand Down

0 comments on commit 3bb684f

Please sign in to comment.