Skip to content

Commit

Permalink
add a window_end column for aggregated results based on time window (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
daleiz authored Apr 3, 2023
1 parent 973012b commit 664cecb
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 42 deletions.
4 changes: 2 additions & 2 deletions hstream-sql/src/HStream/SQL/Codegen/V1.hs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ relationExprToGraph relation builder = case relation of
>>= HTW.aggregate aggregateInit
aggregateR
HM.union
(timeWindowFlowObjectSerde $ calendarDiffTimeToMs i)
timeWindowFlowObjectSerde
(timeWindowSerde $ calendarDiffTimeToMs i)
flowObjectSerde
materialized
Expand All @@ -415,7 +415,7 @@ relationExprToGraph relation builder = case relation of
>>= HTW.aggregate aggregateInit
aggregateR
HM.union
(timeWindowFlowObjectSerde $ calendarDiffTimeToMs i1)
timeWindowFlowObjectSerde
(timeWindowSerde $ calendarDiffTimeToMs i1)
flowObjectSerde
materialized
Expand Down
63 changes: 23 additions & 40 deletions hstream-sql/src/HStream/SQL/Codegen/V1/Boilerplate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import qualified Data.HashMap.Strict as HM
import Data.Maybe (fromJust)
import Data.Scientific (Scientific (coefficient),
coefficient, scientific)
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.Encoding as TLE
import HStream.Processing.Encoding
Expand All @@ -28,6 +29,12 @@ import qualified Data.Aeson.Key as Key
import qualified Data.Aeson.KeyMap as KeyMap
#endif

winStartText :: T.Text
winStartText = "window_start"

winEndText :: T.Text
winEndText = "window_end"

instance Serialized FlowObject where
compose (fo1, fo2) =
let o = compose ( flowObjectToJsonObject fo1
Expand Down Expand Up @@ -117,70 +124,46 @@ sessionWindowSerde =
endTs <- getInt64be
return TimeWindow {tWindowStart = startTs, tWindowEnd = endTs}

timeWindowObjectSerde :: Int64 -> Serde TimeWindow Object
timeWindowObjectSerde windowSize =
#if MIN_VERSION_aeson(2,0,0)
Serde
{ serializer = Serializer $ \TimeWindow{..} ->
let winStart = [(Key.fromText "winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)]
in KeyMap.fromList winStart
, deserializer = Deserializer $ \obj ->
let (Aeson.Number start) = fromJust $ (KeyMap.lookup) "winStart" obj
startInt64 = fromInteger $ coefficient start
in mkTimeWindow startInt64 (startInt64 + windowSize)
}
#else
Serde
{ serializer = Serializer $ \TimeWindow{..} ->
let winStart = [("winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)]
in HM.fromList winStart
, deserializer = Deserializer $ \obj ->
let (Aeson.Number start) = (HM.!) obj "winStart"
startInt64 = fromInteger $ coefficient start
in mkTimeWindow startInt64 (startInt64 + windowSize)
}
#endif

sessionWindowObjectSerde :: Serde TimeWindow Object
sessionWindowObjectSerde =
timeWindowObjectSerde :: Serde TimeWindow Object
timeWindowObjectSerde =
#if MIN_VERSION_aeson(2,0,0)
Serde
{ serializer = Serializer $ \TimeWindow{..} ->
let winStart = [(Key.fromText "winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)]
winEnd = [(Key.fromText "winEnd" , Aeson.Number $ scientific (toInteger tWindowEnd ) 0)]
let winStart = [(Key.fromText winStartText, Aeson.Number $ scientific (toInteger tWindowStart) 0)]
winEnd = [(Key.fromText winEndText, Aeson.Number $ scientific (toInteger tWindowEnd ) 0)]
in KeyMap.fromList $ winStart ++ winEnd
, deserializer = Deserializer $ \obj ->
let (Aeson.Number start) = fromJust $ (KeyMap.lookup) "winStart" obj
let (Aeson.Number start) = fromJust $ KeyMap.lookup (Key.fromText winStartText) obj
startInt64 = fromInteger $ coefficient start
(Aeson.Number end) = fromJust $ (KeyMap.lookup) "winEnd" obj
(Aeson.Number end) = fromJust $ KeyMap.lookup (Key.fromText winEndText) obj
endInt64 = fromInteger $ coefficient end
in mkTimeWindow startInt64 endInt64
}
#else
Serde
{ serializer = Serializer $ \TimeWindow{..} ->
let winStart = [("winStart", Aeson.Number $ scientific (toInteger tWindowStart) 0)]
winEnd = [("winEnd" , Aeson.Number $ scientific (toInteger tWindowEnd ) 0)]
let winStart = [(winStartText, Aeson.Number $ scientific (toInteger tWindowStart) 0)]
winEnd = [(winEndText, Aeson.Number $ scientific (toInteger tWindowEnd ) 0)]
in HM.fromList $ winStart ++ winEnd
, deserializer = Deserializer $ \obj ->
let (Aeson.Number start) = (HM.!) obj "winStart"
let (Aeson.Number start) = fromJust $ KeyMap.lookup (Key.fromText winStartText) obj
startInt64 = fromInteger $ coefficient start
(Aeson.Number end) = (HM.!) obj "winEnd"
(Aeson.Number end) = fromJust $ KeyMap.lookup (Key.fromText winEndText) obj
endInt64 = fromInteger $ coefficient end
in mkTimeWindow startInt64 endInt64
}
#endif

timeWindowFlowObjectSerde :: Int64 -> Serde TimeWindow FlowObject
timeWindowFlowObjectSerde windowSize =
timeWindowFlowObjectSerde :: Serde TimeWindow FlowObject
timeWindowFlowObjectSerde =
Serde
{ serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ timeWindowObjectSerde windowSize) tw
, deserializer = Deserializer $ \fo -> (runDeser . deserializer $ timeWindowObjectSerde windowSize) (flowObjectToJsonObject fo)
{ serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ timeWindowObjectSerde) tw
, deserializer = Deserializer $ \fo -> (runDeser . deserializer $ timeWindowObjectSerde) (flowObjectToJsonObject fo)
}

sessionWindowFlowObjectSerde :: Serde TimeWindow FlowObject
sessionWindowFlowObjectSerde =
Serde
{ serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ sessionWindowObjectSerde) tw
, deserializer = Deserializer $ \fo -> (runDeser . deserializer $ sessionWindowObjectSerde) (flowObjectToJsonObject fo)
{ serializer = Serializer $ \tw -> (jsonObjectToFlowObject "") $ (runSer . serializer $ timeWindowObjectSerde) tw
, deserializer = Deserializer $ \fo -> (runDeser . deserializer $ timeWindowObjectSerde) (flowObjectToJsonObject fo)
}

0 comments on commit 664cecb

Please sign in to comment.