diff --git a/livekit-api/livekit/api/__init__.py b/livekit-api/livekit/api/__init__.py index e22f846d..d8e5f115 100644 --- a/livekit-api/livekit/api/__init__.py +++ b/livekit-api/livekit/api/__init__.py @@ -22,6 +22,6 @@ from livekit.protocol import models from livekit.protocol import room +from .api import LivekitAPI from .access_token import VideoGrants, AccessToken -from .room_service import RoomService from .version import __version__ diff --git a/livekit-api/livekit/api/_service.py b/livekit-api/livekit/api/_service.py index b845cedd..34cc2858 100644 --- a/livekit-api/livekit/api/_service.py +++ b/livekit-api/livekit/api/_service.py @@ -7,8 +7,10 @@ class Service(ABC): - def __init__(self, host: str, api_key: str, api_secret: str): - self._client = TwirpClient(host, "livekit") + def __init__( + self, host: str, api_key: str, api_secret: str, session: aiohttp.ClientSession + ): + self._client = TwirpClient(session, host, "livekit") self.api_key = api_key self.api_secret = api_secret @@ -18,6 +20,3 @@ def _auth_header(self, grants: VideoGrants) -> Dict[str, str]: headers = {} headers[AUTHORIZATION] = "Bearer {}".format(token) return headers - - async def aclose(self): - await self._client.aclose() diff --git a/livekit-api/livekit/api/_twirp_client.py b/livekit-api/livekit/api/_twirp_client.py index 09f726ab..3b736f43 100644 --- a/livekit-api/livekit/api/_twirp_client.py +++ b/livekit-api/livekit/api/_twirp_client.py @@ -52,7 +52,15 @@ class TwirpErrorCode: class TwirpClient: - def __init__(self, host: str, pkg: str, prefix: str = DEFAULT_PREFIX) -> None: + def __init__( + self, + session: aiohttp.ClientSession, + host: str, + pkg: str, + prefix: str = DEFAULT_PREFIX, + ) -> None: + self._session = aiohttp.ClientSession() + parse_res = urlparse(host) scheme = parse_res.scheme if scheme.startswith("ws"): @@ -62,7 +70,6 @@ def __init__(self, host: str, pkg: str, prefix: str = DEFAULT_PREFIX) -> None: self.host = host.rstrip("/") self.pkg = pkg self.prefix = prefix - self.session = aiohttp.ClientSession() async def request( self, @@ -76,7 +83,7 @@ async def request( headers["Content-Type"] = "application/protobuf" serialized_data = data.SerializeToString() - async with self.session.request( + async with self._session.request( "post", url, headers=headers, data=serialized_data ) as resp: if resp.status == 200: @@ -85,6 +92,3 @@ async def request( # when we have an error, Twirp always encode it in json error_data = await resp.json() raise TwirpError(error_data["code"], error_data["msg"]) - - async def aclose(self): - await self.session.close() diff --git a/livekit-api/livekit/api/api.py b/livekit-api/livekit/api/api.py new file mode 100644 index 00000000..8294f626 --- /dev/null +++ b/livekit-api/livekit/api/api.py @@ -0,0 +1,30 @@ +import aiohttp +import os + + +class LivekitAPI: + def __init__( + self, + url: str = os.getenv("LIVEKIT_URL", "http://localhost:7880"), + api_key: str = os.getenv("LIVEKIT_API_KEY", ""), + api_secret: str = os.getenv("LIVEKIT_API_SECRET", ""), + ): + self._session = aiohttp.ClientSession() + self._room = RoomService(url, api_key, api_secret, self._session) + self._ingress = IngressService(url, api_key, api_secret, self._session) + self._egress = EgressService(url, api_key, api_secret, self._session) + + @property + def room(self): + return self._room + + @property + def ingress(self): + return self._ingress + + @property + def egress(self): + return self._egress + + async def aclose(self): + await self._session.close() diff --git a/livekit-api/livekit/api/egress_service.py b/livekit-api/livekit/api/egress_service.py new file mode 100644 index 00000000..63c1fa6d --- /dev/null +++ b/livekit-api/livekit/api/egress_service.py @@ -0,0 +1,111 @@ +import aiohttp +from livekit.protocol import egress as proto_egress +from livekit.protocol import models as proto_models +from ._service import Service +from .access_token import VideoGrants + +SVC = "Egress" + + +class EgressService(Service): + def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): + super().__init__(session, url, api_key, api_secret) + + async def start_room_composite_egress( + self, start: proto_egress.StartRoomCompositeEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartRoomCompositeEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_web_egress( + self, start: proto_egress.StartWebEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartWebEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_participant_egress( + self, start: proto_egress.ParticipantEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartParticipantEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_track_composite_egress( + self, start: proto_egress.TrackCompositeEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartTrackCompositeEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_track_egress( + self, start: proto_egress.TrackEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartTrackEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def update_layout( + self, update: proto_egress.UpdateLayoutRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "UpdateLayout", + update, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def update_stream( + self, update: proto_egress.UpdateStreamRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "UpdateStream", + update, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def list_egress( + self, list: proto_egress.ListEgressRequest + ) -> proto_egress.ListEgressResponse: + return await self._client.request( + SVC, + "ListEgress", + list, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.ListEgressResponse, + ) + + async def stop_egress( + self, stop: proto_egress.StopEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StopEgress", + stop, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) diff --git a/livekit-api/livekit/api/ingress_service.py b/livekit-api/livekit/api/ingress_service.py new file mode 100644 index 00000000..a087825c --- /dev/null +++ b/livekit-api/livekit/api/ingress_service.py @@ -0,0 +1,56 @@ +import aiohttp +from livekit.protocol import ingress as proto_ingress +from livekit.protocol import models as proto_models +from ._service import Service +from .access_token import VideoGrants + +SVC = "Ingress" + + +class IngressService(Service): + def __init__(self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str): + super().__init__(session, url, api_key, api_secret) + + async def create_ingress( + self, create: proto_ingress.CreateIngressRequest + ) -> proto_ingress.IngressInfo: + return await self._client.request( + SVC, + "CreateIngress", + create, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.IngressInfo, + ) + + async def update_ingress( + self, update: proto_ingress.UpdateIngressRequest + ) -> proto_ingress.IngressInfo: + return await self._client.request( + SVC, + "UpdateIngress", + update, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.IngressInfo, + ) + + async def list_ingress( + self, list: proto_ingress.ListIngressRequest + ) -> proto_ingress.ListIngressResponse: + return await self._client.request( + SVC, + "ListIngress", + list, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.ListIngressResponse, + ) + + async def delete_ingress( + self, delete: proto_ingress.DeleteIngressRequest + ) -> proto_ingress.IngressInfo: + return await self._client.request( + SVC, + "DeleteIngress", + delete, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.IngressInfo, + ) diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index 6be0df92..686322cd 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -1,3 +1,4 @@ +import aiohttp from livekit.protocol import room as proto_room from livekit.protocol import models as proto_models from ._service import Service @@ -7,8 +8,10 @@ class RoomService(Service): - def __init__(self, url: str, api_key: str, api_secret: str): - super().__init__(url, api_key, api_secret) + def __init__( + self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str + ): + super().__init__(session, url, api_key, api_secret) async def create_room( self, create: proto_room.CreateRoomRequest @@ -81,8 +84,64 @@ async def remove_participant( ) -> proto_room.RemoveParticipantResponse: return await self._client.request( SVC, - "remove_participant", + "RemoveParticipant", remove, self._auth_header(VideoGrants(room_admin=True, room=remove.room)), proto_room.RemoveParticipantResponse, ) + + async def mute_published_track( + self, + update: proto_room.MuteRoomTrackRequest, + ) -> proto_room.MuteRoomTrackResponse: + return await self._client.request( + SVC, + "MutePublishedTrack", + update, + self._auth_header(VideoGrants(room_admin=True, room=room)), + proto_room.MuteRoomTrackResponse, + ) + + async def update_participant( + self, update: proto_room.UpdateParticipantRequest + ) -> proto_models.ParticipantInfo: + return await self._client.request( + SVC, + "UpdateParticipant", + update, + self._auth_header(VideoGrants(room_admin=True, room=update.room)), + proto_models.ParticipantInfo, + ) + + async def update_subscriptions( + self, update: proto_room.UpdateSubscriptionsRequest + ) -> proto_room.UpdateSubscriptionsResponse: + return await self._client.request( + SVC, + "UpdateSubscriptions", + update, + self._auth_header(VideoGrants(room_admin=True, room=update.room)), + proto_room.UpdateSubscriptionsResponse, + ) + + async def send_data( + self, send: proto_room.SendDataRequest + ) -> proto_room.SendDataResponse: + return await self._client.request( + SVC, + "SendData", + send, + self._auth_header(VideoGrants(room_admin=True, room=send.room)), + proto_room.SendDataResponse, + ) + + async def update_room_metadata( + self, update: proto_room.UpdateRoomMetadataRequest + ) -> proto_room.Room: + return await self._client.request( + SVC, + "UpdateRoomMetadata", + update, + self._auth_header(VideoGrants(room_admin=True, room=update.room)), + proto_room.Room, + ) diff --git a/livekit-api/livekit/api/webhooks.py b/livekit-api/livekit/api/webhooks.py new file mode 100644 index 00000000..e69de29b