-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.py
32 lines (23 loc) · 879 Bytes
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from collections.abc import AsyncIterator
from typing import Any
from nextline.events import OnWriteStdout
from nextline.plugin.spec import hookimpl
from nextline.utils.pubsub import PubSubItem
class CacheStdout:
def __init__(self) -> None:
self._pubsub = PubSubItem[str](cache=True)
async def subscribe(self) -> AsyncIterator[str]:
async for text in self._pubsub.subscribe():
yield text
async def aclose(self) -> None:
await self._pubsub.aclose()
async def __aenter__(self) -> 'CacheStdout':
return self
async def __aexit__(self, *_: Any, **__: Any) -> None:
await self.aclose()
@hookimpl
async def on_initialize_run(self) -> None:
self._pubsub.clear()
@hookimpl
async def on_write_stdout(self, event: OnWriteStdout) -> None:
await self._pubsub.publish(event.text)