diff --git a/common/HStream/Utils/Format.hs b/common/HStream/Utils/Format.hs index d1dd9e26b..60baca3fb 100644 --- a/common/HStream/Utils/Format.hs +++ b/common/HStream/Utils/Format.hs @@ -44,6 +44,9 @@ class Format a where instance Format Protobuf.Empty where formatResult = const "Done.\n" +instance Format () where + formatResult = const "" + instance Format API.Stream where formatResult = (<> "\n") . T.unpack . API.streamStreamName @@ -62,12 +65,17 @@ instance Format API.Query where instance Format API.Connector where formatResult = renderConnectorsToTable . (:[]) +instance Format API.Subscription where + formatResult = renderSubscriptionsToTable . (:[]) instance Format [API.Query] where formatResult = emptyNotice . renderQueriesToTable instance Format [API.Connector] where formatResult = emptyNotice . renderConnectorsToTable +instance Format [API.Subscription] where + formatResult = emptyNotice . renderSubscriptionsToTable + instance Format [API.ServerNode] where formatResult = emptyNotice . renderServerNodesToTable @@ -96,7 +104,7 @@ instance Format API.TerminateQueriesResponse where formatResult = const "Done.\n" instance Format P.Struct where - formatResult (P.Struct kv) = + formatResult s@(P.Struct kv) = case M.toList kv of [("SELECT", Just x)] -> (<> "\n") . TL.unpack . A.encodeToLazyText . valueToJsonValue $ x [("SELECTVIEW", Just x)] -> (<> "\n") . TL.unpack . A.encodeToLazyText . valueToJsonValue $ x @@ -105,7 +113,7 @@ instance Format P.Struct where [("view_query_id", Just x)] -> let (A.String qid) = valueToJsonValue x in "Done. Query ID: " <> T.unpack qid <> "\n" [("Error Message:", Just v)] -> "Error Message: " ++ show v ++ "\n" - x -> show x + _ -> (<> "\n") . TL.unpack . A.encodeToLazyText . structToJsonObject $ s instance Format API.CommandQueryResponse where formatResult = formatCommandQueryResponse @@ -145,6 +153,31 @@ renderQueriesToTable queries = , Table.column Table.expand Table.left def def ] +renderSubscriptionsToTable :: [API.Subscription] -> String +renderSubscriptionsToTable [] = "" +renderSubscriptionsToTable subscriptions = + Table.tableString colSpec Table.asciiS + (Table.fullH (repeat $ Table.headerColumn Table.left Nothing) titles) + (Table.colsAllG Table.center <$> rows) ++ "\n" + where + titles = [ "Subscription ID" + , "Stream Name" + , "Ack Timeout" + , "Max Unacked Records" + ] + formatRow API.Subscription {..} = + [ [T.unpack subscriptionSubscriptionId] + , [T.unpack subscriptionStreamName] + , [show subscriptionAckTimeoutSeconds <> "s"] + , [show subscriptionMaxUnackedRecords] + ] + rows = map formatRow subscriptions + colSpec = [ Table.column Table.expand Table.left def def + , Table.column Table.expand Table.left def def + , Table.column Table.expand Table.left def def + , Table.column Table.expand Table.left def def + ] + renderConnectorsToTable :: [API.Connector] -> String renderConnectorsToTable [] = "" renderConnectorsToTable connectors = diff --git a/hstream-sql/etc/SQL.cf b/hstream-sql/etc/SQL.cf index 881ffb44a..dd42009fa 100644 --- a/hstream-sql/etc/SQL.cf +++ b/hstream-sql/etc/SQL.cf @@ -82,7 +82,7 @@ DropStream. DropOption ::= "STREAM" ; DropView. DropOption ::= "VIEW" ; -- Terminate Query -TerminateQuery. Terminate ::= "TERMINATE" "QUERY" Integer ; +TerminateQuery. Terminate ::= "TERMINATE" "QUERY" Ident ; TerminateAll. Terminate ::= "TERMINATE" "ALL"; ---- Explain diff --git a/hstream-sql/src/HStream/SQL/AST.hs b/hstream-sql/src/HStream/SQL/AST.hs index 34be5c7fe..0832a7fe8 100644 --- a/hstream-sql/src/HStream/SQL/AST.hs +++ b/hstream-sql/src/HStream/SQL/AST.hs @@ -834,12 +834,12 @@ type instance RefinedType DropOption = RDropOption ---- Terminate data RTerminate - = RTerminateQuery String + = RTerminateQuery Text | RTerminateAll deriving (Eq, Show) instance Refine Terminate where - refine (TerminateQuery _ x) = RTerminateQuery (show x) - refine (TerminateAll _ ) = RTerminateAll + refine (TerminateQuery _ (Ident x)) = RTerminateQuery x + refine (TerminateAll _ ) = RTerminateAll type instance RefinedType Terminate = RTerminate ---- Pause diff --git a/hstream-sql/src/HStream/SQL/Codegen.hs b/hstream-sql/src/HStream/SQL/Codegen.hs index 61dbce502..c4e768364 100644 --- a/hstream-sql/src/HStream/SQL/Codegen.hs +++ b/hstream-sql/src/HStream/SQL/Codegen.hs @@ -120,7 +120,7 @@ hstreamCodegen = \case RQDrop (RDropIf RDropConnector x) -> return $ DropPlan True (DConnector x) RQDrop (RDropIf RDropStream x) -> return $ DropPlan True (DStream x) RQDrop (RDropIf RDropView x) -> return $ DropPlan True (DView x) - RQTerminate (RTerminateQuery qid) -> return $ TerminatePlan (OneQuery $ T.pack qid) + RQTerminate (RTerminateQuery qid) -> return $ TerminatePlan (OneQuery qid) RQTerminate RTerminateAll -> return $ TerminatePlan AllQueries --RQSelectView rSelectView -> return $ SelectViewPlan rSelectView RQExplain rexplain -> return $ ExplainPlan rexplain diff --git a/hstream/app/client.hs b/hstream/app/client.hs index cd8e89182..f3c763980 100644 --- a/hstream/app/client.hs +++ b/hstream/app/client.hs @@ -19,16 +19,14 @@ import qualified Data.List as L import Data.Maybe (isNothing, mapMaybe, maybeToList) import qualified Data.Text.Encoding as T -import qualified Data.Text.IO as T import qualified Data.Vector as V import Network.GRPC.HighLevel.Client (ClientConfig, ClientSSLConfig (..), ClientSSLKeyCertPair (..)) -import Network.GRPC.HighLevel.Generated (ClientError (ClientIOError), +import Network.GRPC.HighLevel.Generated (ClientError (..), ClientResult (..), - GRPCIOError (GRPCIOBadStatusCode), - GRPCMethodType (..), - StatusDetails (unStatusDetails), + GRPCIOError (..), + StatusDetails (..), withGRPCClient) import qualified Options.Applicative as O import System.Exit (exitFailure) @@ -51,15 +49,16 @@ import HStream.Client.Types (CliConnOpts (..), HStreamSqlOpts (..), commandParser) import HStream.Client.Utils (mkClientNormalRequest', + printResult, waitForServerToStart) import HStream.Server.HStreamApi (DescribeClusterResponse (..), HStreamApi (..), - ServerNode (serverNodeId), + ServerNode (..), hstreamApiClient) import qualified HStream.Server.HStreamApi as API import HStream.SQL (DropObject (..)) import HStream.ThirdParty.Protobuf (Empty (Empty)) -import HStream.Utils (Format, SocketAddr (..), +import HStream.Utils (SocketAddr (..), fillWithJsonString', formatResult, mkGRPCClientConfWithSSL, @@ -196,14 +195,6 @@ hstreamStream RefinedCliConnOpts{..} = \case simpleExecuteWithAddr addr sslConfig (\HStreamApi{..} -> hstreamApiCreateStream (mkClientNormalRequest' stream)) >>= printResult StreamCmdDelete sName ignoreNonExist -> simpleExecuteWithAddr addr sslConfig (dropAction ignoreNonExist (DStream sName)) >>= printResult - where - printResult :: Format response => ClientResult 'Normal response -> IO () - printResult = \case - ClientNormalResponse x _ _ _ _ -> putStrLn $ formatResult x - ClientErrorResponse (ClientIOError (GRPCIOBadStatusCode _ details)) -> - T.putStrLn $ "Error: " <> T.decodeUtf8 (unStatusDetails details) - ClientErrorResponse err -> do - putStrLn $ "Error: " <> show err getNodes :: RefinedCliConnOpts -> IO DescribeClusterResponse getNodes RefinedCliConnOpts{..} = diff --git a/hstream/hstream.cabal b/hstream/hstream.cabal index e8973fb20..54003ce94 100644 --- a/hstream/hstream.cabal +++ b/hstream/hstream.cabal @@ -128,6 +128,7 @@ library , scientific , statgrab , stm + , string-random , suspend , table-layout , text diff --git a/hstream/src/HStream/Client/Action.hs b/hstream/src/HStream/Client/Action.hs index aa1f98af4..75bb8d344 100644 --- a/hstream/src/HStream/Client/Action.hs +++ b/hstream/src/HStream/Client/Action.hs @@ -9,20 +9,24 @@ module HStream.Client.Action ( Action , createStream - , listStreams - , listViews - , listQueries - , listConnectors - , terminateQueries + , createStreamBySelect + , createConnector + , createSubscription + , createSubscription' + , describeCluster , dropAction + , deleteSubscription , insertIntoStream - , createStreamBySelect + , listConnectors + , listStreams + , listQueries , listShards , lookupResource - , describeCluster , pauseConnector , resumeConnector - , createConnector + , listSubscriptions + , listViews + , terminateQueries ) where import qualified Data.ByteString as BS @@ -46,6 +50,8 @@ import HStream.SQL.Codegen (DropObject (..), import HStream.ThirdParty.Protobuf (Empty (..)) import HStream.Utils +type Action a = HStreamClientApi -> IO (ClientResult 'Normal a) + createStream :: StreamName -> Int -> Action API.Stream createStream sName rFac API.HStreamApi{..} = @@ -62,6 +68,8 @@ listQueries :: Action API.ListQueriesResponse listQueries API.HStreamApi{..} = hstreamApiListQueries clientDefaultRequest listConnectors :: Action API.ListConnectorsResponse listConnectors API.HStreamApi{..} = hstreamApiListConnectors clientDefaultRequest +listSubscriptions :: Action API.ListSubscriptionsResponse +listSubscriptions API.HStreamApi{..} = hstreamApiListSubscriptions clientDefaultRequest terminateQueries :: TerminationSelection -> HStreamClientApi @@ -121,7 +129,6 @@ createConnector sql API.HStreamApi{..} = hstreamApiCreateConnector (mkClientNormalRequest' def { API.createConnectorRequestSql = T.pack sql}) -type Action a = HStreamClientApi -> IO (ClientResult 'Normal a) listShards :: T.Text -> Action API.ListShardsResponse listShards sName API.HStreamApi{..} = do @@ -151,6 +158,16 @@ resumeConnector :: T.Text -> Action Empty resumeConnector cid HStreamApi{..} = hstreamApiResumeConnector $ mkClientNormalRequest' def { resumeConnectorRequestName = cid } +createSubscription :: T.Text -> T.Text -> Action Subscription +createSubscription subId sName = createSubscription' (subscriptionWithDefaultSetting subId sName) + +createSubscription' :: Subscription -> Action Subscription +createSubscription' sub HStreamApi{..} = hstreamApiCreateSubscription $ mkClientNormalRequest' sub + +deleteSubscription :: T.Text -> Action Empty +deleteSubscription subId HStreamApi{..} = hstreamApiDeleteSubscription $ + mkClientNormalRequest' def { deleteSubscriptionRequestSubscriptionId = subId } + -------------------------------------------------------------------------------- fakeMap :: (a -> b) -> ClientResult 'Normal a -> ClientResult 'Normal b diff --git a/hstream/src/HStream/Client/Execute.hs b/hstream/src/HStream/Client/Execute.hs index 8327db83a..c0811bfaa 100644 --- a/hstream/src/HStream/Client/Execute.hs +++ b/hstream/src/HStream/Client/Execute.hs @@ -31,8 +31,8 @@ import HStream.Client.Types (HStreamSqlContext (..), import HStream.Client.Utils import qualified HStream.Server.HStreamApi as API import HStream.SQL -import HStream.Utils (Format, SocketAddr, - getServerResp, +import HStream.Utils (Format, HStreamClientApi, + SocketAddr, getServerResp, mkGRPCClientConfWithSSL, serverNodeToSocketAddr) @@ -51,7 +51,7 @@ execute_ ctx@HStreamSqlContext{..} action = do void $ executeWithAddr_ ctx addr action printResult executeWithLookupResource_ :: Format a => HStreamSqlContext - -> ResourceType -> Action a -> IO () + -> ResourceType -> (HStreamClientApi -> IO a) -> IO () executeWithLookupResource_ ctx@HStreamSqlContext{..} rtype action = do addr <- readMVar currentServer lookupWithAddr ctx addr rtype >>= \case @@ -123,6 +123,6 @@ getInfoWithAddr ctx@HStreamSqlContext{..} addr action cont = do void . swapMVar currentServer $ addr cont resp -simpleExecuteWithAddr :: SocketAddr -> Maybe ClientSSLConfig -> Action a -> IO (ClientResult 'Normal a) +simpleExecuteWithAddr :: SocketAddr -> Maybe ClientSSLConfig -> (HStreamClientApi -> IO a) -> IO a simpleExecuteWithAddr addr sslConfig action = withGRPCClient (mkGRPCClientConfWithSSL addr sslConfig) (API.hstreamApiClient >=> action) diff --git a/hstream/src/HStream/Client/Internal.hs b/hstream/src/HStream/Client/Internal.hs index 431e25b22..e4d73edd4 100644 --- a/hstream/src/HStream/Client/Internal.hs +++ b/hstream/src/HStream/Client/Internal.hs @@ -3,145 +3,74 @@ {-# LANGUAGE RecordWildCards #-} module HStream.Client.Internal - ( callSubscription - , callDeleteSubscription - , callDeleteSubscriptionAll - , callListSubscriptions - , callStreamingFetch + ( streamingFetch + , cliFetch ) where -import Control.Concurrent (readMVar) -import Control.Monad (forM_, void) +import Control.Monad (void) import qualified Data.Text as T import qualified Data.Vector as V -import Network.GRPC.HighLevel.Generated (ClientRequest (..), - ClientResult (..), - GRPCMethodType (..), - withGRPCClient) -import Proto3.Suite (Enumerated (Enumerated)) +import Network.GRPC.HighLevel.Generated (ClientRequest (..)) +import qualified Proto3.Suite as PB -import HStream.Client.Execute (executeWithAddr_, - lookupWithAddr) + +import HStream.Client.Action +import HStream.Client.Execute import HStream.Client.Types (HStreamSqlContext (..), ResourceType (..)) -import HStream.Client.Utils (mkClientNormalRequest') +import HStream.Client.Utils import qualified HStream.Server.HStreamApi as API -import HStream.Utils (HStreamClientApi, - mkGRPCClientConfWithSSL, - serverNodeToSocketAddr) - -callSubscription :: HStreamSqlContext -> T.Text -> T.Text -> IO () -callSubscription ctx subId stream = void $ execute ctx getRespApp handleRespApp - where - getRespApp API.HStreamApi{..} = do - let subReq = API.Subscription - { API.subscriptionSubscriptionId = subId - , API.subscriptionStreamName = stream - , API.subscriptionAckTimeoutSeconds = 1 - , API.subscriptionMaxUnackedRecords = 100 - , API.subscriptionOffset = Enumerated (Right API.SpecialOffsetLATEST) - } - hstreamApiCreateSubscription (mkClientNormalRequest' subReq) - handleRespApp :: ClientResult 'Normal API.Subscription -> IO () - handleRespApp resp = case resp of - (ClientNormalResponse resp_ _meta1 _meta2 _code _details) -> do - putStrLn "-----------------" - print resp_ - putStrLn "-----------------" - _ -> putStrLn "Failed!" - -callDeleteSubscription :: HStreamSqlContext -> T.Text -> IO () -callDeleteSubscription ctx subId = void $ execute ctx getRespApp handleRespApp - where - getRespApp API.HStreamApi{..} = do - let req = API.DeleteSubscriptionRequest - { deleteSubscriptionRequestSubscriptionId = subId, - deleteSubscriptionRequestForce = True - } - hstreamApiDeleteSubscription (mkClientNormalRequest' req) - handleRespApp resp = case resp of - ClientNormalResponse {} -> do - putStrLn "-----------------" - putStrLn "Done." - putStrLn "-----------------" - _ -> putStrLn "Failed!" +import HStream.SQL.Codegen (DropObject (..)) +import qualified HStream.ThirdParty.Protobuf as PB +import HStream.Utils (decompressBatchedRecord, + formatResult) +import Text.StringRandom (stringRandomIO) -callDeleteSubscriptionAll :: HStreamSqlContext -> IO () -callDeleteSubscriptionAll HStreamSqlContext{..} = do - curNode <- readMVar currentServer - withGRPCClient (mkGRPCClientConfWithSSL curNode sslConfig) $ \client -> do - API.HStreamApi{..} <- API.hstreamApiClient client - let listReq = API.ListSubscriptionsRequest - listResp <- hstreamApiListSubscriptions (mkClientNormalRequest' listReq) - case listResp of - (ClientNormalResponse (API.ListSubscriptionsResponse subs) _meta1 _meta2 _code _details) -> do - forM_ subs $ \API.Subscription{..} -> do - let delReq = API.DeleteSubscriptionRequest - { deleteSubscriptionRequestSubscriptionId = subscriptionSubscriptionId - , deleteSubscriptionRequestForce = True - } - _ <- hstreamApiDeleteSubscription (mkClientNormalRequest' delReq) - return () - putStrLn "-----------------" - putStrLn "Done." - putStrLn "-----------------" - _ -> putStrLn "Failed!" - -callListSubscriptions :: HStreamSqlContext -> IO () -callListSubscriptions ctx = void $ execute ctx getRespApp handleRespApp - where - getRespApp API.HStreamApi{..} = do - let req = API.ListSubscriptionsRequest - hstreamApiListSubscriptions (mkClientNormalRequest' req) - handleRespApp :: ClientResult 'Normal API.ListSubscriptionsResponse -> IO () - handleRespApp resp = case resp of - (ClientNormalResponse (API.ListSubscriptionsResponse subs) _meta1 _meta2 _code _details) -> do - putStrLn "-----------------" - mapM_ print subs - putStrLn "-----------------" - _ -> putStrLn "Failed!" - -callStreamingFetch :: HStreamSqlContext -> V.Vector API.RecordId -> T.Text -> T.Text -> IO () -callStreamingFetch ctx@HStreamSqlContext{..} startRecordIds subId clientId = do - curNode <- readMVar currentServer - m_node <- lookupWithAddr ctx curNode (ResSubscription subId) - case m_node of - Nothing -> putStrLn "Subscription not found" - Just node -> withGRPCClient (mkGRPCClientConfWithSSL (serverNodeToSocketAddr node) sslConfig) $ \client -> do - API.HStreamApi{..} <- API.hstreamApiClient client - void $ hstreamApiStreamingFetch (ClientBiDiRequest 10000 mempty action) +streamingFetch :: T.Text -> API.HStreamApi ClientRequest response -> IO () +streamingFetch subId API.HStreamApi{..} = do + clientId <- genClientId + void $ hstreamApiStreamingFetch (ClientBiDiRequest 10000 mempty (action clientId)) where - action _clientCall _meta streamRecv streamSend _writeDone = do - let initReq = API.StreamingFetchRequest - { API.streamingFetchRequestSubscriptionId = subId - , API.streamingFetchRequestConsumerName = clientId - , API.streamingFetchRequestAckIds = startRecordIds - } + action clientId _clientCall _meta streamRecv streamSend writesDone = do _ <- streamSend initReq - recving + receiving where - recving :: IO () - recving = do - m_recv <- streamRecv - case m_recv of - Left err -> print err - Right (Just resp@API.StreamingFetchResponse{..}) -> do - let recIds = maybe V.empty API.receivedRecordRecordIds streamingFetchResponseReceivedRecords - print resp - let ackReq = API.StreamingFetchRequest - { API.streamingFetchRequestSubscriptionId = subId - , API.streamingFetchRequestConsumerName = clientId - , API.streamingFetchRequestAckIds = recIds - } - _ <- streamSend ackReq - recving - Right Nothing -> do - putStrLn "Stopped." + initReq = API.StreamingFetchRequest + { API.streamingFetchRequestSubscriptionId = subId + , API.streamingFetchRequestConsumerName = clientId + , API.streamingFetchRequestAckIds = V.empty + } + receiving :: IO () + receiving = withInterrupt (void writesDone) $ streamRecv >>= \case + Left err -> print err + Right (Just API.StreamingFetchResponse{streamingFetchResponseReceivedRecords = rs}) -> do + let hRecords = maybe V.empty decompressBatchedRecord (API.receivedRecordRecord =<< rs) + let ackReq = initReq { API.streamingFetchRequestAckIds + = maybe V.empty API.receivedRecordRecordIds rs } + let results = (formatResult @PB.Struct <$>) . PB.fromByteString . API.hstreamRecordPayload <$> hRecords + mapM_ (\case Right x -> putStr x; Left x -> print x) results + _ <- streamSend ackReq + receiving + Right Nothing -> putStrLn terminateMsg + +cliFetch :: HStream.Client.Types.HStreamSqlContext -> String -> IO () +cliFetch ctx sql = do + (sName, newSql) <- genRandomSinkStreamSQL (T.pack . removeEmitChanges . words $ sql) + subId <- genRandomSubscriptionId + void . execute ctx $ createStreamBySelect (T.unpack newSql) + void . execute ctx $ createSubscription subId sName + executeWithLookupResource_ ctx (HStream.Client.Types.ResSubscription subId) (streamingFetch subId) + executeWithLookupResource_ ctx (HStream.Client.Types.ResSubscription subId) (void . deleteSubscription subId) + -- FIXME: Replace resource type with Res Stream once lookup stream is supported + executeWithLookupResource_ ctx (HStream.Client.Types.ResSubscription subId) (void . dropAction False (DStream sName)) + +genRandomSubscriptionId :: IO T.Text +genRandomSubscriptionId = do + randomName <- stringRandomIO "[a-zA-Z]{20}" + return $ "cli_internal_subscription_" <> randomName -execute :: HStreamSqlContext - -> (HStreamClientApi -> IO (ClientResult 'Normal a)) - -> (ClientResult 'Normal a -> IO ()) - -> IO () -execute ctx@HStreamSqlContext{..} action cont = do - addr <- readMVar currentServer - executeWithAddr_ ctx addr action cont +genRandomSinkStreamSQL :: T.Text -> IO (T.Text, T.Text) +genRandomSinkStreamSQL sql = do + randomName <- stringRandomIO "[a-zA-Z]{20}" + let streamName = "cli_generated_stream_" <> randomName + return (streamName, "CREATE STREAM " <> streamName <> " AS " <> sql) diff --git a/hstream/src/HStream/Client/SQL.hs b/hstream/src/HStream/Client/SQL.hs index 905d7abdb..9b2dbaaf8 100644 --- a/hstream/src/HStream/Client/SQL.hs +++ b/hstream/src/HStream/Client/SQL.hs @@ -22,18 +22,8 @@ import qualified Data.Text as T import qualified Data.Text.Encoding as T import qualified Data.Text.IO as T import qualified Data.Vector as V -import Network.GRPC.HighLevel.Generated (ClientError (ClientIOError), - ClientRequest (..), - ClientResult (..), - GRPCIOError (GRPCIOBadStatusCode), - StatusDetails (unStatusDetails), - withGRPCClient) -import Network.GRPC.LowLevel.Call (clientCallCancel) import qualified System.Console.Haskeline as RL import qualified System.Console.Haskeline.History as RL -import System.Posix (Handler (Catch), - installHandler, - keyboardSignal) import Text.RawString.QQ (r) import HStream.Client.Action (createConnector, @@ -46,28 +36,23 @@ import HStream.Client.Action (createConnector, import HStream.Client.Execute (execute, executeShowPlan, executeWithLookupResource_, execute_, updateClusterInfo) -import HStream.Client.Internal (callDeleteSubscription, - callDeleteSubscriptionAll, - callListSubscriptions, - callStreamingFetch, - callSubscription) +import HStream.Client.Internal (cliFetch) import HStream.Client.Types (HStreamSqlContext (..), ResourceType (..)) import HStream.Client.Utils (calculateShardId, dropPlanToResType) -import HStream.Server.HStreamApi (CommandPushQuery (..), - CommandQuery (..), +import HStream.Server.HStreamApi (CommandQuery (..), CommandQueryResponse (..), HStreamApi (..), hstreamApiClient) import qualified HStream.Server.HStreamApi as API -import HStream.SQL (DropObject (..), - HStreamPlan (..), +import HStream.SQL (HStreamPlan (..), PauseObject (..), RCreate (..), RSQL (..), ResumeObject (..), hstreamCodegen, parseAndRefine) +import HStream.SQL.Codegen (DropObject (..)) import HStream.SQL.Exception (SomeSQLException, formatSomeSQLException, isEOF) @@ -76,8 +61,9 @@ import HStream.Utils (HStreamClientApi, formatResult, genUnique, mkGRPCClientConfWithSSL, serverNodeToSocketAddr) - --- FIXME: Currently, every new command will create a new connection to a server, +import Network.GRPC.HighLevel.Client (ClientRequest (..), + ClientResult (..)) +import Network.GRPC.HighLevel.Generated (withGRPCClient) -- and this needs to be optimized. This could be done with a grpc client pool. interactiveSQLApp :: HStreamSqlContext -> Maybe FilePath -> IO () interactiveSQLApp ctx@HStreamSqlContext{..} historyFile = do @@ -116,14 +102,14 @@ commandExec :: HStreamSqlContext -> String -> IO () commandExec ctx@HStreamSqlContext{..} xs = case words xs of [] -> return () - -- The Following commands are for testing only - -- { - ":sub":subId:stream:_ -> callSubscription ctx (T.pack subId) (T.pack stream) - ":delSub":subId:_ -> callDeleteSubscription ctx (T.pack subId) - ":delAllSubs":_ -> callDeleteSubscriptionAll ctx - ":fetch":subId:_ -> HStream.Utils.genUnique >>= callStreamingFetch ctx V.empty (T.pack subId) . T.pack . show - ":listSubs":_ -> callListSubscriptions ctx - -- } + -- -- The Following commands are for testing only + -- -- { + -- ":sub":subId:stream:_ -> callSubscription ctx (T.pack subId) (T.pack stream) + -- ":delSub":subId:_ -> callDeleteSubscription ctx (T.pack subId) + -- ":delAllSubs":_ -> callDeleteSubscriptionAll ctx + -- ":fetch":subId:_ -> genClientId >>= callStreamingFetch ctx V.empty (T.pack subId) + -- ":listSubs":_ -> callListSubscriptions ctx + -- -- } ":h": _ -> putStrLn helpInfo [":help"] -> putStr groupedHelpInfo @@ -132,7 +118,7 @@ commandExec ctx@HStreamSqlContext{..} xs = case words xs of (_:_) -> liftIO $ handle (\(e :: SomeSQLException) -> putStrLn . formatSomeSQLException $ e) $ do rSQL <- parseAndRefine $ T.pack xs case rSQL of - RQPushSelect{} -> runActionWithGrpc ctx (\api -> sqlStreamAction api (T.pack xs)) + RQPushSelect{} -> cliFetch ctx xs RQCreate RCreateAs {} -> execute_ ctx $ createStreamBySelect xs rSql' -> hstreamCodegen rSql' >>= \case @@ -176,26 +162,6 @@ readToSQL acc = do pure Nothing Right _ -> pure . Just $ T.unpack acc -sqlStreamAction :: HStream.Utils.HStreamClientApi -> T.Text -> IO () -sqlStreamAction HStreamApi{..} sql = do - let commandPushQuery = CommandPushQuery{ commandPushQueryQueryText = sql } - resp <- hstreamApiExecutePushQuery (ClientReaderRequest commandPushQuery 10000000 mempty action) - case resp of - ClientReaderResponse {} -> return () - ClientErrorResponse (ClientIOError (GRPCIOBadStatusCode _ details)) -> do - T.putStrLn $ "Error: " <> T.decodeUtf8 (unStatusDetails details) - ClientErrorResponse err -> do - putStrLn $ "Error: " <> (case err of ClientIOError ge -> show ge; _ -> show err) <> ", please try a different server node" - where - action call _meta recv = do - msg <- withInterrupt (clientCallCancel call) recv - case msg of - Left err -> print err - Right Nothing -> putStrLn ("\x1b[32m" <> "Terminated" <> "\x1b[0m") - Right (Just result) -> do - putStr $ HStream.Utils.formatResult result - action call _meta recv - sqlAction :: HStream.Utils.HStreamClientApi -> T.Text -> IO () sqlAction HStreamApi{..} sql = do let commandQuery = CommandQuery{ commandQueryStmtText = sql } @@ -205,11 +171,6 @@ sqlAction HStreamApi{..} sql = do putStr $ HStream.Utils.formatCommandQueryResponse x ClientErrorResponse _ -> putStr $ HStream.Utils.formatResult resp -withInterrupt :: IO () -> IO a -> IO a -withInterrupt interruptHandle act = do - old_handler <- installHandler keyboardSignal (Catch interruptHandle) Nothing - act `finally` installHandler keyboardSignal old_handler Nothing - helpInfo :: String helpInfo = [r| diff --git a/hstream/src/HStream/Client/Utils.hs b/hstream/src/HStream/Client/Utils.hs index 7eea71c66..46af7e622 100644 --- a/hstream/src/HStream/Client/Utils.hs +++ b/hstream/src/HStream/Client/Utils.hs @@ -7,13 +7,18 @@ module HStream.Client.Utils ( clientDefaultRequest - , mkClientNormalRequest' - , requestTimeout + , calculateShardId , extractSelect + , genClientId + , mkClientNormalRequest' , printResult + , dropPlanToResType + , requestTimeout + , subscriptionWithDefaultSetting + , terminateMsg , waitForServerToStart - , calculateShardId - , dropPlanToResType) where + , withInterrupt + , removeEmitChanges) where import Control.Concurrent (threadDelay) import Crypto.Hash.MD5 (hash) @@ -26,13 +31,24 @@ import Network.GRPC.HighLevel.Client import Network.GRPC.HighLevel.Generated (withGRPCClient) import Proto3.Suite.Class (HasDefault, def) +import Control.Exception (finally) +import Data.Functor ((<&>)) +import Data.Int (Int32) +import Data.String (IsString) import HStream.Client.Types (ResourceType (..)) import qualified HStream.Server.HStreamApi as API import HStream.SQL (DropObject (..)) import HStream.Utils (Format (formatResult), - SocketAddr (..), + SocketAddr (..), genUnique, mkClientNormalRequest, mkGRPCClientConfWithSSL) +import Proto3.Suite (Enumerated (..)) +import System.Posix (Handler (Catch), + installHandler, + keyboardSignal) + +terminateMsg :: IsString a => a +terminateMsg = "\x1b[32mTerminated\x1b[0m" clientDefaultRequest :: HasDefault a => ClientRequest 'Normal a b clientDefaultRequest = mkClientNormalRequest' def @@ -40,6 +56,19 @@ clientDefaultRequest = mkClientNormalRequest' def requestTimeout :: Int requestTimeout = 1000 +subAckTimeout :: Int32 +subAckTimeout = 1 + +subMaxUnack :: Int32 +subMaxUnack = 100 + +subscriptionWithDefaultSetting :: T.Text -> T.Text -> API.Subscription +subscriptionWithDefaultSetting subscriptionSubscriptionId subscriptionStreamName = + API.Subscription { subscriptionAckTimeoutSeconds = subAckTimeout + , subscriptionMaxUnackedRecords = subMaxUnack + , subscriptionOffset = Enumerated (Right API.SpecialOffsetLATEST) + , ..} + mkClientNormalRequest' :: a -> ClientRequest 'Normal a b mkClientNormalRequest' = mkClientNormalRequest requestTimeout @@ -87,6 +116,18 @@ dropPlanToResType :: DropObject -> ResourceType dropPlanToResType (DConnector cid ) = ResConnector cid dropPlanToResType DView{} = undefined dropPlanToResType DStream{} = undefined + +genClientId :: IO T.Text +genClientId = genUnique <&> (("hstream_cli_client_" <>) . T.pack . show) + +withInterrupt :: IO () -> IO a -> IO a +withInterrupt interruptHandle act = do + old_handler <- installHandler keyboardSignal (Catch interruptHandle) Nothing + act `finally` installHandler keyboardSignal old_handler Nothing + +removeEmitChanges :: [String] -> String +removeEmitChanges = unwords . reverse . (";" :) . drop 1 . dropWhile ((/= "EMIT") . map toUpper) . reverse + -------------------------------------------------------------------------------- printResult :: Format a => a -> IO ()