Skip to content

Commit

Permalink
Remove push query (#1129)
Browse files Browse the repository at this point in the history
  • Loading branch information
Time-Hu authored Oct 28, 2022
1 parent 2c86934 commit 8ffb39c
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 224 deletions.
37 changes: 35 additions & 2 deletions common/HStream/Utils/Format.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class Format a where
instance Format Protobuf.Empty where
formatResult = const "Done.\n"

instance Format () where
formatResult = const ""

instance Format API.Stream where
formatResult = (<> "\n") . T.unpack . API.streamStreamName

Expand All @@ -62,12 +65,17 @@ instance Format API.Query where
instance Format API.Connector where
formatResult = renderConnectorsToTable . (:[])

instance Format API.Subscription where
formatResult = renderSubscriptionsToTable . (:[])
instance Format [API.Query] where
formatResult = emptyNotice . renderQueriesToTable

instance Format [API.Connector] where
formatResult = emptyNotice . renderConnectorsToTable

instance Format [API.Subscription] where
formatResult = emptyNotice . renderSubscriptionsToTable

instance Format [API.ServerNode] where
formatResult = emptyNotice . renderServerNodesToTable

Expand Down Expand Up @@ -96,7 +104,7 @@ instance Format API.TerminateQueriesResponse where
formatResult = const "Done.\n"

instance Format P.Struct where
formatResult (P.Struct kv) =
formatResult s@(P.Struct kv) =
case M.toList kv of
[("SELECT", Just x)] -> (<> "\n") . TL.unpack . A.encodeToLazyText . valueToJsonValue $ x
[("SELECTVIEW", Just x)] -> (<> "\n") . TL.unpack . A.encodeToLazyText . valueToJsonValue $ x
Expand All @@ -105,7 +113,7 @@ instance Format P.Struct where
[("view_query_id", Just x)] -> let (A.String qid) = valueToJsonValue x
in "Done. Query ID: " <> T.unpack qid <> "\n"
[("Error Message:", Just v)] -> "Error Message: " ++ show v ++ "\n"
x -> show x
_ -> (<> "\n") . TL.unpack . A.encodeToLazyText . structToJsonObject $ s

instance Format API.CommandQueryResponse where
formatResult = formatCommandQueryResponse
Expand Down Expand Up @@ -145,6 +153,31 @@ renderQueriesToTable queries =
, Table.column Table.expand Table.left def def
]

renderSubscriptionsToTable :: [API.Subscription] -> String
renderSubscriptionsToTable [] = ""
renderSubscriptionsToTable subscriptions =
Table.tableString colSpec Table.asciiS
(Table.fullH (repeat $ Table.headerColumn Table.left Nothing) titles)
(Table.colsAllG Table.center <$> rows) ++ "\n"
where
titles = [ "Subscription ID"
, "Stream Name"
, "Ack Timeout"
, "Max Unacked Records"
]
formatRow API.Subscription {..} =
[ [T.unpack subscriptionSubscriptionId]
, [T.unpack subscriptionStreamName]
, [show subscriptionAckTimeoutSeconds <> "s"]
, [show subscriptionMaxUnackedRecords]
]
rows = map formatRow subscriptions
colSpec = [ Table.column Table.expand Table.left def def
, Table.column Table.expand Table.left def def
, Table.column Table.expand Table.left def def
, Table.column Table.expand Table.left def def
]

renderConnectorsToTable :: [API.Connector] -> String
renderConnectorsToTable [] = ""
renderConnectorsToTable connectors =
Expand Down
2 changes: 1 addition & 1 deletion hstream-sql/etc/SQL.cf
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ DropStream. DropOption ::= "STREAM" ;
DropView. DropOption ::= "VIEW" ;

-- Terminate Query
TerminateQuery. Terminate ::= "TERMINATE" "QUERY" Integer ;
TerminateQuery. Terminate ::= "TERMINATE" "QUERY" Ident ;
TerminateAll. Terminate ::= "TERMINATE" "ALL";

---- Explain
Expand Down
6 changes: 3 additions & 3 deletions hstream-sql/src/HStream/SQL/AST.hs
Original file line number Diff line number Diff line change
Expand Up @@ -834,12 +834,12 @@ type instance RefinedType DropOption = RDropOption

---- Terminate
data RTerminate
= RTerminateQuery String
= RTerminateQuery Text
| RTerminateAll
deriving (Eq, Show)
instance Refine Terminate where
refine (TerminateQuery _ x) = RTerminateQuery (show x)
refine (TerminateAll _ ) = RTerminateAll
refine (TerminateQuery _ (Ident x)) = RTerminateQuery x
refine (TerminateAll _ ) = RTerminateAll
type instance RefinedType Terminate = RTerminate

---- Pause
Expand Down
2 changes: 1 addition & 1 deletion hstream-sql/src/HStream/SQL/Codegen.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ hstreamCodegen = \case
RQDrop (RDropIf RDropConnector x) -> return $ DropPlan True (DConnector x)
RQDrop (RDropIf RDropStream x) -> return $ DropPlan True (DStream x)
RQDrop (RDropIf RDropView x) -> return $ DropPlan True (DView x)
RQTerminate (RTerminateQuery qid) -> return $ TerminatePlan (OneQuery $ T.pack qid)
RQTerminate (RTerminateQuery qid) -> return $ TerminatePlan (OneQuery qid)
RQTerminate RTerminateAll -> return $ TerminatePlan AllQueries
--RQSelectView rSelectView -> return $ SelectViewPlan rSelectView
RQExplain rexplain -> return $ ExplainPlan rexplain
Expand Down
21 changes: 6 additions & 15 deletions hstream/app/client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ import qualified Data.List as L
import Data.Maybe (isNothing, mapMaybe,
maybeToList)
import qualified Data.Text.Encoding as T
import qualified Data.Text.IO as T
import qualified Data.Vector as V
import Network.GRPC.HighLevel.Client (ClientConfig,
ClientSSLConfig (..),
ClientSSLKeyCertPair (..))
import Network.GRPC.HighLevel.Generated (ClientError (ClientIOError),
import Network.GRPC.HighLevel.Generated (ClientError (..),
ClientResult (..),
GRPCIOError (GRPCIOBadStatusCode),
GRPCMethodType (..),
StatusDetails (unStatusDetails),
GRPCIOError (..),
StatusDetails (..),
withGRPCClient)
import qualified Options.Applicative as O
import System.Exit (exitFailure)
Expand All @@ -51,15 +49,16 @@ import HStream.Client.Types (CliConnOpts (..),
HStreamSqlOpts (..),
commandParser)
import HStream.Client.Utils (mkClientNormalRequest',
printResult,
waitForServerToStart)
import HStream.Server.HStreamApi (DescribeClusterResponse (..),
HStreamApi (..),
ServerNode (serverNodeId),
ServerNode (..),
hstreamApiClient)
import qualified HStream.Server.HStreamApi as API
import HStream.SQL (DropObject (..))
import HStream.ThirdParty.Protobuf (Empty (Empty))
import HStream.Utils (Format, SocketAddr (..),
import HStream.Utils (SocketAddr (..),
fillWithJsonString',
formatResult,
mkGRPCClientConfWithSSL,
Expand Down Expand Up @@ -196,14 +195,6 @@ hstreamStream RefinedCliConnOpts{..} = \case
simpleExecuteWithAddr addr sslConfig (\HStreamApi{..} -> hstreamApiCreateStream (mkClientNormalRequest' stream)) >>= printResult
StreamCmdDelete sName ignoreNonExist ->
simpleExecuteWithAddr addr sslConfig (dropAction ignoreNonExist (DStream sName)) >>= printResult
where
printResult :: Format response => ClientResult 'Normal response -> IO ()
printResult = \case
ClientNormalResponse x _ _ _ _ -> putStrLn $ formatResult x
ClientErrorResponse (ClientIOError (GRPCIOBadStatusCode _ details)) ->
T.putStrLn $ "Error: " <> T.decodeUtf8 (unStatusDetails details)
ClientErrorResponse err -> do
putStrLn $ "Error: " <> show err

getNodes :: RefinedCliConnOpts -> IO DescribeClusterResponse
getNodes RefinedCliConnOpts{..} =
Expand Down
1 change: 1 addition & 0 deletions hstream/hstream.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ library
, scientific
, statgrab
, stm
, string-random
, suspend
, table-layout
, text
Expand Down
35 changes: 26 additions & 9 deletions hstream/src/HStream/Client/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@ module HStream.Client.Action
( Action

, createStream
, listStreams
, listViews
, listQueries
, listConnectors
, terminateQueries
, createStreamBySelect
, createConnector
, createSubscription
, createSubscription'
, describeCluster
, dropAction
, deleteSubscription
, insertIntoStream
, createStreamBySelect
, listConnectors
, listStreams
, listQueries
, listShards
, lookupResource
, describeCluster
, pauseConnector
, resumeConnector
, createConnector
, listSubscriptions
, listViews
, terminateQueries
) where

import qualified Data.ByteString as BS
Expand All @@ -46,6 +50,8 @@ import HStream.SQL.Codegen (DropObject (..),
import HStream.ThirdParty.Protobuf (Empty (..))
import HStream.Utils

type Action a = HStreamClientApi -> IO (ClientResult 'Normal a)

createStream :: StreamName -> Int
-> Action API.Stream
createStream sName rFac API.HStreamApi{..} =
Expand All @@ -62,6 +68,8 @@ listQueries :: Action API.ListQueriesResponse
listQueries API.HStreamApi{..} = hstreamApiListQueries clientDefaultRequest
listConnectors :: Action API.ListConnectorsResponse
listConnectors API.HStreamApi{..} = hstreamApiListConnectors clientDefaultRequest
listSubscriptions :: Action API.ListSubscriptionsResponse
listSubscriptions API.HStreamApi{..} = hstreamApiListSubscriptions clientDefaultRequest

terminateQueries :: TerminationSelection
-> HStreamClientApi
Expand Down Expand Up @@ -121,7 +129,6 @@ createConnector sql API.HStreamApi{..} =
hstreamApiCreateConnector (mkClientNormalRequest' def
{ API.createConnectorRequestSql = T.pack sql})

type Action a = HStreamClientApi -> IO (ClientResult 'Normal a)

listShards :: T.Text -> Action API.ListShardsResponse
listShards sName API.HStreamApi{..} = do
Expand Down Expand Up @@ -151,6 +158,16 @@ resumeConnector :: T.Text -> Action Empty
resumeConnector cid HStreamApi{..} = hstreamApiResumeConnector $
mkClientNormalRequest' def { resumeConnectorRequestName = cid }

createSubscription :: T.Text -> T.Text -> Action Subscription
createSubscription subId sName = createSubscription' (subscriptionWithDefaultSetting subId sName)

createSubscription' :: Subscription -> Action Subscription
createSubscription' sub HStreamApi{..} = hstreamApiCreateSubscription $ mkClientNormalRequest' sub

deleteSubscription :: T.Text -> Action Empty
deleteSubscription subId HStreamApi{..} = hstreamApiDeleteSubscription $
mkClientNormalRequest' def { deleteSubscriptionRequestSubscriptionId = subId }

--------------------------------------------------------------------------------

fakeMap :: (a -> b) -> ClientResult 'Normal a -> ClientResult 'Normal b
Expand Down
8 changes: 4 additions & 4 deletions hstream/src/HStream/Client/Execute.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import HStream.Client.Types (HStreamSqlContext (..),
import HStream.Client.Utils
import qualified HStream.Server.HStreamApi as API
import HStream.SQL
import HStream.Utils (Format, SocketAddr,
getServerResp,
import HStream.Utils (Format, HStreamClientApi,
SocketAddr, getServerResp,
mkGRPCClientConfWithSSL,
serverNodeToSocketAddr)

Expand All @@ -51,7 +51,7 @@ execute_ ctx@HStreamSqlContext{..} action = do
void $ executeWithAddr_ ctx addr action printResult

executeWithLookupResource_ :: Format a => HStreamSqlContext
-> ResourceType -> Action a -> IO ()
-> ResourceType -> (HStreamClientApi -> IO a) -> IO ()
executeWithLookupResource_ ctx@HStreamSqlContext{..} rtype action = do
addr <- readMVar currentServer
lookupWithAddr ctx addr rtype >>= \case
Expand Down Expand Up @@ -123,6 +123,6 @@ getInfoWithAddr ctx@HStreamSqlContext{..} addr action cont = do
void . swapMVar currentServer $ addr
cont resp

simpleExecuteWithAddr :: SocketAddr -> Maybe ClientSSLConfig -> Action a -> IO (ClientResult 'Normal a)
simpleExecuteWithAddr :: SocketAddr -> Maybe ClientSSLConfig -> (HStreamClientApi -> IO a) -> IO a
simpleExecuteWithAddr addr sslConfig action =
withGRPCClient (mkGRPCClientConfWithSSL addr sslConfig) (API.hstreamApiClient >=> action)
Loading

0 comments on commit 8ffb39c

Please sign in to comment.