From 664cecb0f56ceeb11f7aa95108c3c730c57dd7c5 Mon Sep 17 00:00:00 2001 From: daleiz <30970925+daleiz@users.noreply.github.com> Date: Mon, 3 Apr 2023 18:29:33 +0800 Subject: [PATCH] add a window_end column for aggregated results based on time window (#1341) --- hstream-sql/src/HStream/SQL/Codegen/V1.hs | 4 +- .../src/HStream/SQL/Codegen/V1/Boilerplate.hs | 63 +++++++------------ 2 files changed, 25 insertions(+), 42 deletions(-) diff --git a/hstream-sql/src/HStream/SQL/Codegen/V1.hs b/hstream-sql/src/HStream/SQL/Codegen/V1.hs index aa1a66ee0..a8d17cd32 100644 --- a/hstream-sql/src/HStream/SQL/Codegen/V1.hs +++ b/hstream-sql/src/HStream/SQL/Codegen/V1.hs @@ -404,7 +404,7 @@ relationExprToGraph relation builder = case relation of >>= HTW.aggregate aggregateInit aggregateR HM.union - (timeWindowFlowObjectSerde $ calendarDiffTimeToMs i) + timeWindowFlowObjectSerde (timeWindowSerde $ calendarDiffTimeToMs i) flowObjectSerde materialized @@ -415,7 +415,7 @@ relationExprToGraph relation builder = case relation of >>= HTW.aggregate aggregateInit aggregateR HM.union - (timeWindowFlowObjectSerde $ calendarDiffTimeToMs i1) + timeWindowFlowObjectSerde (timeWindowSerde $ calendarDiffTimeToMs i1) flowObjectSerde materialized diff --git a/hstream-sql/src/HStream/SQL/Codegen/V1/Boilerplate.hs b/hstream-sql/src/HStream/SQL/Codegen/V1/Boilerplate.hs index f1508d882..17bb0369f 100644 --- a/hstream-sql/src/HStream/SQL/Codegen/V1/Boilerplate.hs +++ b/hstream-sql/src/HStream/SQL/Codegen/V1/Boilerplate.hs @@ -17,6 +17,7 @@ import qualified Data.HashMap.Strict as HM import Data.Maybe (fromJust) import Data.Scientific (Scientific (coefficient), coefficient, scientific) +import qualified Data.Text as T import qualified Data.Text.Lazy as TL import qualified Data.Text.Lazy.Encoding as TLE import HStream.Processing.Encoding @@ -28,6 +29,12 @@ import qualified Data.Aeson.Key as Key import qualified Data.Aeson.KeyMap as KeyMap #endif +winStartText :: T.Text +winStartText = "window_start" + +winEndText :: T.Text +winEndText = "window_end" + instance Serialized FlowObject where compose (fo1, fo2) = let o = compose ( flowObjectToJsonObject fo1 @@ -117,70 +124,46 @@ sessionWindowSerde = endTs <- getInt64be return TimeWindow {tWindowStart = startTs, tWindowEnd = endTs} -timeWindowObjectSerde :: Int64 -> Serde TimeWindow Object -timeWindowObjectSerde windowSize = -#if MIN_VERSION_aeson(2,0,0) - Serde - { serializer = Serializer $ \TimeWindow{..} -> - let winStart = [(Key.fromText "winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)] - in KeyMap.fromList winStart - , deserializer = Deserializer $ \obj -> - let (Aeson.Number start) = fromJust $ (KeyMap.lookup) "winStart" obj - startInt64 = fromInteger $ coefficient start - in mkTimeWindow startInt64 (startInt64 + windowSize) - } -#else - Serde - { serializer = Serializer $ \TimeWindow{..} -> - let winStart = [("winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)] - in HM.fromList winStart - , deserializer = Deserializer $ \obj -> - let (Aeson.Number start) = (HM.!) obj "winStart" - startInt64 = fromInteger $ coefficient start - in mkTimeWindow startInt64 (startInt64 + windowSize) - } -#endif - -sessionWindowObjectSerde :: Serde TimeWindow Object -sessionWindowObjectSerde = +timeWindowObjectSerde :: Serde TimeWindow Object +timeWindowObjectSerde = #if MIN_VERSION_aeson(2,0,0) Serde { serializer = Serializer $ \TimeWindow{..} -> - let winStart = [(Key.fromText "winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)] - winEnd = [(Key.fromText "winEnd" , Aeson.Number $ scientific (toInteger tWindowEnd ) 0)] + let winStart = [(Key.fromText winStartText, Aeson.Number $ scientific (toInteger tWindowStart) 0)] + winEnd = [(Key.fromText winEndText, Aeson.Number $ scientific (toInteger tWindowEnd ) 0)] in KeyMap.fromList $ winStart ++ winEnd , deserializer = Deserializer $ \obj -> - let (Aeson.Number start) = fromJust $ (KeyMap.lookup) "winStart" obj + let (Aeson.Number start) = fromJust $ KeyMap.lookup (Key.fromText winStartText) obj startInt64 = fromInteger $ coefficient start - (Aeson.Number end) = fromJust $ (KeyMap.lookup) "winEnd" obj + (Aeson.Number end) = fromJust $ KeyMap.lookup (Key.fromText winEndText) obj endInt64 = fromInteger $ coefficient end in mkTimeWindow startInt64 endInt64 } #else Serde { serializer = Serializer $ \TimeWindow{..} -> - let winStart = [("winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)] - winEnd = [("winEnd" , Aeson.Number $ scientific (toInteger tWindowEnd ) 0)] + let winStart = [(winStartText, Aeson.Number $ scientific (toInteger tWindowStart) 0)] + winEnd = [(winEndText, Aeson.Number $ scientific (toInteger tWindowEnd ) 0)] in HM.fromList $ winStart ++ winEnd , deserializer = Deserializer $ \obj -> - let (Aeson.Number start) = (HM.!) obj "winStart" + let (Aeson.Number start) = fromJust $ KeyMap.lookup (Key.fromText winStartText) obj startInt64 = fromInteger $ coefficient start - (Aeson.Number end) = (HM.!) obj "winEnd" + (Aeson.Number end) = fromJust $ KeyMap.lookup (Key.fromText winEndText) obj endInt64 = fromInteger $ coefficient end in mkTimeWindow startInt64 endInt64 } #endif -timeWindowFlowObjectSerde :: Int64 -> Serde TimeWindow FlowObject -timeWindowFlowObjectSerde windowSize = +timeWindowFlowObjectSerde :: Serde TimeWindow FlowObject +timeWindowFlowObjectSerde = Serde - { serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ timeWindowObjectSerde windowSize) tw - , deserializer = Deserializer $ \fo -> (runDeser . deserializer $ timeWindowObjectSerde windowSize) (flowObjectToJsonObject fo) + { serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ timeWindowObjectSerde) tw + , deserializer = Deserializer $ \fo -> (runDeser . deserializer $ timeWindowObjectSerde) (flowObjectToJsonObject fo) } sessionWindowFlowObjectSerde :: Serde TimeWindow FlowObject sessionWindowFlowObjectSerde = Serde - { serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ sessionWindowObjectSerde) tw - , deserializer = Deserializer $ \fo -> (runDeser . deserializer $ sessionWindowObjectSerde) (flowObjectToJsonObject fo) + { serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ timeWindowObjectSerde) tw + , deserializer = Deserializer $ \fo -> (runDeser . deserializer $ timeWindowObjectSerde) (flowObjectToJsonObject fo) }