Hr/streaming #87
Closed · wants to merge 3 commits

Conversation

HRashidi (Contributor):

Adding streaming support for Aana.

The stream supports setting the FPS and the channel number. I added fps and channel inside the stream input, but we could do the same with a different input, something like video_params (sketched below).
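As a sketch of the video_params idea (the class and field names here are hypothetical, not from this PR):

from pydantic import BaseModel, Field

class VideoParams(BaseModel):
    """Hypothetical container for stream extraction options."""

    extract_fps: float = Field(default=3.0, gt=0, description="frames per second to extract")
    channel: int = Field(default=0, ge=0, description="the desired channel of the stream")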

@evanderiel (Collaborator) left a comment:

Please fix timestamps and test cases.

aana/tests/test_stream_input.py (outdated, resolved)
aana/utils/video.py (outdated, resolved)
@HRashidi (Contributor, Author):

Supported: HLS, DASH, MP4.

Supporting SRT requires:
apt-get install libavformat-dev libavdevice-dev
pip install av --no-binary av (version 12)

We cannot install PyAV with Poetry using the --no-binary flag.
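For reference, a minimal sketch of reading frames from a stream URL with PyAV (the URL is a placeholder; FFmpeg must be built with the relevant protocol support):

import av

# PyAV delegates protocol handling (HLS, DASH, SRT, ...) to FFmpeg.
container = av.open("https://example.com/live/master.m3u8")
video_stream = container.streams.video[0]

for frame in container.decode(video_stream):
    img = frame.to_rgb().to_ndarray()  # HxWx3 uint8 numpy array
    break  # grab a single frame for this sketch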

@HRashidi requested a review from evanderiel on April 26, 2024.
@HRashidi linked an issue on May 2, 2024 that may be closed by this pull request.
@@ -20,7 +21,8 @@
},
"mounts": [
"source=${localEnv:PATH_NAS:/nas},target=/nas,type=bind",
"source=${localEnv:PATH_NAS2:/nas2},target=/nas2,type=bind"
"source=${localEnv:PATH_NAS2:/nas2},target=/nas2,type=bind",
"source=${localEnv:HOME}/.ssh,target=/root/.ssh,readonly,type=bind"
Contributor:

Why is .ssh mounted?

Contributor (Author):

It provides the GitHub SSH key so we can push commits from inside the container (when using a GitHub key). I can remove it.

Contributor (Author):

removed


@field_validator("media_id")
@classmethod
def media_id_must_not_be_empty(cls, media_id):
Contributor:

Isn't it already checked in MediaId type?

Contributor (Author):

It seems you are right. But we also have it inside image_input. I will remove both if the functionality works with MediaId itself.
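For illustration, if MediaId is a constrained string type along these lines (a sketch assuming Pydantic v2; the actual definition may differ), the extra validator is indeed redundant:

from typing import Annotated

from pydantic import BaseModel, StringConstraints

# Hypothetical definition: non-empty string enforced at the type level.
MediaId = Annotated[str, StringConstraints(min_length=1)]

class StreamInput(BaseModel):
    media_id: MediaId  # "" is rejected without a separate validator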

Contributor (Author):

done


@field_validator("url")
@classmethod
def check_url(cls, url: str) -> str:
Contributor:

Pydantic has a Url type: https://docs.pydantic.dev/latest/api/networks/
Can we use it instead?

Contributor (Author):

Yes, we can. I will check whether it allows an IP and port.
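For what it's worth, Pydantic's AnyUrl does accept IP-based hosts with ports (a sketch assuming Pydantic v2):

from pydantic import AnyUrl, BaseModel

class StreamInput(BaseModel):
    url: AnyUrl  # validates scheme://host[:port]/path; the host may be an IP

StreamInput(url="rtsp://127.0.0.1:8554/live")  # accepted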

Contributor (Author):

Done


# Check that the selected stream channel is valid
if len(available_streams) == 0 or channel >= len(available_streams):
raise StreamReadingException(stream_url, msg="selected channel does not exist")
Contributor:

It would be good to mention which channel you are trying to use and how many channels the stream has.

Contributor (Author):

done
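Roughly what the fix could look like (a sketch; the exact message in the PR may differ):

if len(available_streams) == 0 or channel >= len(available_streams):
    raise StreamReadingException(
        stream_url,
        msg=(
            f"selected channel {channel} does not exist, "
            f"the stream has {len(available_streams)} channel(s)"
        ),
    )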

description=("the desired channel of stream"),
)

extract_fps: float = Field(
Contributor:

I'm not sure extract_fps belongs here; it's rather specific to frame extraction. If we want to do audio transcription it wouldn't even be necessary. We can keep it here for now, but I don't think it belongs here.

Contributor (Author):

It was a single value, so I did not add a separate class for it.

@movchan74 (Contributor):

I've tried the example project for streaming on a news stream and I think it lags behind, but I'm not sure. @HRashidi, how can I figure it out? Maybe return the lag in the output? @evanderiel @ashwinnair14, any ideas how we should deal with the lag?

for frame in packet.decode():
if frame_number % frame_rate == 0:
img = Image(numpy=frame.to_rgb().to_ndarray())
packet_timestamp = packet.dts
Contributor:

What is the unit for the timestamps? It is definitely not seconds; the diff between adjacent frames is 28800. I suggest we convert it to seconds for standardization.

Contributor (Author):

Done
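For context, PyAV packet timestamps (dts/pts) are integer counts of the stream's time_base, so converting to seconds looks roughly like this (assuming stream is the selected video stream):

# stream.time_base is a fractions.Fraction (e.g. 1/90000 for MPEG-TS),
# so multiplying the integer dts by it yields seconds.
timestamp_s = float(packet.dts * stream.time_base)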

@movchan74 (Contributor):

> I've tried the example project for streaming on a news stream and I think it lags behind, but I'm not sure. @HRashidi, how can I figure it out? Maybe return the lag in the output? @evanderiel @ashwinnair14, any ideas how we should deal with the lag?

I think I found a way to calculate the lag with the existing output:

import requests, json, time

data = {
    "stream": {
        # "url": "https://www.youtube.com/watch?v=9bZkp7q19f0",
        "url": "https://tagesschau.akamaized.net/hls/live/2020117/tagesschau/tagesschau_3/master-720p-3200.m3u8",
        "media_id": "9bZkp7q19f0",
    }
}
response = requests.post(
    "http://localhost:8000/stream/caption_stream",
    data={"body": json.dumps(data)},
    stream=True,
)
init_timestamp = None
init_time = None
for chunk in response.iter_content(chunk_size=None):
    chunk = json.loads(chunk)

    if not init_timestamp:
        init_timestamp = chunk["timestamps"][0]/100000
        init_time = time.time()

    # calculate lag
    current_time = time.time()
    time_diff = current_time - init_time
    timestamp_diff = chunk["timestamps"][0]/100000 - init_timestamp
    lag = time_diff - timestamp_diff
    print(f"Time diff: {time_diff:.2f}s")
    print(f"Timestamp diff: {timestamp_diff:.2f}s")
    print(f"Lag: {lag:.2f}s")

    print(chunk)

And it does lag behind:

Time diff: 0.00s
Timestamp diff: 0.00s
Lag: 0.00s
{'captions': ['a man walking past a store window with a sign on it that says "no smoking" in french and english.jpg, file photo, montreal, canada', 'a black and white photo of a woman walking past a store window with a sign on it that says "mima" in germany, and a woman walking past a store window with a sign on it that says "mima" in germany'], 'timestamps': [2673903000, 2673931800]}
Time diff: 1.86s
Timestamp diff: 0.58s
Lag: 1.28s
{'captions': ['a woman is walking past a store window with a sign that says "open" on it. the woman is wearing a white shirt and black pants. she is holding a purse', 'a man is walking past a store window with a sign that says "villa" on it in french and english. the man is wearing a white shirt and black pants'], 'timestamps': [2673960600, 2673989400]}
Time diff: 3.60s
Timestamp diff: 1.15s
Lag: 2.45s
{'captions': ['a man is walking past a store front with a sign that says stufung händler in germany. the man is wearing a suit and tie', 'a man is standing outside of a store with a sign that says stifung water in germany. the sign is in english and says stifung water'], 'timestamps': [2674018200, 2674047000]}
Time diff: 4.97s
Timestamp diff: 1.73s
Lag: 3.24s
{'captions': ['a man walking past a store with the words "stufung waren" on the front window in germany\'s capital city of berlin, germany', 'a black and white photo of a store front with a sign that says stiftung warennte in germany. the sign is in english and says stiftung warennte'], 'timestamps': [2674075800, 2674104600]}
Time diff: 6.84s
Timestamp diff: 2.30s
Lag: 4.53s

I even tried to change extract_fps to 1 but it still lags.

@evanderiel (Collaborator):

> I've tried the example project for streaming on a news stream and I think it lags behind, but I'm not sure. @HRashidi, how can I figure it out? Maybe return the lag in the output? @evanderiel @ashwinnair14, any ideas how we should deal with the lag?

If I understand correctly, the problem is that frames come in faster than the pipeline can process them, correct? Then we need to either make the pipeline faster on average (e.g. larger batches) or process fewer frames. Processing fewer frames can mean janky results, but it may not be possible to make the pipeline sufficiently fast in any case.

@movchan74 (Contributor):

> If I understand correctly, the problem is that frames come in faster than the pipeline can process them, correct? Then we need to either make the pipeline faster on average (e.g. larger batches) or process fewer frames. Processing fewer frames can mean janky results, but it may not be possible to make the pipeline sufficiently fast in any case.

Generally, yes. A faster pipeline means less lag. But that's not the only way to reduce it. Right now we are processing one batch of images after another sequentially. The more efficient way would be to process multiple batches at once, like we did with Mobius Pipeline. I actually tried an older commit (bd49a27) that still runs on Mobius Pipeline, and it's much faster; there is no lag (it's actually negative because of the way I calculate it), even at 3 fps.

@movchan74 (Contributor):

Update on the lag issues.

Here is a function that runs multiple concurrent consumers over an async generator's yields:

import asyncio


async def run_async_generator_concurrently(async_generator, process, batch_size=2):
    queue = asyncio.Queue(batch_size * 2)
    result_queue = asyncio.Queue()
    num_consumers = batch_size

    async def producer():
        async for i in async_generator:
            await queue.put(i)
        # Signal all consumers to shut down by putting a sentinel for each consumer in the queue
        for _ in range(num_consumers):
            await queue.put(None)

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:  # Check for the sentinel
                queue.task_done()
                break
            result = await process(item)
            await result_queue.put(result)
            queue.task_done()

    consumers = [consumer() for _ in range(num_consumers)]
    # # Setup the producer and consumers to run concurrently
    # await asyncio.gather(producer(), *consumers)

    producer_task = asyncio.create_task(producer())
    consumer_tasks = [asyncio.create_task(c) for c in consumers]

    # Yield all results as they are processed
    while True:
        if (
            result_queue.empty()
            and all(c.done() for c in consumer_tasks)
            and producer_task.done()
        ):
            break
        result = await result_queue.get()
        yield result
        result_queue.task_done()

    # Wait for all tasks to complete
    await producer_task
    for task in consumer_tasks:
        await task

You can try it on the toy example.

import asyncio


async def async_generator(n):
    for i in range(n):
        yield i


async def process(i):
    await asyncio.sleep(0.1)
    print(f"i * i = {i * i}")
    return i * i

async def main():
    gen = async_generator(10)
    async for item in run_async_generator_concurrently(gen, process, 5):
        print(item)


asyncio.run(main())

With 5 consumers it runs 5x faster. So it seems to work.

But when I add it to the endpoint it doesn't seem to help much.

class CaptionStreamEndpoint(Endpoint):
    """Transcribe video in chunks endpoint."""

    async def initialize(self):
        """Initialize the endpoint."""
        self.captioning_handle = await AanaDeploymentHandle.create(
            "captioning_deployment"
        )

    async def run(
        self,
        stream: StreamInput,
        batch_size: int,
    ) -> AsyncGenerator[CaptionStreamOutput, None]:
        """Transcribe video in chunks."""
        async def predict_captions(frames_dict):
            captioning_output = await self.captioning_handle.generate_batch(
                images=frames_dict["frames"]
            )
            # print("captioning_output", captioning_output)
            return {
                "captions": captioning_output["captions"],
                "timestamps": frames_dict["timestamps"],
            }

        gen = run_remote(fetch_stream_frames)(stream_input=stream, batch_size=2)
        async for item in run_async_generator_concurrently(
            gen, predict_captions, batch_size
        ):
            yield item

The version with Mobius Pipeline still has a lower lag.

But when I run it for longer, the lag always goes up. I've tried reducing the FPS, using the version with Mobius Pipeline, and adding more consumers. Nothing works; the lag starts to increase after some time.

I've tried to debug it by looking at the BLIP2 latency in the metrics, but it's not clear to me what it means.

BTW Ray added a command to start Prometheus locally but it's only available in the latest versions so we need to update Ray. See https://docs.ray.io/en/latest/cluster/metrics.html#quickstart-running-prometheus-locally.
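For reference, the command from the linked Ray docs (available in recent Ray versions):

ray metrics launch-prometheus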

I'm not sure yet what it all means. There is definitely something wrong, and we need to figure it out before we can say that we have streaming support.

@ashwinnair14 (Contributor):

Temporarily paused.

@movchan74 (Contributor):

I've tested the changes and it seems to work better; at 1 FPS it seems to be pretty stable now.

But it still lags at >=2 FPS, even with the async queue. Actually, the async queue seems to be hurting performance.

I think the main issue now is that the model is too slow. But that's not an issue with this PR, so it's fine.

@HRashidi requested a review from movchan74 on May 8, 2024.
@HRashidi closed this on Aug 6, 2024.
Successfully merging this pull request may close these issues.

[enhancement] Streaming video support on the SDK. HLS, DASH