The segments may or may not be encrypted. The keys attribute will be a list with all the different keys as described by #EXT-X-KEY.
Each key has the following properties:
- method: e.g. "AES-128"
- uri: the key URI, e.g. "http://videoserver.com/key.bin"
- iv: the initialization vector, if available; otherwise None
If no #EXT-X-KEY is found, the keys list will contain a single element, None. Multiple keys are supported.
If unencrypted and encrypted segments are mixed in the M3U8 file, the list will contain a None element followed by one or more keys.
To traverse the list of keys available:
import m3u8
m3u8_obj = m3u8.loads('#EXTM3U ... etc ...')
len(m3u8_obj.keys)  # => returns the number of keys available in the list (normally 1)
for key in m3u8_obj.keys:
    if key:  # First one could be None
        key.uri
        key.method
        key.iv
There are cases where listing segments for a given key is important. It's possible to retrieve the list of segments encrypted with one key via the by_key method of the segments list.
Example of getting the segments with no encryption:
import m3u8
m3u8_obj = m3u8.loads('#EXTM3U ... etc ...')
segmk1 = m3u8_obj.segments.by_key(None)
# Get the list of segments encrypted using last key
segm = m3u8_obj.segments.by_key(m3u8_obj.keys[-1])
With this method, it is also possible to change the key of some of the segments programmatically:
import m3u8
m3u8_obj = m3u8.loads('#EXTM3U ... etc ...')
# Create a new Key and replace it
new_key = m3u8.Key("AES-128", "/encrypted/newkey.bin", None, iv="0xf123ad23f22e441098aa87ee")
for segment in m3u8_obj.segments.by_key(m3u8_obj.keys[-1]):
    segment.key = new_key
# Remember to sync the key from the list as well
m3u8_obj.keys[-1] = new_key
A playlist can contain a list of other playlist files. This is used to represent videos available at multiple bitrates, and it's called variant streams.
See an example here.
variant_m3u8 = m3u8.loads('#EXTM3U ... contains a variant stream ...')
variant_m3u8.is_variant  # in this case will be True
for playlist in variant_m3u8.playlists:
    playlist.uri
    playlist.stream_info.bandwidth
The playlist object used in the for loop above has a few attributes:
- uri: the URL of the stream
- stream_info: a StreamInfo object (actually a namedtuple) with all the attributes available to #EXT-X-STREAM-INF
- media: a list of related Media objects with all the attributes available to #EXT-X-MEDIA
- playlist_type: the type of the playlist, which can be one of VOD (video on demand) or EVENT
NOTE: the following attributes are not implemented yet; follow issue 4 for updates.
- alternative_audios: it's an empty list, unless it's a playlist with Alternative audio, in which case it's a list of Media objects with all the attributes available to #EXT-X-MEDIA
- alternative_videos: same as alternative_audios
A variant playlist can also have links to I-frame playlists, which are used to specify where the I-frames are in a video. See Apple's documentation on this for more information. These I-frame playlists can be accessed in a similar way to regular playlists.
variant_m3u8 = m3u8.loads('#EXTM3U ... contains a variant stream ...')
for iframe_playlist in variant_m3u8.iframe_playlists:
    iframe_playlist.uri
    iframe_playlist.iframe_stream_info.bandwidth
The iframe_playlist object used in the for loop above has a few attributes:
- uri: the URL of the I-frame playlist
- base_uri: the base URI of the variant playlist (if given)
- iframe_stream_info: a StreamInfo object (same as a regular playlist)
Quoting the documentation::
Lines that start with the character '#' are either comments or tags.
Tags begin with #EXT. They are case-sensitive. All other lines that
begin with '#' are comments and SHOULD be ignored.
This library ignores all non-standard tags by default. If you want them to be collected while loading the file content, you need to pass a function to the load/loads functions, following the example below:
import m3u8
def get_movie(line, lineno, data, state):
    if line.startswith('#MOVIE-NAME:'):
        custom_tag = line.split(':')
        data['movie'] = custom_tag[1].strip()

m3u8_obj = m3u8.load('http://videoserver.com/playlist.m3u8', custom_tags_parser=get_movie)
print(m3u8_obj.data['movie'])  # million dollar baby
You can also override the parsing of existing standard tags. To do so, your custom_tags_parser function has to return the boolean True. This signals that it has fully parsed the current line, so the main parser can move on to the next line.
import re
import m3u8
from m3u8 import protocol
from m3u8.parser import save_segment_custom_value
def parse_iptv_attributes(line, lineno, data, state):
    # Customize parsing #EXTINF
    if line.startswith(protocol.extinf):
        title = ''
        chunks = line.replace(protocol.extinf + ':', '').split(',', 1)
        if len(chunks) == 2:
            duration_and_props, title = chunks
        elif len(chunks) == 1:
            duration_and_props = chunks[0]

        additional_props = {}
        chunks = duration_and_props.strip().split(' ', 1)
        if len(chunks) == 2:
            duration, raw_props = chunks
            matched_props = re.finditer(r'([\w\-]+)="([^"]*)"', raw_props)
            for match in matched_props:
                additional_props[match.group(1)] = match.group(2)
        else:
            duration = duration_and_props

        if 'segment' not in state:
            state['segment'] = {}
        state['segment']['duration'] = float(duration)
        state['segment']['title'] = title

        # Helper function for saving custom values
        save_segment_custom_value(state, 'extinf_props', additional_props)

        # Tell 'main parser' that we expect an URL on next lines
        state['expect_segment'] = True

        # Tell 'main parser' that it can go to next line, we've parsed current fully.
        return True


if __name__ == '__main__':
    PLAYLIST = """#EXTM3U
#EXTINF:-1 timeshift="0" catchup-days="7" catchup-type="flussonic" tvg-id="channel1" group-title="Group1",Channel1
http://str00.iptv.domain/7331/mpegts?token=longtokenhere
"""

    parsed = m3u8.loads(PLAYLIST, custom_tags_parser=parse_iptv_attributes)

    first_segment_props = parsed.segments[0].custom_parser_values['extinf_props']
    print(first_segment_props['tvg-id'])  # 'channel1'
    print(first_segment_props['group-title'])  # 'Group1'
    print(first_segment_props['catchup-type'])  # 'flussonic'
The helper functions get_segment_custom_value() and save_segment_custom_value() are intended for getting and storing your parsed values per segment in the Segment class. After that, all custom values are accessible via the custom_parser_values property of a Segment instance.
In case you need to dump custom tags, you can use the following code snippet as inspiration:
from m3u8 import M3U8
# Assumption: number_to_string is the number formatting helper in m3u8.model
from m3u8.model import number_to_string


def dumps_iptv(iptv: M3U8):
    output = ["#EXTM3U"]
    last_group = ""

    for seg in iptv.segments:
        segdumps = []
        seg_props = seg.custom_parser_values["extinf_props"]
        # Separate consecutive groups with blank lines
        if seg_props["group-title"] != last_group and last_group != "":
            segdumps.append(2*"\n")
        last_group = seg_props["group-title"]
        if seg.uri:
            if seg.duration is not None:
                segdumps.append("#EXTINF:%s" % number_to_string(seg.duration))
                if seg_props["tvg-logo"]:
                    segdumps.append(" tvg-logo=\"%s\"" % seg_props["tvg-logo"])
                if seg_props["group-title"]:
                    segdumps.append(" group-title=\"%s\"" % seg_props["group-title"])
                if seg.title:
                    segdumps.append("," + seg.title)
                segdumps.append("\n")
            segdumps.append(seg.uri)
        output.append("".join(segdumps))
    return "\n".join(output)
See issue 347 for more information.
If you don't want to use urllib to download playlists and want more control over how objects are fetched over the internet, you can use your own client. requests is a well-known Python HTTP library and it can be used with m3u8:
import m3u8
import requests
class RequestsClient():
    def download(self, uri, timeout=None, headers={}, verify_ssl=True):
        # Pass verify_ssl through so the caller can control certificate checks
        o = requests.get(uri, timeout=timeout, headers=headers, verify=verify_ssl)
        return o.text, o.url

playlist = m3u8.load('http://videoserver.com/playlist.m3u8', http_client=RequestsClient())
print(playlist.dumps())
A custom HTTP client gives you control over SSL verification, proxies, performance tuning, and similar concerns.
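A custom client can also be useful in tests. The sketch below is a hypothetical stand-in (the CannedHTTPClient name and its playlists dictionary are not part of the library) that serves playlist text from memory through the same download signature. It assumes the second element of the returned tuple is used as the base for resolving relative URIs, so it returns the directory of the requested URI.

```python
from urllib.parse import urljoin


class CannedHTTPClient:
    """Hypothetical client that serves playlists from an in-memory dict."""

    def __init__(self, playlists):
        self.playlists = playlists

    def download(self, uri, timeout=None, headers=None, verify_ssl=True):
        # No network access: look the content up by its URI.
        content = self.playlists[uri]
        # Directory part of the URI, used as the base for relative paths.
        base_uri = urljoin(uri, ".")
        return content, base_uri


uri = "http://videoserver.com/live/playlist.m3u8"
client = CannedHTTPClient({uri: "#EXTM3U\n#EXTINF:10.0,\nsegment0.ts\n"})
content, base_uri = client.download(uri)
print(base_uri)
```

An instance of such a class could then be passed through the http_client argument of m3u8.load in place of the real client.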
In case you need to use a proxy but can't use a system-wide proxy (HTTP/HTTPS proxy environment variables), you can pass your HTTP/HTTPS proxies as a dict to the default HTTP client and hand that client to the load function.
import m3u8
proxies = {
    'http': 'http://10.10.1.10:3128',
    'https': 'http://10.10.1.10:1080',
}
http_client = m3u8.httpclient.DefaultHTTPClient(proxies)
playlist = m3u8.load('http://videoserver.com/playlist.m3u8', http_client=http_client) # this could also be an absolute filename
print(playlist.dumps())
This works with the default client only. Custom HTTP clients must implement proxy support themselves.
Although the library provides two helper functions for loading playlists, you can also use the M3U8 class to instantiate new playlists in a manner similar to the loads function. This approach allows you to validate the playlist before using it.
import m3u8
# Create a new playlist with strict validation
# The default for 'strict' is False
p = m3u8.M3U8("playlist content", strict=True)
When strict=True is used, the content of the playlist is validated. If any errors are found, an exception will be raised, and a human-readable message will describe the error.
Give it a URI that points to a media playlist (one that contains transport stream URIs, not a multivariant playlist that contains other playlist URIs). It will read the playlist, download all the segments, and write them to your chosen output location. A single output file is created. This works because MPEG Transport Stream files can be concatenated.
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor
from sys import argv
from m3u8 import loads as m3u8_loads
from requests import Session
def main(args=None):
    args = args or argv[1:]
    argument_parser = ArgumentParser(description='HLS transport stream downloader')
    argument_parser.add_argument('playlist_uri', type=str, help='Playlist location')
    argument_parser.add_argument(
        'output_path', type=str, help='Output transport stream location'
    )
    argument_parser.add_argument(
        '--threads', type=int, default=8, help='Number of concurrent downloads'
    )
    # Parse the arguments passed to main() rather than re-reading sys.argv
    parsed_args = argument_parser.parse_args(args)
    playlist_uri = parsed_args.playlist_uri
    max_workers = parsed_args.threads
    output_path = parsed_args.output_path

    pool = ThreadPoolExecutor(max_workers=max_workers)
    session = Session()
    outfile = open(output_path, 'wb')
    with pool, session, outfile:
        playlist_text = session.get(playlist_uri).text
        parsed_playlist = m3u8_loads(playlist_text, uri=playlist_uri)
        all_uris = (s.absolute_uri for s in parsed_playlist.segments)
        # pool.map yields results in playlist order, so segments are written in sequence
        for data in pool.map(lambda uri: session.get(uri).content, all_uris):
            outfile.write(data)


if __name__ == '__main__':
    main()
Use the snippet below to append one playlist to another. Keep in mind that this is just an example, and each scenario may require its own adjustments.
import m3u8
# Read in the playlists and serialize them back to text
playlist_1_text = m3u8.load('http://videoserver.com/playlist_1.m3u8').dumps()
playlist_2_text = m3u8.load('http://videoserver.com/playlist_2.m3u8').dumps()
# Split them into lines
playlist_1_lines = playlist_1_text.splitlines()
playlist_2_lines = playlist_2_text.splitlines()
# Take the lines from playlist 1, then add in any non-duplicates from playlist 2
line_set = set(playlist_1_lines)
combined_lines = playlist_1_lines + [x for x in playlist_2_lines if x not in line_set]
playlist_combined_text = '\n'.join(combined_lines)
# Parse the combined lines to put the media tags back at the top
combined_playlist = m3u8.loads(playlist_combined_text)
combined_playlist_text = combined_playlist.dumps()
print(combined_playlist_text)