Skip to content

Commit

Permalink
add other api services, token verifier & webhooks (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Nov 14, 2023
1 parent 41793b4 commit ce81435
Show file tree
Hide file tree
Showing 12 changed files with 486 additions and 35 deletions.
4 changes: 3 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ wheel
setuptools
twine
auditwheel; sys_platform == 'linux'
cibuildwheel
cibuildwheel

pytest
14 changes: 8 additions & 6 deletions livekit-api/livekit/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

# flake8: noqa
# re-export packages from protocol
from livekit.protocol import egress
from livekit.protocol import ingress
from livekit.protocol import models
from livekit.protocol import room
from livekit.protocol.egress import *
from livekit.protocol.ingress import *
from livekit.protocol.models import *
from livekit.protocol.room import *
from livekit.protocol.webhook import *

from .access_token import VideoGrants, AccessToken
from .room_service import RoomService
from .livekit_api import LiveKitAPI
from .access_token import VideoGrants, AccessToken, TokenVerifier
from .webhook import WebhookReceiver
from .version import __version__
10 changes: 5 additions & 5 deletions livekit-api/livekit/api/_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict
import aiohttp
from abc import ABC
from ._twirp_client import TwirpClient
from .access_token import AccessToken, VideoGrants
Expand All @@ -7,8 +8,10 @@


class Service(ABC):
def __init__(self, host: str, api_key: str, api_secret: str):
self._client = TwirpClient(host, "livekit")
def __init__(
self, host: str, api_key: str, api_secret: str, session: aiohttp.ClientSession
):
self._client = TwirpClient(session, host, "livekit")
self.api_key = api_key
self.api_secret = api_secret

Expand All @@ -18,6 +21,3 @@ def _auth_header(self, grants: VideoGrants) -> Dict[str, str]:
headers = {}
headers[AUTHORIZATION] = "Bearer {}".format(token)
return headers

async def aclose(self):
await self._client.aclose()
18 changes: 11 additions & 7 deletions livekit-api/livekit/api/_twirp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ class TwirpErrorCode:


class TwirpClient:
def __init__(self, host: str, pkg: str, prefix: str = DEFAULT_PREFIX) -> None:
def __init__(
self,
session: aiohttp.ClientSession,
host: str,
pkg: str,
prefix: str = DEFAULT_PREFIX,
) -> None:
self._session = aiohttp.ClientSession()

parse_res = urlparse(host)
scheme = parse_res.scheme
if scheme.startswith("ws"):
Expand All @@ -62,7 +70,6 @@ def __init__(self, host: str, pkg: str, prefix: str = DEFAULT_PREFIX) -> None:
self.host = host.rstrip("/")
self.pkg = pkg
self.prefix = prefix
self.session = aiohttp.ClientSession()

async def request(
self,
Expand All @@ -76,15 +83,12 @@ async def request(
headers["Content-Type"] = "application/protobuf"

serialized_data = data.SerializeToString()
async with self.session.request(
"post", url, headers=headers, data=serialized_data
async with self._session.post(
url, headers=headers, data=serialized_data
) as resp:
if resp.status == 200:
return response_class.FromString(await resp.read())
else:
# when we have an error, Twirp always encode it in json
error_data = await resp.json()
raise TwirpError(error_data["code"], error_data["msg"])

async def aclose(self):
await self.session.close()
60 changes: 47 additions & 13 deletions livekit-api/livekit/api/access_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

import calendar
import dataclasses
import re
import datetime
import os

import jwt

DEFAULT_TTL = datetime.timedelta(hours=6)
DEFAULT_LEEWAY = datetime.timedelta(minutes=1)


@dataclasses.dataclass
Expand Down Expand Up @@ -63,6 +64,11 @@ class VideoGrants:

@dataclasses.dataclass
class Claims:
exp: int = 0
iss: str = "" # api key
nbf: int = 0
sub: str = "" # identity

name: str = ""
video: VideoGrants = dataclasses.field(default_factory=VideoGrants)
metadata: str = ""
Expand Down Expand Up @@ -110,17 +116,12 @@ def with_sha256(self, sha256: str) -> "AccessToken":
return self

def to_jwt(self) -> str:
def camel_case_dict(data) -> dict:
return {
"".join(
word if i == 0 else word.title()
for i, word in enumerate(key.split("_"))
): value
for key, value in data
if value is not None
}
video = self.claims.video
if video.room_join and (not self.identity or not video.room):
raise ValueError("identity and room must be set when joining a room")

claims = dataclasses.asdict(self.claims)
claims = {camel_to_snake(k): v for k, v in claims.items()}
claims.update(
{
"sub": self.identity,
Expand All @@ -129,10 +130,43 @@ def camel_case_dict(data) -> dict:
"exp": calendar.timegm(
(datetime.datetime.utcnow() + self.ttl).utctimetuple()
),
"video": dataclasses.asdict(
self.claims.video, dict_factory=camel_case_dict
),
}
)

return jwt.encode(claims, self.api_secret, algorithm="HS256")


class TokenVerifier:
def __init__(
self,
api_key: str = os.getenv("LIVEKIT_API_KEY", ""),
api_secret: str = os.getenv("LIVEKIT_API_SECRET", ""),
*,
leeway: datetime.timedelta = DEFAULT_LEEWAY,
) -> None:
self.api_key = api_key
self.api_secret = api_secret
self._leeway = leeway

def verify(self, token: str) -> Claims:
claims = jwt.decode(
token,
self.api_secret,
issuer=self.api_key,
algorithms=["HS256"],
leeway=self._leeway.total_seconds(),
)
c = Claims(**claims)

video = claims["video"]
video = {camel_to_snake(k): v for k, v in video.items()}
c.video = VideoGrants(**video)
return c


def camel_to_snake(t: str):
return re.sub(r"(?<!^)(?=[A-Z])", "_", t).lower()


def snake_to_camel(t: str):
return "".join(x.title() for x in t.split("_"))
112 changes: 112 additions & 0 deletions livekit-api/livekit/api/egress_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import aiohttp
from livekit.protocol import egress as proto_egress
from ._service import Service
from .access_token import VideoGrants

SVC = "Egress"


class EgressService(Service):
def __init__(
self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
):
super().__init__(session, url, api_key, api_secret)

async def start_room_composite_egress(
self, start: proto_egress.RoomCompositeEgressRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"StartRoomCompositeEgress",
start,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)

async def start_web_egress(
self, start: proto_egress.WebEgressRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"StartWebEgress",
start,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)

async def start_participant_egress(
self, start: proto_egress.ParticipantEgressRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"StartParticipantEgress",
start,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)

async def start_track_composite_egress(
self, start: proto_egress.TrackCompositeEgressRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"StartTrackCompositeEgress",
start,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)

async def start_track_egress(
self, start: proto_egress.TrackEgressRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"StartTrackEgress",
start,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)

async def update_layout(
self, update: proto_egress.UpdateLayoutRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"UpdateLayout",
update,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)

async def update_stream(
self, update: proto_egress.UpdateStreamRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"UpdateStream",
update,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)

async def list_egress(
self, list: proto_egress.ListEgressRequest
) -> proto_egress.ListEgressResponse:
return await self._client.request(
SVC,
"ListEgress",
list,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.ListEgressResponse,
)

async def stop_egress(
self, stop: proto_egress.StopEgressRequest
) -> proto_egress.EgressInfo:
return await self._client.request(
SVC,
"StopEgress",
stop,
self._auth_header(VideoGrants(room_record=True)),
proto_egress.EgressInfo,
)
57 changes: 57 additions & 0 deletions livekit-api/livekit/api/ingress_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import aiohttp
from livekit.protocol import ingress as proto_ingress
from ._service import Service
from .access_token import VideoGrants

SVC = "Ingress"


class IngressService(Service):
def __init__(
self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
):
super().__init__(session, url, api_key, api_secret)

async def create_ingress(
self, create: proto_ingress.CreateIngressRequest
) -> proto_ingress.IngressInfo:
return await self._client.request(
SVC,
"CreateIngress",
create,
self._auth_header(VideoGrants(ingress_admin=True)),
proto_ingress.IngressInfo,
)

async def update_ingress(
self, update: proto_ingress.UpdateIngressRequest
) -> proto_ingress.IngressInfo:
return await self._client.request(
SVC,
"UpdateIngress",
update,
self._auth_header(VideoGrants(ingress_admin=True)),
proto_ingress.IngressInfo,
)

async def list_ingress(
self, list: proto_ingress.ListIngressRequest
) -> proto_ingress.ListIngressResponse:
return await self._client.request(
SVC,
"ListIngress",
list,
self._auth_header(VideoGrants(ingress_admin=True)),
proto_ingress.ListIngressResponse,
)

async def delete_ingress(
self, delete: proto_ingress.DeleteIngressRequest
) -> proto_ingress.IngressInfo:
return await self._client.request(
SVC,
"DeleteIngress",
delete,
self._auth_header(VideoGrants(ingress_admin=True)),
proto_ingress.IngressInfo,
)
Loading

0 comments on commit ce81435

Please sign in to comment.