Skip to content

Commit

Permalink
bypass reader when server is in backup mode
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jun 26, 2024
1 parent 390b6a2 commit f8f9c9f
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions hstream/src/HStream/Server/Core/ShardReader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ import HStream.Server.Types (BiStreamReader (..),
BiStreamReaderSender,
ServerContext (..),
ServerInternalOffset,
ShardReader (..),
ServerMode (..), ShardReader (..),
StreamReader (..), ToOffset (..),
getLogLSN, mkShardReader,
mkStreamReader)
Expand Down Expand Up @@ -143,12 +143,22 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do
readRecords r@ShardReader{..} = do
let cStreamName = textToCBytes targetStream
!read_start <- getPOSIXTime
records <- readProcessGap r (fromIntegral readShardRequestMaxRecords)
Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start
Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records)
Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records)
let (records', _) = filterRecords shardReaderStartTs shardReaderEndTs records
receivedRecordsVecs <- forM records' decodeRecordBatch
-- records <- readProcessGap r (fromIntegral readShardRequestMaxRecords)
-- Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start
-- Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records)
-- Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records)
-- let (records', _) = filterRecords shardReaderStartTs shardReaderEndTs records
-- receivedRecordsVecs <- forM records' decodeRecordBatch
state <- readIORef serverState
receivedRecordsVecs <- case state of
ServerNormal -> do
records <- readProcessGap r (fromIntegral readShardRequestMaxRecords)
Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start
Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records)
Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records)
let (records', _) = filterRecords shardReaderStartTs shardReaderEndTs records
forM records' decodeRecordBatch
ServerBackup -> return []
let res = V.fromList $ map (\(_, _, _, record) -> record) receivedRecordsVecs
Log.debug $ "reader " <> Log.build readShardRequestReaderId
<> " read " <> Log.build (V.length res) <> " batchRecords"
Expand Down

0 comments on commit f8f9c9f

Please sign in to comment.