Skip to content

Commit

Permalink
fix: workaround for building hstream with system-provided jemalloc (#…
Browse files Browse the repository at this point in the history
…1769)

See 'hstream/src/lib/HStream/RawString.hs' for details
  • Loading branch information
4eUeP authored Feb 29, 2024
1 parent 8516677 commit b7a005c
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 190 deletions.
13 changes: 2 additions & 11 deletions hstream/app/client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
Expand All @@ -31,7 +30,6 @@ import qualified Options.Applicative as O
import Proto3.Suite (def)
import System.Exit (exitFailure)
import System.Timeout (timeout)
import Text.RawString.QQ (r)

import qualified HStream.Admin.Server.Command as Admin
import HStream.Client.Action (alterConnectorConfig,
Expand All @@ -46,6 +44,7 @@ import HStream.Client.Execute (executeWithLookupResource_,
initCliContext,
simpleExecute)
import HStream.Client.Internal (interactiveAppend)
import HStream.RawString (cliBanner)
#ifdef HStreamEnableSchema
import HStream.Client.SQLNew (commandExec,
interactiveSQLApp)
Expand Down Expand Up @@ -116,18 +115,10 @@ hstreamSQL connOpt HStreamSqlOpts{_updateInterval = updateInterval,
_retryInterval = retryInterval, _retryLimit = retryLimit, .. } = do
hstreamCliContext <- initCliContext connOpt
case _execute of
Nothing -> showHStream *> interactiveSQLApp HStreamSqlContext{..} _historyFile
Nothing -> putStrLn cliBanner *> interactiveSQLApp HStreamSqlContext{..} _historyFile
Just statement -> do
when (Char.isSpace `all` statement) $ do putStrLn "Empty statement" *> exitFailure
commandExec HStreamSqlContext{..} statement
where
showHStream = putStrLn [r|
__ _________________ _________ __ ___
/ / / / ___/_ __/ __ \/ ____/ | / |/ /
/ /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
/ __ /___/ // / / _, _/ /___/ ___ |/ / / /
/_/ /_//____//_/ /_/ |_/_____/_/ |_/_/ /_/
|]

hstreamNodes :: RefinedCliConnOpts -> HStreamNodes -> IO ()
hstreamNodes connOpts HStreamNodesList =
Expand Down
19 changes: 3 additions & 16 deletions hstream/app/lib/KafkaServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
Expand Down Expand Up @@ -35,6 +34,7 @@ import ZooKeeper (withResource,
zookeeperResInit)

import qualified Data.Vector as V

import HStream.Base (setupFatalSignalHandler)
import HStream.Common.Server.HashRing (updateHashRing)
import qualified HStream.Common.Server.MetaData as M
Expand Down Expand Up @@ -65,15 +65,12 @@ import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle (..),
MetaStore (..),
RHandle (..))
import HStream.RawString (banner)
import qualified HStream.Server.HStreamInternal as I
import qualified HStream.Store.Logger as S
import qualified HStream.ThirdParty.Protobuf as Proto
import HStream.Utils (getProtoTimestamp)

#ifndef HSTREAM_ENABLE_ASAN
import Text.RawString.QQ (r)
#endif

-------------------------------------------------------------------------------

runApp :: IO ()
Expand Down Expand Up @@ -162,17 +159,7 @@ serve :: ServerContext -> K.ServerOptions
-> IO ()
serve sc@ServerContext{..} netOpts usingCppServer = do
Log.i "************************"
#ifndef HSTREAM_ENABLE_ASAN
hPutStrLn stderr $ [r|
_ _ __ _____ ___ ___ __ __ __
| || |/' _/_ _| _ \ __|/ \| V |
| >< |`._`. | | | v / _|| /\ | \_/ |
|_||_||___/ |_| |_|_\___|_||_|_| |_|

|]
#else
hPutStrLn stderr "ONLY FOR DEBUG: Enable ASAN"
#endif
hPutStrLn stderr banner
Log.i "************************"

let serverOnStarted = do
Expand Down
21 changes: 3 additions & 18 deletions hstream/app/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
Expand Down Expand Up @@ -46,11 +45,13 @@ import HStream.Gossip (GossipContext (..),
initGossipContext,
startGossip, waitGossipBoot)
import HStream.Gossip.Types (InitType (Gossip))
import qualified HStream.Gossip.Types as Gossip
import qualified HStream.Kafka.Server.Config as Ka
import qualified HStream.Logger as Log
import HStream.MetaStore.Types as M (MetaHandle (..),
MetaStore (..),
RHandle (..))
import HStream.RawString (banner)
import HStream.Server.Config (AdvertisedListeners,
ExperimentalFeature (..),
FileLoggerSettings (..),
Expand Down Expand Up @@ -88,12 +89,6 @@ import qualified Network.GRPC.HighLevel.Client as GRPC
import qualified Network.GRPC.HighLevel.Generated as GRPC
#endif

#ifndef HSTREAM_ENABLE_ASAN
import Text.RawString.QQ (r)
#endif

import qualified HStream.Gossip.Types as Gossip

-------------------------------------------------------------------------------

main :: IO ()
Expand Down Expand Up @@ -195,17 +190,7 @@ serve
-> IO ()
serve sc@ServerContext{..} rpcOpts enableStreamV2 = do
Log.i "************************"
#ifndef HSTREAM_ENABLE_ASAN
hPutStrLn stderr $ [r|
_ _ __ _____ ___ ___ __ __ __
| || |/' _/_ _| _ \ __|/ \| V |
| >< |`._`. | | | v / _|| /\ | \_/ |
|_||_||___/ |_| |_|_\___|_||_|_| |_|

|]
#else
hPutStrLn stderr "ONLY FOR DEBUG: Enable ASAN"
#endif
hPutStrLn stderr banner
Log.i "************************"

let serverOnStarted = do
Expand Down
27 changes: 20 additions & 7 deletions hstream/hstream.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ common link-asan
"-optl-Wl,--whole-archive" "-optl-Wl,-Bstatic" "-optl-Wl,-lasan"
"-optl-Wl,-Bdynamic" "-optl-Wl,--no-whole-archive"

-- Local library for HStream RawStrings, see module description for why we need this.
library hstream-lib-rawstring
import: shared-properties
hs-source-dirs: src/lib
exposed-modules: HStream.RawString
build-depends:
, base
, containers
, raw-strings-qq

if flag(hstream_enable_asan)
cpp-options: -DHSTREAM_ENABLE_ASAN

default-language: Haskell2010

library
import: shared-properties
exposed-modules:
Expand Down Expand Up @@ -160,6 +175,7 @@ library
, hstream-processing
, hstream-sql
, hstream-store
, hstream:hstream-lib-rawstring
, memory
, microlens
, microlens-aeson
Expand All @@ -168,7 +184,6 @@ library
, optparse-applicative
, proto3-suite
, proto3-wire
, raw-strings-qq
, rocksdb-haskell-bindings
, scientific
, split
Expand All @@ -183,7 +198,7 @@ library
, unix
, unordered-containers
, uuid
, vector ^>=0.13
, vector ^>=0.13
, vector-algorithms
, yaml
, Z-Data
Expand Down Expand Up @@ -234,12 +249,12 @@ library hstream-app
, hstream-gossip
, hstream-kafka
, hstream-store
, hstream:hstream-lib-rawstring
, http-client
, memory
, optparse-applicative
, proto3-suite
, proto3-wire
, raw-strings-qq
, stm
, suspend
, text
Expand Down Expand Up @@ -283,13 +298,12 @@ executable hstream-server
, hstream-gossip
, hstream-kafka
, hstream-store
, hstream:{hstream, hstream-app}
, hstream:{hstream, hstream-app, hstream-lib-rawstring}
, http-client
, memory
, optparse-applicative
, proto3-suite
, proto3-wire
, raw-strings-qq
, stm
, suspend
, text
Expand Down Expand Up @@ -320,19 +334,18 @@ executable hstream
, grpc-haskell
, grpc-haskell-core
, haskeline
, hstream
, hstream-admin-server
, hstream-api-hs
, hstream-common
, hstream-common-base
, hstream-common-stats
, hstream-sql
, hstream-store
, hstream:{hstream, hstream-lib-rawstring}
, network
, optparse-applicative
, proto3-suite
, random
, raw-strings-qq
, split
, text
, unix
Expand Down
77 changes: 8 additions & 69 deletions hstream/src/HStream/Client/SQL.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
Expand All @@ -19,19 +18,18 @@ import Control.Concurrent (forkFinally, myThreadId,
import Control.Exception (SomeException, handle, try)
import Control.Monad (forM_, forever, void, (>=>))
import Control.Monad.IO.Class (liftIO)
import qualified Data.Aeson.Text as J
import Data.Char (toUpper)
import qualified Data.Map as M
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import qualified Data.Vector as V
import Network.GRPC.HighLevel.Client (ClientRequest (..),
ClientResult (..))
import Network.GRPC.HighLevel.Generated (withGRPCClient)
import qualified System.Console.Haskeline as RL
import qualified System.Console.Haskeline.History as RL
import Text.RawString.QQ (r)

import qualified Data.Aeson.Text as J
import qualified Data.Text.Lazy as TL
import HStream.Client.Action (createConnector,
createStream,
createStreamBySelectWithCustomQueryName,
Expand All @@ -50,6 +48,8 @@ import HStream.Client.Types (HStreamCliContext (..),
import HStream.Client.Utils (calculateShardId,
dropPlanToResType)
import HStream.Common.Types (hashShardKey)
import HStream.RawString (cliSqlHelpInfo,
cliSqlHelpInfos)
import HStream.Server.HStreamApi (CommandQuery (..),
CommandQueryResponse (..),
HStreamApi (..),
Expand Down Expand Up @@ -78,7 +78,7 @@ import HStream.Utils (HStreamClientApi,
-- and this needs to be optimized. This could be done with a grpc client pool.
interactiveSQLApp :: HStreamSqlContext -> Maybe FilePath -> IO ()
interactiveSQLApp sqlCtx@HStreamSqlContext{hstreamCliContext = cliCtx@HStreamCliContext{..}, ..} historyFile = do
putStrLn helpInfo
putStrLn cliSqlHelpInfo
tid <- myThreadId
void $ forkFinally maintainAvailableNodes (\case Left err -> throwTo tid err; _ -> return ())
RL.runInputT settings loop
Expand Down Expand Up @@ -121,9 +121,9 @@ commandExec HStreamSqlContext{hstreamCliContext = cliCtx@HStreamCliContext{..},.
-- ":listSubs":_ -> callListSubscriptions cliCtx
-- -- }

":h": _ -> putStrLn helpInfo
":h": _ -> putStrLn cliSqlHelpInfo
[":help"] -> putStr groupedHelpInfo
":help":x:_ -> forM_ (M.lookup (map toUpper x) helpInfos) putStrLn
":help":x:_ -> forM_ (M.lookup (map toUpper x) cliSqlHelpInfos) putStrLn

(_:_) -> liftIO $ handle (\(e :: SomeSQLException) -> putStrLn . formatSomeSQLException $ e) $ do
rSQL <- parseAndRefine $ T.pack xs
Expand Down Expand Up @@ -191,69 +191,8 @@ sqlAction HStreamApi{..} sql = do
putStr $ HStream.Utils.formatCommandQueryResponse x
ClientErrorResponse _ -> putStr $ HStream.Utils.formatResult resp

-- XXX: Can't build template-haskell with asan
#ifndef HSTREAM_ENABLE_ASAN
helpInfo :: String
helpInfo =
[r|
Command
:h To show these help info
:q To exit command line interface
:help [sql_operation] To show full usage of sql statement

SQL STATEMENTS:
To create a simplest stream:
CREATE STREAM stream_name;

To create a query select all fields from a stream:
SELECT * FROM stream_name EMIT CHANGES;

To insert values to a stream:
INSERT INTO stream_name (field1, field2) VALUES (1, 2);
|]

helpInfos :: M.Map String String
helpInfos = M.fromList [
("CREATE",[r|
CREATE STREAM <stream_name> [AS <select_query>] [ WITH ( {stream_options} ) ];
CREATE {SOURCE|SINK} CONNECTOR <connector_name> [IF NOT EXIST] WITH ( {connector_options} ) ;
CREATE VIEW <stream_name> AS <select_query> ;
|]),
("INSERT",[r|
INSERT INTO <stream_name> ( {field_name} ) VALUES ( {field_value} );
INSERT INTO <stream_name> VALUES CAST ('json_value' AS JSONB);
INSERT INTO <stream_name> VALUES CAST ('binary_value' AS BYTEA);
|]),
("SELECT", [r|
SELECT <* | {expression [ AS field_alias ]}>
FROM stream_name_1
[ join_type JOIN stream_name_2
WITHIN (some_interval)
ON stream_name_1.field_1 = stream_name_2.field_2 ]
[ WHERE search_condition ]
[ GROUP BY field_name [, window_type] ]
EMIT CHANGES;
|]),
("SHOW", [r|
SHOW <CONNECTORS|STREAMS|QUERIES|VIEWS>;
|]),
("TERMINATE", [r|
TERMINATE <QUERY <query_id>|ALL>;
|]),
("DROP", [r|
DROP <STREAM <stream_name>|VIEW <view_name>|QUERY <query_id>> [IF EXISTS];
|])
]
#else
helpInfo :: String
helpInfo = ""

helpInfos :: M.Map String String
helpInfos = M.fromList []
#endif

groupedHelpInfo :: String
groupedHelpInfo = ("SQL Statements\n" <> ) . unlines . map (\(x, y) -> x <> " " <> y) . M.toList $ helpInfos
groupedHelpInfo = ("SQL Statements\n" <> ) . unlines . map (\(x, y) -> x <> " " <> y) . M.toList $ cliSqlHelpInfos

runActionWithGrpc :: HStreamCliContext
-> (HStream.Utils.HStreamClientApi -> IO b) -> IO b
Expand Down
Loading

0 comments on commit b7a005c

Please sign in to comment.