Skip to content

Commit

Permalink
feat: add queryName for view & add execute view query (#1335)
Browse files Browse the repository at this point in the history
  • Loading branch information
Time-Hu authored Mar 31, 2023
1 parent a1dbdbd commit 973012b
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 41 deletions.
3 changes: 3 additions & 0 deletions common/hstream/HStream/Utils/Format.hs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ instance Format API.AppendResponse where
instance Format API.TerminateQueriesResponse where
formatResult = const "Done.\n"

instance Format API.ExecuteViewQueryResponse where
formatResult = concatMap ((<> "\n") . TL.unpack . A.encodeToLazyText . structToJsonObject ) . API.executeViewQueryResponseResults

instance Format PB.Struct where
formatResult s@(PB.Struct kv) =
case M.toList kv of
Expand Down
2 changes: 1 addition & 1 deletion external/protocol
Submodule protocol updated 1 files
+8 −0 hstream.proto
7 changes: 6 additions & 1 deletion hstream/src/HStream/Client/Action.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ module HStream.Client.Action
, lookupResource
, describeCluster

,retry
, executeViewQuery

, retry
) where

import Control.Concurrent (threadDelay)
Expand Down Expand Up @@ -204,6 +206,9 @@ getStream sName HStreamApi{..} = hstreamApiGetStream $ mkClientNormalRequest' de
getSubscription :: T.Text -> Action GetSubscriptionResponse
getSubscription sid HStreamApi{..} = hstreamApiGetSubscription $ mkClientNormalRequest' def { getSubscriptionRequestId = sid }

executeViewQuery :: String -> Action ExecuteViewQueryResponse
executeViewQuery sql HStreamApi{..} = hstreamApiExecuteViewQuery $ mkClientNormalRequest' def { executeViewQueryRequestSql = T.pack sql }

--------------------------------------------------------------------------------

fakeMap :: (a -> b) -> ClientResult 'Normal a -> ClientResult 'Normal b
Expand Down
6 changes: 4 additions & 2 deletions hstream/src/HStream/Client/SQL.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import HStream.Client.Action (createConnector,
createStream,
createStreamBySelect,
createStreamBySelectWithCustomQueryName,
dropAction, insertIntoStream,
listShards, pauseConnector,
dropAction, executeViewQuery,
insertIntoStream, listShards,
pauseConnector,
resumeConnector, retry,
terminateQueries)
import HStream.Client.Execute (execute, executeShowPlan,
Expand Down Expand Up @@ -130,6 +131,7 @@ commandExec HStreamSqlContext{hstreamCliContext = cliCtx@HStreamCliContext{..},.
RQCreate RCreateAs {} -> do
qName <- ("cli_generated_" <>) <$> newRandomText 10
executeWithLookupResource_ cliCtx (Resource ResQuery qName) (createStreamBySelectWithCustomQueryName xs qName)
RQSelect {} -> execute_ cliCtx $ executeViewQuery xs
rSql' -> hstreamCodegen rSql' >>= \case
ShowPlan showObj -> executeShowPlan cliCtx showObj
-- FIXME: add lookup after supporting lookup stream and lookup view
Expand Down
20 changes: 1 addition & 19 deletions hstream/src/HStream/Server/Core/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,7 @@ executeQuery sc@ServerContext{..} CommandQuery{..} = do
Core.createView' sc view ins out builder accumulation commandQueryStmtText
pure $ API.CommandQueryResponse (mkVectorStruct queryId "view_query_id")
#else
SelectPlan sources sink builder persist -> do
roles_m <- mapM (findIdentifierRole sc) sources
case all (== Just RoleView) roles_m of
False -> do
Log.warning "Can not perform non-pushing SELECT on streams."
throwIO $ HE.InvalidSqlStatement "Can not perform non-pushing SELECT on streams."
True -> do
hm <- readIORef P.groupbyStores
let mats = L.map ((HM.!) hm) sources

sinkRecords_m <- newIORef []
let sinkConnector = HStore.memorySinkConnector sinkRecords_m
HP.runImmTask (sources `zip` mats) sinkConnector builder () () Just Just
sinkRecords <- readIORef sinkRecords_m

let flowObjects = (L.map (fromJust . Aeson.decode . snkValue) sinkRecords) :: [FlowObject]
case flowObjects of
[] -> sendResp mempty
_ -> sendResp (V.fromList $ flowObjectToJsonObject <$> flowObjects)
SelectPlan {} -> discard "ExecuteViewQuery"
CreateViewPlan sources sink view builder persist -> do
validateNameAndThrow sink
validateNameAndThrow view
Expand Down
48 changes: 39 additions & 9 deletions hstream/src/HStream/Server/Core/View.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,45 @@ module HStream.Server.Core.View
( deleteView
, getView
, listViews
, executeViewQuery
, createView
, createView'
) where

import Control.Exception (throw, throwIO)
import qualified Data.HashMap.Strict as HM
import Data.IORef (atomicModifyIORef')
import qualified Data.Text as T
import GHC.Stack (HasCallStack)

import Control.Concurrent (MVar)
import Control.Exception (throw, throwIO)
import Control.Monad (unless)
import qualified Data.Aeson as Aeson
import Data.Functor ((<&>))
import qualified Data.HashMap.Strict as HM
import Data.IORef (atomicModifyIORef', newIORef,
readIORef)
import qualified Data.List as L
import Data.Maybe (fromJust, isJust)
import qualified Data.Text as T
import qualified Data.Vector as V
import GHC.Stack (HasCallStack)
import HStream.Exception (ViewNotFound (..))
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaStore (insertMeta))
import qualified HStream.MetaStore.Types as M
import HStream.Processing.Type (SinkRecord (..))
import HStream.Server.Core.Common (handleQueryTerminate)
import HStream.Server.Handler.Common (IdentifierRole (..),
findIdentifierRole,
handleCreateAsSelect)
import qualified HStream.Server.HStore as SH
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.MetaData.Types (ViewInfo (viewName))
import HStream.Server.Types
import HStream.SQL (FlowObject)
import HStream.ThirdParty.Protobuf (Empty (..))
import HStream.Utils (TaskStatus (..), newRandomText)
import HStream.SQL (FlowObject,
flowObjectToJsonObject)
import HStream.ThirdParty.Protobuf (Empty (..), Struct)
import HStream.Utils (TaskStatus (..),
jsonObjectToStruct,
newRandomText)
#ifdef HStreamUseV2Engine
import DiffFlow.Graph (GraphBuilder)
import DiffFlow.Types (DataChangeBatch)
Expand Down Expand Up @@ -144,6 +152,27 @@ getView ServerContext{..} viewId = do
Log.warning $ "Cannot Find View with Name: " <> Log.buildString (T.unpack viewId)
throwIO $ ViewNotFound viewId

executeViewQuery :: ServerContext -> T.Text -> IO (V.Vector Struct)
executeViewQuery sc@ServerContext{..} sql = do
plan <- streamCodegen sql
case plan of
SelectPlan sources sink builder persist -> do
roles_m <- mapM (findIdentifierRole sc) sources
case all (== Just RoleView) roles_m of
False -> do
Log.warning "Can not perform non-pushing SELECT on streams."
throwIO $ HE.InvalidSqlStatement "Can not perform non-pushing SELECT on streams."
True -> do
hm <- readIORef P.groupbyStores
let mats = L.map (hm HM.!) sources
sinkRecords_m <- newIORef []
let sinkConnector = SH.memorySinkConnector sinkRecords_m
HP.runImmTask (sources `zip` mats) sinkConnector builder () () Just Just
sinkRecords <- readIORef sinkRecords_m
return . V.fromList $ jsonObjectToStruct . flowObjectToJsonObject
. fromJust . Aeson.decode @FlowObject . snkValue <$> sinkRecords
_ -> throw $ HE.InvalidSqlStatement "Invalid SQL statement for running view query"

listViews :: HasCallStack => ServerContext -> IO [API.View]
listViews ServerContext{..} = mapM (hstreamViewToView metaHandle) =<< M.listMeta metaHandle

Expand All @@ -156,4 +185,5 @@ hstreamViewToView h P.ViewInfo{viewQuery = P.QueryInfo{..},..} = do
, viewCreatedTime = queryCreatedTime
, viewSchema = mempty
, viewSql = querySql
, viewQueryName = queryId
}
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ handlers serverContext@ServerContext{..} =
hstreamApiGetView = getViewHandler serverContext,
hstreamApiListViews = listViewsHandler serverContext,
hstreamApiDeleteView = deleteViewHandler serverContext,
hstreamApiExecuteViewQuery = executeViewQueryHandler serverContext,

-- Cluster
hstreamApiDescribeCluster = describeClusterHandler serverContext,
Expand Down
21 changes: 20 additions & 1 deletion hstream/src/HStream/Server/Handler/View.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,28 @@ module HStream.Server.Handler.View
listViewsHandler
, getViewHandler
, deleteViewHandler
, executeViewQueryHandler
-- * For hs-grpc-server
, handleListView
, handleGetView
, handleDeleteView
, handleExecuteViewQuery
) where

import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified HsGrpc.Server as G
import Network.GRPC.HighLevel.Generated
import qualified Proto3.Suite as PT

import qualified HStream.Logger as Log
import qualified HStream.Server.Core.View as Core
import HStream.Server.Exception (catchDefaultEx,
defaultExceptionHandle)
import HStream.Server.HStreamApi
import HStream.Server.Types
import HStream.ThirdParty.Protobuf (Empty (..))
import HStream.ThirdParty.Protobuf (Empty (..), Struct)
import HStream.Utils (returnResp)

listViewsHandler
Expand Down Expand Up @@ -71,3 +75,18 @@ handleDeleteView sc _ DeleteViewRequest{..} = catchDefaultEx $ do
Log.debug $ "Receive Delete View Request. "
<> "View ID:" <> Log.buildString (T.unpack deleteViewRequestViewId)
Core.deleteView sc deleteViewRequestViewId deleteViewRequestIgnoreNonExist

executeViewQueryHandler
:: ServerContext -> ServerRequest 'Normal ExecuteViewQueryRequest ExecuteViewQueryResponse
-> IO (ServerResponse 'Normal ExecuteViewQueryResponse)
executeViewQueryHandler sc (ServerNormalRequest _metadata ExecuteViewQueryRequest{..}) = defaultExceptionHandle $ do
Log.debug $ "Receive Execute View Query Request. "
<> "SQL Statement:" <> Log.build executeViewQueryRequestSql
returnResp . ExecuteViewQueryResponse =<< Core.executeViewQuery sc executeViewQueryRequestSql

handleExecuteViewQuery
:: ServerContext -> G.UnaryHandler ExecuteViewQueryRequest ExecuteViewQueryResponse
handleExecuteViewQuery sc _ ExecuteViewQueryRequest{..} = catchDefaultEx $ do
Log.debug $ "Receive Execute View Query Request. "
<> "SQL Statement:" <> Log.build executeViewQueryRequestSql
ExecuteViewQueryResponse <$> Core.executeViewQuery sc executeViewQueryRequestSql
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/HsGrpcHandler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ handlers sc =
, unary (GRPC :: GRPC P.HStreamApi "getView") (H.handleGetView sc)
, unary (GRPC :: GRPC P.HStreamApi "listViews") (H.handleListView sc)
, unary (GRPC :: GRPC P.HStreamApi "deleteView") (H.handleDeleteView sc)
, unary (GRPC :: GRPC P.HStreamApi "executeViewQuery") (H.handleExecuteViewQuery sc)
-- Query
, unary (GRPC :: GRPC P.HStreamApi "terminateQueries") (H.handleTerminateQueries sc)
, unary (GRPC :: GRPC P.HStreamApi "executeQuery") (H.handleExecuteQuery sc)
Expand Down
4 changes: 2 additions & 2 deletions hstream/test/HStream/RegressionSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ spec = aroundAll provideHstreamApi $
threadDelay 500000
runInsertSql api "INSERT INTO s6 (key1, key2, key3) VALUES (4, \"hello_00000000000000000000\", true);"
threadDelay 20000000
runQuerySimple api "SELECT * FROM v6 WHERE key3 = FALSE;"
`grpcShouldReturn` mkViewResponse (mkStruct [ ("SUM(key1)", Aeson.Number 4)
runViewQuerySql api "SELECT * FROM v6 WHERE key3 = FALSE;"
`shouldReturn` mkViewResponse (mkStruct [ ("SUM(key1)", Aeson.Number 4)
, ("key2", Aeson.String "hello_00000000000000000001")
, ("key3", Aeson.Bool False)]
)
Expand Down
8 changes: 4 additions & 4 deletions hstream/test/HStream/RunSQLSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ viewSpec =
threadDelay 500000
runInsertSql api $ "INSERT INTO " <> source1 <> " (a) VALUES (2);"
threadDelay 10000000
runQuerySimple api ("SELECT * FROM " <> viewName <> " WHERE b = 1;")
`grpcShouldReturn` mkViewResponse (mkStruct [ ("SUM(a)", Aeson.Number 3)
runViewQuerySql api ("SELECT * FROM " <> viewName <> " WHERE b = 1;")
`shouldReturn` mkViewResponse (mkStruct [ ("SUM(a)", Aeson.Number 3)
, ("b", Aeson.Number 1)
])

Expand All @@ -149,7 +149,7 @@ viewSpec =
threadDelay 500000
runInsertSql api $ "INSERT INTO " <> source1 <> " (a) VALUES (4);"
threadDelay 10000000
runQuerySimple api ("SELECT * FROM " <> viewName <> " WHERE b = 1;")
`grpcShouldReturn` mkViewResponse (mkStruct [ ("SUM(a)", Aeson.Number 10)
runViewQuerySql api ("SELECT * FROM " <> viewName <> " WHERE b = 1;")
`shouldReturn` mkViewResponse (mkStruct [ ("SUM(a)", Aeson.Number 10)
, ("b", Aeson.Number 1)
])
8 changes: 6 additions & 2 deletions hstream/test/HStream/SpecUtils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ appendRequest HStreamApi{..} streamName shardId records =
mkStruct :: [(Text, Aeson.Value)] -> Struct
mkStruct = jsonObjectToStruct . AesonComp.fromList . (map $ first AesonComp.fromText)

mkViewResponse :: Struct -> CommandQueryResponse
mkViewResponse = CommandQueryResponse . V.singleton . structToStruct "SELECTVIEW"
mkViewResponse :: Struct -> ExecuteViewQueryResponse
mkViewResponse = ExecuteViewQueryResponse . V.singleton

runFetchSql :: T.Text -> IO [Struct]
runFetchSql sql = withGRPCClient clientConfig $ \client -> do
Expand Down Expand Up @@ -278,3 +278,7 @@ runDropSql :: HStreamClientApi -> T.Text -> Expectation
runDropSql api sql = do
DropPlan checkIfExists dropObj <- streamCodegen sql
dropAction checkIfExists dropObj api `grpcShouldReturn` Empty

runViewQuerySql :: HStreamClientApi -> T.Text -> IO ExecuteViewQueryResponse
runViewQuerySql api sql =
getServerResp =<< executeViewQuery (T.unpack sql) api

0 comments on commit 973012b

Please sign in to comment.