From 86be4fe376809e2d2f6c2f3f2936b20fc5676668 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Tue, 12 Sep 2023 12:28:52 +0800 Subject: [PATCH 1/2] hstream-kafka: add an OffsetManager --- cabal.project | 2 +- common/kafka/hstream-kafka.cabal | 6 + .../kafka/src/Kafka/Common/OffsetManager.hs | 115 ++++++++++++++++++ common/kafka/src/Kafka/Common/RecordFormat.hs | 18 +++ .../server/hstream-admin-server.cabal | 2 +- .../Store/Internal/LogDevice/Reader.hs | 10 +- hstream-store/HStream/Store/Stream.hs | 21 ++-- hstream-store/hstream-store.cabal | 2 +- 8 files changed, 160 insertions(+), 16 deletions(-) create mode 100644 common/kafka/src/Kafka/Common/OffsetManager.hs create mode 100644 common/kafka/src/Kafka/Common/RecordFormat.hs diff --git a/cabal.project b/cabal.project index eac7f24ca..5114f6e63 100644 --- a/cabal.project +++ b/cabal.project @@ -56,7 +56,7 @@ package rocksdb-haskell-bindings extra-lib-dirs: /usr/local/lib constraints: - Z-Data == 2.0.0.1 + Z-Data == 2.0.0.2 , zoovisitor == 0.2.6.1 , blaze-textual == 0.2.1.0 , entropy == 0.4.1.7 diff --git a/common/kafka/hstream-kafka.cabal b/common/kafka/hstream-kafka.cabal index 162e85e07..988db0c64 100644 --- a/common/kafka/hstream-kafka.cabal +++ b/common/kafka/hstream-kafka.cabal @@ -43,6 +43,8 @@ common shared-properties library import: shared-properties exposed-modules: + Kafka.Common.OffsetManager + Kafka.Common.RecordFormat Kafka.Protocol Kafka.Protocol.Encoding Kafka.Protocol.Error @@ -61,11 +63,15 @@ library build-depends: , base >=4.11 && <5 , bytestring + , containers , digest + , hashtables , hstream-common-base + , hstream-store , network , text , vector + , Z-Data default-language: GHC2021 default-extensions: diff --git a/common/kafka/src/Kafka/Common/OffsetManager.hs b/common/kafka/src/Kafka/Common/OffsetManager.hs new file mode 100644 index 000000000..42759c570 --- /dev/null +++ b/common/kafka/src/Kafka/Common/OffsetManager.hs @@ -0,0 +1,115 @@ +{-# LANGUAGE BangPatterns #-} + +module Kafka.Common.OffsetManager + ( OffsetManager + , newOffsetManager + , withOffset + , withOffsetN + , cleanOffsetCache + , getOldestOffset + , getLatestOffset + ) where + +import Control.Concurrent +import Control.Exception +import Control.Monad +import Data.ByteString (ByteString) +import qualified Data.HashTable.IO as H +import Data.Int +import Data.Word +import GHC.Stack (HasCallStack) + +import qualified HStream.Store as S +import qualified HStream.Store.Internal.LogDevice as S +import Kafka.Common.RecordFormat +import qualified Kafka.Protocol.Encoding as K + +------------------------------------------------------------------------------- + +type HashTable k v = H.BasicHashTable k v + +data OffsetManager = OffsetManager + { offsets :: HashTable Word64 (MVar Int64) + -- ^ Offsets cache + -- + -- TODO: + -- * use FastMutInt as value (?) 
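+    --
+    -- The cache maps a logid to the latest offset assigned on that log; the
+    -- per-log MVar serializes concurrent writers to the same log.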
+  , offsetsLock :: MVar ()
+  , store       :: S.LDClient
+  , reader      :: S.LDReader
+  }
+
+newOffsetManager :: S.LDClient -> Int -> IO OffsetManager
+newOffsetManager store maxLogs = do
+  offsets <- H.new
+  offsetsLock <- newMVar ()
+  reader <- S.newLDReader store (fromIntegral maxLogs) Nothing
+  pure OffsetManager{..}
+
+withOffset :: OffsetManager -> Word64 -> (Int64 -> IO a) -> IO a
+withOffset m logid = withOffsetN m logid 0
+
+-- Thread-safe version.
+--
+-- NOTE: n must be >= 1 and < (maxBound :: Int32)
+withOffsetN :: OffsetManager -> Word64 -> Int64 -> (Int64 -> IO a) -> IO a
+withOffsetN m@OffsetManager{..} logid n f = do
+  m_offset <- H.lookup offsets logid
+  case m_offset of
+    Just offset -> modifyMVar offset $ \o -> do
+      let !o' = o + n
+      !a <- f o'
+      pure (o', a)
+    Nothing -> withMVar offsetsLock $ \_ -> do
+      o' <- catch (do mo <- getLatestOffset m logid
+                      pure $ maybe (n - 1) (+ n) mo)
+                  (\(_ :: S.NOTFOUND) -> pure $ n - 1)
+      H.insert offsets logid =<< newMVar o'
+      f o'
+
+cleanOffsetCache :: OffsetManager -> Word64 -> IO ()
+cleanOffsetCache OffsetManager{..} logid = H.delete offsets logid
+
+getOldestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
+getOldestOffset OffsetManager{..} logid = do
+  isEmpty <- S.isLogEmpty store logid
+  if isEmpty
+    then pure Nothing
+    else do
+      -- Actually, we only need the first lsn, but there is no easy way to get it.
+      Just . offset <$> readOneRecord reader logid S.LSN_MIN S.LSN_MAX
+
+getLatestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
+getLatestOffset OffsetManager{..} logid = do
+  -- FIXME: this initial isLogEmpty check seems to block.
+  isEmpty <- S.isLogEmpty store logid
+  if isEmpty
+    then pure Nothing
+    else do tailLsn <- S.getTailLSN store logid
+            Just . offset <$> readOneRecord reader logid tailLsn tailLsn
+
+-- TODO
+-- getOffsetByTime :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO Int64
+-- getOffsetByTime OffsetManager{..} logid timestamp = undefined
+
+-------------------------------------------------------------------------------
+
+-- Return the first RecordFormat read from the log.
+--
+-- FIXME: what happens when reading an empty log?
+readOneRecord
+  :: HasCallStack
+  => S.LDReader -> Word64 -> S.LSN -> S.LSN -> IO RecordFormat
+readOneRecord reader logid start end = finally acquire release
+  where
+    acquire = do
+      S.readerSetTimeout reader 1000
+      S.readerStartReading reader logid start end
+      dataRecords <- S.readerRead @ByteString reader 1
+      case dataRecords of
+        [S.DataRecord{..}] -> K.runGet recordPayload
+        xs -> -- TODO
+          ioError $ userError $ "Invalid reader result " <> show xs
+    release = do
+      isReading <- S.readerIsReading reader logid
+      when isReading $ S.readerStopReading reader logid
diff --git a/common/kafka/src/Kafka/Common/RecordFormat.hs b/common/kafka/src/Kafka/Common/RecordFormat.hs
new file mode 100644
index 000000000..182353e06
--- /dev/null
+++ b/common/kafka/src/Kafka/Common/RecordFormat.hs
@@ -0,0 +1,18 @@
+module Kafka.Common.RecordFormat
+  ( RecordFormat (..)
+  ) where
+
+import Data.ByteString (ByteString)
+import Data.Int
+import GHC.Generics (Generic)
+
+import qualified Kafka.Protocol.Encoding as K
+
+-- The on-disk format of a record as stored in LogDevice.
+data RecordFormat = RecordFormat
+  { offset      :: Int64
+  , batchLength :: Int32
+  , recordBytes :: ByteString
+  } deriving (Generic, Show)
+
+instance K.Serializable RecordFormat
diff --git a/hstream-admin/server/hstream-admin-server.cabal b/hstream-admin/server/hstream-admin-server.cabal
index b3a85b507..ee2a905d6 100644
--- a/hstream-admin/server/hstream-admin-server.cabal
+++ b/hstream-admin/server/hstream-admin-server.cabal
@@ -82,7 +82,7 @@ library
     , deepseq
     , grpc-haskell
     , grpc-haskell-core
-    , hashable >=1.2.7.0 && <1.4
+    , hashable >=1.2.7.0 && <1.5
     , haskeline ^>=0.8.1
     , hstream-api-hs
     , hstream-common
diff --git a/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs b/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs
index 0916c55dc..dd5e281c6 100644
--- a/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs
+++ b/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs
@@ -121,10 +121,12 @@ startReadingFromCheckpoint reader logid untilSeq =
   E.throwStreamErrorIfNotOK . warnSlow $
     ld_checkpointed_reader_start_reading_from_ckp ptr logid untilSeq
 
- -- Note: The exception NOTFOUND means the log is not being read, either because
- -- readerStartReading() was never called (or readerStopReading() was
- -- called), or because `until` LSN was reached
-readerStopReading :: LDReader -> C_LogID -> IO ()
+-- Exceptions:
+--   NOTFOUND: the log is not being read, either because readerStartReading()
+--   was never called (or readerStopReading() was called), or because the
+--   `until` LSN was reached.
+--   Also: any exceptions thrown by AsyncReader::stopReading.
+readerStopReading :: HasCallStack => LDReader -> C_LogID -> IO ()
 readerStopReading reader logid = withForeignPtr reader $ \ptr ->
   void $ E.throwStreamErrorIfNotOK $ c_ld_reader_stop_reading ptr logid
diff --git a/hstream-store/HStream/Store/Stream.hs b/hstream-store/HStream/Store/Stream.hs
index b7a6b1ade..63ad5f404 100644
--- a/hstream-store/HStream/Store/Stream.hs
+++ b/hstream-store/HStream/Store/Stream.hs
@@ -75,8 +75,6 @@ module HStream.Store.Stream
 
   -- * Logdevice Checkpoint Store
   , FFI.LDCheckpointStore
-  , initCheckpointStoreLogID
-  , checkpointStoreLogID
   , LD.newFileBasedCheckpointStore
   , LD.newRSMBasedCheckpointStore
   , LD.newZookeeperBasedCheckpointStore
@@ -135,6 +133,10 @@ module HStream.Store.Stream
   , LD.removeCheckpoints
   , LD.removeAllCheckpoints
 
+  -- * Internal logs
+  , checkpointStoreLogID
+  , initCheckpointStoreLogID
+
   -- * Internal helpers
   , getStreamDirPath
   , getStreamLogPath
@@ -649,15 +651,16 @@ getStreamIdFromLogId client logid = do
 -- bit: 00...1...00
 initCheckpointStoreLogID :: FFI.LDClient -> LD.LogAttributes -> IO FFI.C_LogID
 initCheckpointStoreLogID client attrs = do
-  -- FIXME: First get and then create is NOT suitable for multiple concurrent
-  -- server. This may throw EXISTS exception.
-  r <- try $ LD.getLogGroupByID client checkpointStoreLogID
+  -- NOTE: first-get-then-create is NOT suitable for multiple concurrent
+  -- servers, so we call makeLogGroup directly and handle the EXISTS exception.
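+  -- Under a race, exactly one server succeeds in creating the log group;
+  -- the others catch EXISTS below and reuse the same reserved logid.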
+ r <- try $ LD.makeLogGroup client "/hstream/internal/checkpoint" + checkpointStoreLogID checkpointStoreLogID + attrs True case r of - Left (_ :: E.NOTFOUND) -> do - _ <- LD.makeLogGroup client "/hstream/internal/checkpoint" checkpointStoreLogID checkpointStoreLogID attrs True - return checkpointStoreLogID - Right _ -> return checkpointStoreLogID + Left (_ :: E.EXISTS) -> return checkpointStoreLogID + Right _ -> return checkpointStoreLogID +-- logid greater or equal than (bit 56) is reserved for internal use. checkpointStoreLogID :: FFI.C_LogID checkpointStoreLogID = bit 56 diff --git a/hstream-store/hstream-store.cabal b/hstream-store/hstream-store.cabal index e5252f0ad..058b3ea52 100644 --- a/hstream-store/hstream-store.cabal +++ b/hstream-store/hstream-store.cabal @@ -87,7 +87,7 @@ library , data-default ^>=0.7 , filepath ^>=1.4.2 , ghc-prim >=0.5 && <1.0 - , hashable ^>=1.3.5 + , hashable >=1.3.5 && <1.5 , hstream-common-base , primitive ^>=0.7 , vector >=0.12 && <0.14 From d3a3387064974efab840596587464941b57cbaf6 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Wed, 13 Sep 2023 10:47:44 +0800 Subject: [PATCH 2/2] fix sql tests --- hstream-diffflow/test/DiffFlow/LoopSpec.hs | 133 ++++++++++++++------- hstream-sql/etc/syntax-test-cases.yaml | 6 +- 2 files changed, 96 insertions(+), 43 deletions(-) diff --git a/hstream-diffflow/test/DiffFlow/LoopSpec.hs b/hstream-diffflow/test/DiffFlow/LoopSpec.hs index acf6e5cd2..3561beda2 100644 --- a/hstream-diffflow/test/DiffFlow/LoopSpec.hs +++ b/hstream-diffflow/test/DiffFlow/LoopSpec.hs @@ -157,6 +157,19 @@ checkStep2 isDone reach_m reachSummary_m = describe "check reach summary out" $ #if MIN_VERSION_aeson(2,0,0) -------------------------------------------------------------------------------- +#if MIN_VERSION_hashable(1,4,0) +dcbs1 :: [[DataChange Object Word32]] +dcbs1 = dcbs1' + +dcbs2 :: [[DataChange Object Word32]] +dcbs2 = dcbs2' + +dcbs3 :: [[DataChange Object Word32]] +dcbs3 = dcbs3' + +dcbs4 :: [[DataChange Object Word32]] +dcbs4 = dcbs4' +#else dcbs1 :: [[DataChange Object Word32]] dcbs1 = [ [DataChange (A.fromList [("v1", "c"), ("v2", "a")]) (Timestamp 0 []) 1] , [DataChange (A.fromList [("v1", "a"), ("v2", "b")]) (Timestamp 0 []) 1] @@ -221,6 +234,7 @@ dcbs4 = [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 1 , [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 1 [3]) 1 , DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 1 [3]) (-1)] ] +#endif -------------------------------------------------------------------------------- #else @@ -244,53 +258,92 @@ dcbs1 = [ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 0 []) 1 ] dcbs2 :: [[DataChange Object Word32]] -dcbs2 = [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 0 [1]) 1] - , [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [1]) 1] - , [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [1]) 1] - - , [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 0 [2]) (-1) - , DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 0 [2]) 1] - , [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [2]) (-1) - , DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 0 [2]) 1] - , [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [2]) (-1) - , DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [2]) 1] - - , [DataChange 
(A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 0 [3]) (-1) - , DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 0 [3]) 1] - , [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 0 [3]) (-1) - , DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 0 [3]) 1] - , [DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [3]) (-1) - , DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 0 [3]) 1] - ] +dcbs2 = dcbs2' dcbs3 :: [[DataChange Object Word32]] -dcbs3 = [ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 1 []) (-1)] +dcbs3 = + [ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 1 []) (-1)] - , [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 1 []) (-1)] - , [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 1 []) (-1)] + , [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 1 []) (-1)] + , [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 1 []) (-1)] - , [DataChange (A.fromList [("v1", "b"), ("v2", "b")]) (Timestamp 1 []) (-1)] - , [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 1 []) (-1)] - , [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 1 []) (-1)] - ] + , [DataChange (A.fromList [("v1", "b"), ("v2", "b")]) (Timestamp 1 []) (-1)] + , [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 1 []) (-1)] + , [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 1 []) (-1)] + ] dcbs4 :: [[DataChange Object Word32]] -dcbs4 = [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 1 [1]) (-1) - , DataChange (A.fromList [("v1", "b"), ("reduced", "d" )]) (Timestamp 1 [1]) 1] - - , [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 1 [2]) 1 - , DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 1 [2]) (-1)] - , [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 1 [2]) (-1) - , DataChange (A.fromList [("v1", "a"), ("reduced", "bd" )]) (Timestamp 1 [2]) 1] - - , [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 1 [3]) 1 - , DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 1 [3]) (-1)] - , [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 1 [3]) 1 - , DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 1 [3]) (-1)] - , [DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 1 [3]) (-1) - , DataChange (A.fromList [("v1", "c"), ("reduced", "abd" )]) (Timestamp 1 [3]) 1] - ] +dcbs4 = dcbs4' -------------------------------------------------------------------------------- #endif -------------------------------------------------------------------------------- + +dcbs1' :: [[DataChange Object Word32]] +dcbs1' = + [ [DataChange (A.fromList [("v1", "a"), ("v2", "b")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "b"), ("v2", "d")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "c"), ("v2", "a")]) (Timestamp 0 []) 1] + + , [DataChange (A.fromList [("v1", "a"), ("v2", "d")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "c"), ("v2", "b")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 0 []) 1] + + , [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList 
[("v1", "b"), ("v2", "b")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "c"), ("v2", "d")]) (Timestamp 0 []) 1] + , [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 0 []) 1] + ] + +dcbs2' :: [[DataChange Object Word32]] +dcbs2' = + [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 0 [1]) 1] + , [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [1]) 1] + , [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [1]) 1] + + , [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 0 [2]) (-1) + , DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 0 [2]) 1] + , [DataChange (A.fromList [("v1", "a"), ("reduced", "b" )]) (Timestamp 0 [2]) (-1) + , DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 0 [2]) 1] + , [DataChange (A.fromList [("v1", "c"), ("reduced", "a" )]) (Timestamp 0 [2]) (-1) + , DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [2]) 1] + + , [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 0 [3]) (-1) + , DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 0 [3]) 1] + , [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 0 [3]) (-1) + , DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 0 [3]) 1] + , [DataChange (A.fromList [("v1", "c"), ("reduced", "ab" )]) (Timestamp 0 [3]) (-1) + , DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 0 [3]) 1] + ] + +dcbs3' :: [[DataChange Object Word32]] +dcbs3' = + [ [DataChange (A.fromList [("v1", "b"), ("v2", "c")]) (Timestamp 1 []) (-1)] + + , [DataChange (A.fromList [("v1", "b"), ("v2", "a")]) (Timestamp 1 []) (-1)] + , [DataChange (A.fromList [("v1", "a"), ("v2", "c")]) (Timestamp 1 []) (-1)] + + , [DataChange (A.fromList [("v1", "a"), ("v2", "a")]) (Timestamp 1 []) (-1)] + , [DataChange (A.fromList [("v1", "b"), ("v2", "b")]) (Timestamp 1 []) (-1)] + , [DataChange (A.fromList [("v1", "c"), ("v2", "c")]) (Timestamp 1 []) (-1)] + ] + +dcbs4' :: [[DataChange Object Word32]] +dcbs4' = + [ [DataChange (A.fromList [("v1", "b"), ("reduced", "cd")]) (Timestamp 1 [1]) (-1) + , DataChange (A.fromList [("v1", "b"), ("reduced", "d" )]) (Timestamp 1 [1]) 1] + + , [DataChange (A.fromList [("v1", "b"), ("reduced", "cd" )]) (Timestamp 1 [2]) 1 + , DataChange (A.fromList [("v1", "b"), ("reduced", "cda")]) (Timestamp 1 [2]) (-1)] + , [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd")]) (Timestamp 1 [2]) (-1) + , DataChange (A.fromList [("v1", "a"), ("reduced", "bd" )]) (Timestamp 1 [2]) 1] + + , [DataChange (A.fromList [("v1", "b"), ("reduced", "cda" )]) (Timestamp 1 [3]) 1 + , DataChange (A.fromList [("v1", "b"), ("reduced", "cdab")]) (Timestamp 1 [3]) (-1)] + , [DataChange (A.fromList [("v1", "a"), ("reduced", "bcd" )]) (Timestamp 1 [3]) 1 + , DataChange (A.fromList [("v1", "a"), ("reduced", "bcda")]) (Timestamp 1 [3]) (-1)] + , [DataChange (A.fromList [("v1", "c"), ("reduced", "abcd")]) (Timestamp 1 [3]) (-1) + , DataChange (A.fromList [("v1", "c"), ("reduced", "abd" )]) (Timestamp 1 [3]) 1] + ] diff --git a/hstream-sql/etc/syntax-test-cases.yaml b/hstream-sql/etc/syntax-test-cases.yaml index 213a85eb6..2a16447b7 100644 --- a/hstream-sql/etc/syntax-test-cases.yaml +++ b/hstream-sql/etc/syntax-test-cases.yaml @@ -238,9 +238,9 @@ testSuiteCases: - testCaseFail: null testCaseLabel: Create Connectors testCaseResult: RQCreate (RCreateConnector "SOURCE" "source01" "mysql" False (RConnectorOptions - 
(fromList [("port",Number 3306.0),("host",String "mysql-s1"),("password",String - "password"),("user",String "root"),("database",String "d1"),("table",String "person"),("stream",String - "stream01")]))) + (fromList [("password",String "password"),("table",String "person"),("database",String + "d1"),("port",Number 3306.0),("stream",String "stream01"),("host",String "mysql-s1"),("user",String + "root")]))) testCaseStmts: - create source connector source01 from mysql with ("host" = 'mysql-s1', "port" = 3306, "user" = 'root', "password" = 'password', "database" = 'd1', "table" =