diff --git a/dev-requirements.txt b/dev-requirements.txt index 1d0fd626..a31ca5fd 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -6,4 +6,6 @@ wheel setuptools twine auditwheel; sys_platform == 'linux' -cibuildwheel \ No newline at end of file +cibuildwheel + +pytest diff --git a/livekit-api/livekit/api/__init__.py b/livekit-api/livekit/api/__init__.py index e22f846d..859a2480 100644 --- a/livekit-api/livekit/api/__init__.py +++ b/livekit-api/livekit/api/__init__.py @@ -17,11 +17,13 @@ # flake8: noqa # re-export packages from protocol -from livekit.protocol import egress -from livekit.protocol import ingress -from livekit.protocol import models -from livekit.protocol import room +from livekit.protocol.egress import * +from livekit.protocol.ingress import * +from livekit.protocol.models import * +from livekit.protocol.room import * +from livekit.protocol.webhook import * -from .access_token import VideoGrants, AccessToken -from .room_service import RoomService +from .livekit_api import LiveKitAPI +from .access_token import VideoGrants, AccessToken, TokenVerifier +from .webhook import WebhookReceiver from .version import __version__ diff --git a/livekit-api/livekit/api/_service.py b/livekit-api/livekit/api/_service.py index b845cedd..4c511540 100644 --- a/livekit-api/livekit/api/_service.py +++ b/livekit-api/livekit/api/_service.py @@ -1,4 +1,5 @@ from typing import Dict +import aiohttp from abc import ABC from ._twirp_client import TwirpClient from .access_token import AccessToken, VideoGrants @@ -7,8 +8,10 @@ class Service(ABC): - def __init__(self, host: str, api_key: str, api_secret: str): - self._client = TwirpClient(host, "livekit") + def __init__( + self, host: str, api_key: str, api_secret: str, session: aiohttp.ClientSession + ): + self._client = TwirpClient(session, host, "livekit") self.api_key = api_key self.api_secret = api_secret @@ -18,6 +21,3 @@ def _auth_header(self, grants: VideoGrants) -> Dict[str, str]: headers = {} headers[AUTHORIZATION] = "Bearer {}".format(token) return headers - - async def aclose(self): - await self._client.aclose() diff --git a/livekit-api/livekit/api/_twirp_client.py b/livekit-api/livekit/api/_twirp_client.py index 09f726ab..ef465281 100644 --- a/livekit-api/livekit/api/_twirp_client.py +++ b/livekit-api/livekit/api/_twirp_client.py @@ -52,7 +52,15 @@ class TwirpErrorCode: class TwirpClient: - def __init__(self, host: str, pkg: str, prefix: str = DEFAULT_PREFIX) -> None: + def __init__( + self, + session: aiohttp.ClientSession, + host: str, + pkg: str, + prefix: str = DEFAULT_PREFIX, + ) -> None: + self._session = aiohttp.ClientSession() + parse_res = urlparse(host) scheme = parse_res.scheme if scheme.startswith("ws"): @@ -62,7 +70,6 @@ def __init__(self, host: str, pkg: str, prefix: str = DEFAULT_PREFIX) -> None: self.host = host.rstrip("/") self.pkg = pkg self.prefix = prefix - self.session = aiohttp.ClientSession() async def request( self, @@ -76,8 +83,8 @@ async def request( headers["Content-Type"] = "application/protobuf" serialized_data = data.SerializeToString() - async with self.session.request( - "post", url, headers=headers, data=serialized_data + async with self._session.post( + url, headers=headers, data=serialized_data ) as resp: if resp.status == 200: return response_class.FromString(await resp.read()) @@ -85,6 +92,3 @@ async def request( # when we have an error, Twirp always encode it in json error_data = await resp.json() raise TwirpError(error_data["code"], error_data["msg"]) - - async def aclose(self): - await self.session.close() diff --git a/livekit-api/livekit/api/access_token.py b/livekit-api/livekit/api/access_token.py index 87fecd88..48622f71 100644 --- a/livekit-api/livekit/api/access_token.py +++ b/livekit-api/livekit/api/access_token.py @@ -14,12 +14,13 @@ import calendar import dataclasses +import re import datetime import os - import jwt DEFAULT_TTL = datetime.timedelta(hours=6) +DEFAULT_LEEWAY = datetime.timedelta(minutes=1) @dataclasses.dataclass @@ -63,6 +64,11 @@ class VideoGrants: @dataclasses.dataclass class Claims: + exp: int = 0 + iss: str = "" # api key + nbf: int = 0 + sub: str = "" # identity + name: str = "" video: VideoGrants = dataclasses.field(default_factory=VideoGrants) metadata: str = "" @@ -110,17 +116,12 @@ def with_sha256(self, sha256: str) -> "AccessToken": return self def to_jwt(self) -> str: - def camel_case_dict(data) -> dict: - return { - "".join( - word if i == 0 else word.title() - for i, word in enumerate(key.split("_")) - ): value - for key, value in data - if value is not None - } + video = self.claims.video + if video.room_join and (not self.identity or not video.room): + raise ValueError("identity and room must be set when joining a room") claims = dataclasses.asdict(self.claims) + claims = {camel_to_snake(k): v for k, v in claims.items()} claims.update( { "sub": self.identity, @@ -129,10 +130,43 @@ def camel_case_dict(data) -> dict: "exp": calendar.timegm( (datetime.datetime.utcnow() + self.ttl).utctimetuple() ), - "video": dataclasses.asdict( - self.claims.video, dict_factory=camel_case_dict - ), } ) return jwt.encode(claims, self.api_secret, algorithm="HS256") + + +class TokenVerifier: + def __init__( + self, + api_key: str = os.getenv("LIVEKIT_API_KEY", ""), + api_secret: str = os.getenv("LIVEKIT_API_SECRET", ""), + *, + leeway: datetime.timedelta = DEFAULT_LEEWAY, + ) -> None: + self.api_key = api_key + self.api_secret = api_secret + self._leeway = leeway + + def verify(self, token: str) -> Claims: + claims = jwt.decode( + token, + self.api_secret, + issuer=self.api_key, + algorithms=["HS256"], + leeway=self._leeway.total_seconds(), + ) + c = Claims(**claims) + + video = claims["video"] + video = {camel_to_snake(k): v for k, v in video.items()} + c.video = VideoGrants(**video) + return c + + +def camel_to_snake(t: str): + return re.sub(r"(? proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartRoomCompositeEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_web_egress( + self, start: proto_egress.WebEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartWebEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_participant_egress( + self, start: proto_egress.ParticipantEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartParticipantEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_track_composite_egress( + self, start: proto_egress.TrackCompositeEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartTrackCompositeEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def start_track_egress( + self, start: proto_egress.TrackEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StartTrackEgress", + start, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def update_layout( + self, update: proto_egress.UpdateLayoutRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "UpdateLayout", + update, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def update_stream( + self, update: proto_egress.UpdateStreamRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "UpdateStream", + update, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) + + async def list_egress( + self, list: proto_egress.ListEgressRequest + ) -> proto_egress.ListEgressResponse: + return await self._client.request( + SVC, + "ListEgress", + list, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.ListEgressResponse, + ) + + async def stop_egress( + self, stop: proto_egress.StopEgressRequest + ) -> proto_egress.EgressInfo: + return await self._client.request( + SVC, + "StopEgress", + stop, + self._auth_header(VideoGrants(room_record=True)), + proto_egress.EgressInfo, + ) diff --git a/livekit-api/livekit/api/ingress_service.py b/livekit-api/livekit/api/ingress_service.py new file mode 100644 index 00000000..abf691ef --- /dev/null +++ b/livekit-api/livekit/api/ingress_service.py @@ -0,0 +1,57 @@ +import aiohttp +from livekit.protocol import ingress as proto_ingress +from ._service import Service +from .access_token import VideoGrants + +SVC = "Ingress" + + +class IngressService(Service): + def __init__( + self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str + ): + super().__init__(session, url, api_key, api_secret) + + async def create_ingress( + self, create: proto_ingress.CreateIngressRequest + ) -> proto_ingress.IngressInfo: + return await self._client.request( + SVC, + "CreateIngress", + create, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.IngressInfo, + ) + + async def update_ingress( + self, update: proto_ingress.UpdateIngressRequest + ) -> proto_ingress.IngressInfo: + return await self._client.request( + SVC, + "UpdateIngress", + update, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.IngressInfo, + ) + + async def list_ingress( + self, list: proto_ingress.ListIngressRequest + ) -> proto_ingress.ListIngressResponse: + return await self._client.request( + SVC, + "ListIngress", + list, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.ListIngressResponse, + ) + + async def delete_ingress( + self, delete: proto_ingress.DeleteIngressRequest + ) -> proto_ingress.IngressInfo: + return await self._client.request( + SVC, + "DeleteIngress", + delete, + self._auth_header(VideoGrants(ingress_admin=True)), + proto_ingress.IngressInfo, + ) diff --git a/livekit-api/livekit/api/livekit_api.py b/livekit-api/livekit/api/livekit_api.py new file mode 100644 index 00000000..3b4982ae --- /dev/null +++ b/livekit-api/livekit/api/livekit_api.py @@ -0,0 +1,35 @@ +import aiohttp +import os +from .room_service import RoomService +from .egress_service import EgressService +from .ingress_service import IngressService + + +class LiveKitAPI: + def __init__( + self, + url: str = os.getenv("LIVEKIT_URL", "http://localhost:7880"), + api_key: str = os.getenv("LIVEKIT_API_KEY", ""), + api_secret: str = os.getenv("LIVEKIT_API_SECRET", ""), + *, + timeout: float = 60, # 1 minutes by default + ): + self._session = aiohttp.ClientSession(timeout=timeout) + self._room = RoomService(url, api_key, api_secret, self._session) + self._ingress = IngressService(url, api_key, api_secret, self._session) + self._egress = EgressService(url, api_key, api_secret, self._session) + + @property + def room(self): + return self._room + + @property + def ingress(self): + return self._ingress + + @property + def egress(self): + return self._egress + + async def aclose(self): + await self._session.close() diff --git a/livekit-api/livekit/api/room_service.py b/livekit-api/livekit/api/room_service.py index 6be0df92..9c1b197a 100644 --- a/livekit-api/livekit/api/room_service.py +++ b/livekit-api/livekit/api/room_service.py @@ -1,3 +1,4 @@ +import aiohttp from livekit.protocol import room as proto_room from livekit.protocol import models as proto_models from ._service import Service @@ -7,8 +8,10 @@ class RoomService(Service): - def __init__(self, url: str, api_key: str, api_secret: str): - super().__init__(url, api_key, api_secret) + def __init__( + self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str + ): + super().__init__(session, url, api_key, api_secret) async def create_room( self, create: proto_room.CreateRoomRequest @@ -81,8 +84,53 @@ async def remove_participant( ) -> proto_room.RemoveParticipantResponse: return await self._client.request( SVC, - "remove_participant", + "RemoveParticipant", remove, self._auth_header(VideoGrants(room_admin=True, room=remove.room)), proto_room.RemoveParticipantResponse, ) + + async def mute_published_track( + self, + update: proto_room.MuteRoomTrackRequest, + ) -> proto_room.MuteRoomTrackResponse: + return await self._client.request( + SVC, + "MutePublishedTrack", + update, + self._auth_header(VideoGrants(room_admin=True, room=update.room)), + proto_room.MuteRoomTrackResponse, + ) + + async def update_participant( + self, update: proto_room.UpdateParticipantRequest + ) -> proto_models.ParticipantInfo: + return await self._client.request( + SVC, + "UpdateParticipant", + update, + self._auth_header(VideoGrants(room_admin=True, room=update.room)), + proto_models.ParticipantInfo, + ) + + async def update_subscriptions( + self, update: proto_room.UpdateSubscriptionsRequest + ) -> proto_room.UpdateSubscriptionsResponse: + return await self._client.request( + SVC, + "UpdateSubscriptions", + update, + self._auth_header(VideoGrants(room_admin=True, room=update.room)), + proto_room.UpdateSubscriptionsResponse, + ) + + async def send_data( + self, send: proto_room.SendDataRequest + ) -> proto_room.SendDataResponse: + return await self._client.request( + SVC, + "SendData", + send, + self._auth_header(VideoGrants(room_admin=True, room=send.room)), + proto_room.SendDataResponse, + ) diff --git a/livekit-api/livekit/api/webhook.py b/livekit-api/livekit/api/webhook.py new file mode 100644 index 00000000..3a3c3cbc --- /dev/null +++ b/livekit-api/livekit/api/webhook.py @@ -0,0 +1,21 @@ +from .access_token import TokenVerifier +from livekit.protocol import webhook as proto_webhook +from google.protobuf.json_format import Parse +import hashlib +import base64 + + +class WebhookReceiver: + def __init__(self, token_verifier: TokenVerifier): + self._verifier = token_verifier + + def receive(self, body: str, auth_token: str) -> proto_webhook.WebhookEvent: + claims = self._verifier.verify(auth_token) + + body_hash = hashlib.sha256(body.encode()).digest() + claims_hash = base64.b64decode(claims.sha256) + + if body_hash != claims_hash: + raise Exception("hash mismatch") + + return Parse(body, proto_webhook.WebhookEvent()) diff --git a/livekit-api/tests/test_access_token.py b/livekit-api/tests/test_access_token.py new file mode 100644 index 00000000..abeb0916 --- /dev/null +++ b/livekit-api/tests/test_access_token.py @@ -0,0 +1,56 @@ +import pytest +import datetime +from livekit.api import AccessToken, TokenVerifier, VideoGrants + +TEST_API_KEY = "myapikey" +TEST_API_SECRET = "thiskeyistotallyunsafe" + + +def test_verify_token(): + grants = VideoGrants(room_join=True, room="test_room") + + token = ( + AccessToken(TEST_API_KEY, TEST_API_SECRET) + .with_identity("test_identity") + .with_metadata("test_metadata") + .with_grants(grants) + .to_jwt() + ) + + token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) + claims = token_verifier.verify(token) + + assert claims.sub == "test_identity" + assert claims.metadata == "test_metadata" + assert claims.video == grants + + +def test_verify_token_invalid(): + token = ( + AccessToken(TEST_API_KEY, TEST_API_SECRET) + .with_identity("test_identity") + .to_jwt() + ) + + token_verifier = TokenVerifier(TEST_API_KEY, "invalid_secret") + with pytest.raises(Exception): + token_verifier.verify(token) + + token_verifier = TokenVerifier("invalid_key", TEST_API_SECRET) + with pytest.raises(Exception): + token_verifier.verify(token) + + +def test_verify_token_expired(): + token = ( + AccessToken(TEST_API_KEY, TEST_API_SECRET) + .with_identity("test_identity") + .with_ttl(datetime.timedelta(seconds=0)) + .to_jwt() + ) + + token_verifier = TokenVerifier( + TEST_API_KEY, TEST_API_SECRET, leeway=datetime.timedelta(seconds=0) + ) + with pytest.raises(Exception): + token_verifier.verify(token) diff --git a/livekit-api/tests/test_webhook.py b/livekit-api/tests/test_webhook.py new file mode 100644 index 00000000..3dbef831 --- /dev/null +++ b/livekit-api/tests/test_webhook.py @@ -0,0 +1,80 @@ +import pytest +from livekit.api import WebhookReceiver, TokenVerifier, AccessToken +import hashlib +import base64 + +TEST_API_KEY = "myapikey" +TEST_API_SECRET = "thiskeyistotallyunsafe" +TEST_EVENT = """ +{ + "event": "room_started", + "room": { + "sid": "RM_hycBMAjmt6Ub", + "name": "Demo Room", + "emptyTimeout": 300, + "creationTime": "1692627281", + "turnPassword": "2Pvdj+/WV1xV4EkB8klJ9xkXDWY=", + "enabledCodecs": [ + { + "mime": "audio/opus" + }, + { + "mime": "video/H264" + }, + { + "mime": "video/VP8" + }, + { + "mime": "video/AV1" + }, + { + "mime": "video/H264" + }, + { + "mime": "audio/red" + }, + { + "mime": "video/VP9" + } + ] + }, + "id": "EV_eugWmGhovZmm", + "createdAt": "1692985556" +} +""" + + +def test_webhook_receiver(): + token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) + receiver = WebhookReceiver(token_verifier) + + hash64 = base64.b64encode(hashlib.sha256(TEST_EVENT.encode()).digest()).decode() + token = AccessToken(TEST_API_KEY, TEST_API_SECRET) + token.claims.sha256 = hash64 + jwt = token.to_jwt() + receiver.receive(TEST_EVENT, jwt) + + +def test_bad_hash(): + token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) + receiver = WebhookReceiver(token_verifier) + + token = AccessToken(TEST_API_KEY, TEST_API_SECRET) + hash64 = base64.b64encode(hashlib.sha256("wrong_hash".encode()).digest()).decode() + token.claims.sha256 = hash64 + jwt = token.to_jwt() + with pytest.raises(Exception): + receiver.receive(TEST_EVENT, jwt) + + +def test_invalid_body(): + token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET) + receiver = WebhookReceiver(token_verifier) + + token = AccessToken(TEST_API_KEY, TEST_API_SECRET) + body = "invalid body" + hash64 = base64.b64encode(hashlib.sha256(body.encode()).digest()).decode() + token.claims.sha256 = hash64 + jwt = token.to_jwt() + with pytest.raises(Exception): + receiver.receive(body, jwt)