awsIteratedList causes a very tight loop on GetRecord when stream is empty #4
Comments
@ozataman Good catch. I think that's a serious bug. We are considering addressing this by changing the …
I think that's certainly better. However, it may still not fully help the user: even an empty response carries a valid iterator for the next query, and there is no way to extract it through this instance. The following contains a bit of application context, but here is how I've handled it in the interim:

```haskell
-- Assumes the usual imports (conduit, aws-kinesis, mtl, exceptions, either)
-- plus application-specific AppEnv, runKinesis, getStream, toS and whenJust.

-------------------------------------------------------------------------------
-- | Produce an infinite stream of records from a shard.
streamRecords
    :: (Functor n, MonadIO n, MonadReader AppEnv n, MonadCatch n)
    => ShardId
    -> Maybe SequenceNumber  -- ^ resume after this sequence number, if given
    -> Maybe Int             -- ^ optional per-request record limit
    -> Producer (ResourceT n) Record
streamRecords sid sn lim = do
    nm <- either (error . toS) id <$> runEitherT getStream
    -- Start at the tip of the shard, or right after a known sequence number.
    let pos = case sn of
            Nothing -> Latest
            Just _  -> AfterSequenceNumber
        gsi = GetShardIterator sid pos sn nm
    iter <- lift $ getShardIteratorResShardIterator <$> runKinesis 10 gsi
    go (GetRecords lim iter)
  where
    go r = do
        a <- lift $ runKinesis 10 r
        let rs = getRecordsResRecords a
        unless (null rs) $ C.sourceList rs
        -- Follow the next shard iterator, even when this batch was empty.
        whenJust (nextIteratedRequest r a) $ \r' -> do
            -- Empty batch: wait one second (threadDelay takes microseconds)
            -- instead of re-polling immediately and exceeding throughput.
            when (null rs) $ liftIO (threadDelay 1000000)
            go r'
```
Yes, I agree that your version makes more sense. The reason for my simple solution above is to make it usable with the … I wonder if, in a production setting, one might just bypass the … It would be really cool if Kinesis supported long-polling...
If the stream is empty, GetRecordResponse returns an empty list of records but still a valid shard iterator for the next request. On a fast connection this produces an endless loop that issues requests at a very high rate until more data arrive and slow it down. The immediate effect is ThroughputExceeded exceptions even though the stream is empty.
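Some back-of-the-envelope numbers show why the loop trips the limit. Kinesis allows up to five GetRecords calls per second per shard. Assuming, purely for illustration, a 20 ms round trip and negligible local processing, a loop that immediately re-polls an empty shard runs at roughly the reciprocal of the round-trip time, while a one-second delay after each empty response keeps it well below the limit:

$$
r_{\text{tight}} = \frac{1}{t_{\text{rtt}}} = \frac{1}{0.02\,\text{s}} = 50\ \text{requests/s} \;\gg\; 5\ \text{requests/s}
$$

$$
r_{\text{delayed}} = \frac{1}{t_{\text{rtt}} + t_{\text{delay}}} = \frac{1}{(0.02 + 1)\,\text{s}} \approx 0.98\ \text{requests/s}
$$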
I'm not sure what the solution should be, since the internals of awsIteratedList are outside this package's control. The right thing to do may be to remove the IteratedTransaction instance and roll our own iteratedList function that waits, say, one second whenever the previous list was empty.
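A minimal sketch of such a function, assuming nothing about the aws API beyond the shape of `nextIteratedRequest` (`req -> resp -> Maybe req`). The name `iteratedListWithDelay` and the mock shard are hypothetical. The request runner, item extractor and pause action are passed in as plain functions so the loop can be exercised without Kinesis, and instead of a conduit `Producer` it hands each non-empty batch to a callback to keep the example dependency-free.

```haskell
import Control.Monad (unless, when)
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Hypothetical helper (not part of aws or aws-kinesis): issue requests in a
-- loop, hand every non-empty page of items to a consumer, and run a pause
-- action before following the next request whenever a page came back empty.
iteratedListWithDelay
  :: Monad m
  => (req -> m resp)            -- ^ run a single request
  -> (resp -> [item])           -- ^ extract the items from a response
  -> (req -> resp -> Maybe req) -- ^ next request, like nextIteratedRequest
  -> m ()                       -- ^ pause after an empty page
  -> ([item] -> m ())           -- ^ consumer for each non-empty page
  -> req
  -> m ()
iteratedListWithDelay run items next pause consume = go
  where
    go r = do
      resp <- run r
      let xs = items resp
      unless (null xs) (consume xs)
      case next r resp of
        Nothing -> pure ()       -- no further iterator: stop
        Just r' -> do
          when (null xs) pause   -- empty page: back off before re-polling
          go r'

-- Mock "shard": request i returns page i; the last page has no successor.
mockPages :: [[Int]]
mockPages = [[1, 2], [], [], [3]]

mockRun :: Int -> IO ([Int], Bool)
mockRun i = pure (mockPages !! i, i + 1 < length mockPages)

mockNext :: Int -> ([Int], Bool) -> Maybe Int
mockNext i (_, more) = if more then Just (i + 1) else Nothing

-- | Run the helper against the mock, returning (items seen, pauses taken).
runMock :: IO ([Int], Int)
runMock = do
  seen <- newIORef []
  pauses <- newIORef (0 :: Int)
  iteratedListWithDelay mockRun fst mockNext
    (modifyIORef' pauses (+ 1))
    (\xs -> modifyIORef' seen (++ xs))
    0
  (,) <$> readIORef seen <*> readIORef pauses
```

Against the mock, `runMock` yields `([1,2,3],2)`: all three records arrive, and the loop pauses once for each of the two empty pages. In the Kinesis setting, the pause would be something like `liftIO (threadDelay 1000000)` and the runner a `GetRecords` call, mirroring the `streamRecords` code earlier in the thread.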
I've already implemented this in our app and can confirm it eliminates the throughput exceptions.