-
Notifications
You must be signed in to change notification settings - Fork 2
/
bilibili_provider.py
149 lines (123 loc) · 4.75 KB
/
bilibili_provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import sys
import threading
import logging
import asyncio
import aiohttp
from feeluown.library import (
ProviderV2, ProviderFlags as PF, AbstractProvider,
ModelType, VideoModel, BriefArtistModel,
)
from feeluown.media import Quality, Media, MediaType, VideoAudioManifest
from feeluown.utils.sync import AsyncToSync
# Thread-local storage; the aiohttp session of the current thread is kept on
# it under the attribute `session`.
local = threading.local()
# Module logger.
logger = logging.getLogger('feeluown.fuo_provider_bilibili')
def fixed_get_session():
    """
    Return the aiohttp session cached for the current thread.

    Installed by ``patch`` as a replacement for
    ``bilibili_api.utils.network.get_session``. The session is stored on the
    thread-local ``local`` and created lazily on the loop returned by
    ``asyncio.get_event_loop()``. A cached session that has already been
    closed is replaced by a fresh one.
    """
    session = getattr(local, 'session', None)
    # `Sync` closes the session after every call. A closed session cannot send
    # requests, so build a new one instead of handing the dead one back.
    if session is None or session.closed:
        session = aiohttp.ClientSession(loop=asyncio.get_event_loop())
        local.session = session
    return session
def Sync(coro):
    """
    Wrap the coroutine function *coro* with ``AsyncToSync``.

    Ensure aiohttp session is closed: after *coro* finishes, whether it
    returned or raised, the session cached on the thread-local ``local`` is
    closed, if one was created.
    """
    async def wrap_coro(*args, **kwargs):
        try:
            return await coro(*args, **kwargs)
        finally:
            # The attribute only exists once a session has been requested on
            # this thread. Reading it unconditionally would raise
            # AttributeError here and hide the exception raised by `coro`.
            session = getattr(local, 'session', None)
            if session is not None:
                # Forget the session before closing it so a closed session is
                # never handed out again.
                local.session = None
                await session.close()
    return AsyncToSync(wrap_coro)
def patch():
    """
    Try to workaround https://github.com/MoyuScript/bilibili-api/issues/245

    Points ``bilibili_api.utils.network.get_session`` at
    ``fixed_get_session`` and then removes every loaded module whose name
    starts with ``bilibili_api`` and does not contain ``network`` from
    ``sys.modules``, so that those modules are imported afresh the next time
    they are requested.
    """
    from bilibili_api.utils import network

    network.get_session = fixed_get_session

    # Collect the names first: entries cannot be removed from `sys.modules`
    # while it is being iterated.
    stale_names = [
        name for name in sys.modules
        if name.startswith('bilibili_api') and 'network' not in name
    ]
    for name in stale_names:
        del sys.modules[name]
# NOTE(review): `Video` is imported before `patch()` runs, and `patch()` then
# replaces `network.get_session` and evicts the loaded `bilibili_api` modules
# from `sys.modules`. Confirm this order is the intended one.
from bilibili_api.video import Video # noqa
patch()
def create_video(identifier):
    """
    Build a ``Video`` from a string identifier.

    An identifier made only of digits is passed as ``aid`` (converted to
    ``int``). Anything else is passed as ``bvid``, with the ``BV`` prefix
    added when it is missing.
    """
    if identifier.isdigit():
        return Video(aid=int(identifier))
    # Old bilibili video model trim the BV prefix
    bvid = identifier if identifier.startswith('BV') else f'BV{identifier}'
    return Video(bvid=bvid)
class BilibiliProvider(AbstractProvider, ProviderV2):
    """
    Provider for Bilibili videos.

    It can fetch a video model by id and resolve the playable media of a
    video per quality. The ``bilibili_api`` coroutines are called through
    ``Sync``.
    """

    class meta:
        identifier = 'bilibili'
        name = 'Bilibili'
        # Capabilities declared for video models.
        flags = {
            ModelType.video: (PF.get | PF.multi_quality | PF.model_v2),
        }

    @property
    def identifier(self):
        """Provider identifier, taken from ``meta``."""
        return self.meta.identifier

    @property
    def name(self):
        """Provider name, taken from ``meta``."""
        return self.meta.name

    def video_get(self, vid: str):
        """
        Fetch the info of video *vid* and build a ``VideoModel`` from it.

        :param vid: identifier accepted by ``create_video``: an all-digit aid,
            or a bvid with or without the ``BV`` prefix.
        :return: the video model. The ``cid`` of each of its pages is cached
            on the model under the key ``pages``.
        """
        v = create_video(vid)
        info = Sync(v.get_info)()
        artists = [BriefArtistModel(source=self.meta.identifier,
                                    identifier=info['owner']['mid'],
                                    name=info['owner']['name'])]
        video = VideoModel(source=self.meta.identifier,
                           identifier=vid,
                           title=info['title'],
                           artists=artists,
                           duration=info['duration'],
                           cover=info['pic'])
        # `pages` lists the parts a video consists of.
        # TODO: each part should be a video model and have its own identifier
        video.cache_set('pages', [{'cid': page['cid']} for page in info['pages']])
        return video

    def video_get_media(self, video, quality):
        """Return the media of *video* for *quality*, or ``None`` if there is none."""
        q_media_mapping = self._get_or_fetch_q_media_mapping(video)
        return q_media_mapping.get(quality)

    def video_list_quality(self, video):
        """Return the qualities for which *video* has a media."""
        q_media_mapping = self._get_or_fetch_q_media_mapping(video)
        return list(q_media_mapping.keys())

    def _get_or_fetch_q_media_mapping(self, video):
        """
        Fetch the download urls of the first page of *video*, keyed by quality.

        The resulting mapping is also stored on the model under the key
        ``q_media_mapping``.

        NOTE(review): despite the name, this method never reads that cache
        entry back, so every call requests the urls again. Confirm whether
        that is intentional.
        """
        v = create_video(video.identifier)
        pages = self._model_cache_get_or_fetch(video, 'pages')
        assert pages, 'this should not happend, a video has no part'
        # Only the first page is handled (see the TODO in `video_get`).
        url_info = Sync(v.get_download_url)(cid=pages[0]['cid'])
        q_media_mapping = self._parse_media_info(url_info)
        video.cache_set('q_media_mapping', q_media_mapping)
        return q_media_mapping

    def _parse_media_info(self, url_info):
        """
        Turn a download-url response into a ``{quality: Media}`` mapping.

        :param url_info: mapping with an ``accept_quality`` list and a ``dash``
            entry holding the ``video`` streams and, optionally, the ``audio``
            streams.
        :return: dict with at most the keys ``Quality.Video.hd`` and
            ``Quality.Video.sd``.
        """
        q_media_mapping = {}
        dash_info = url_info['dash']
        # The audio streams are not guaranteed to be there. Treat a missing,
        # null or empty `audio` entry as "no audio" so the video-only branch
        # below is used instead of failing on the lookup.
        audio_streams = dash_info.get('audio') or []
        audio_url = audio_streams[0]['base_url'] if audio_streams else None
        # Only the four highest accepted qualities are considered. They are
        # visited from highest to lowest and a later assignment overwrites an
        # earlier one, so each bucket ends up with the lowest matching stream.
        for q in sorted(url_info['accept_quality'], reverse=True)[:4]:
            for video in dash_info['video']:
                if video['id'] == q:
                    video_url = video['base_url']
                    if audio_url:
                        obj = VideoAudioManifest(video_url, audio_url)
                    else:
                        obj = video_url
                    media = Media(obj,
                                  type_=MediaType.video,
                                  http_headers={'Referer': 'https://www.bilibili.com/'})
                    # TODO: handle more qualities
                    if q >= 32:
                        q_media_mapping[Quality.Video.hd] = media
                    else:
                        q_media_mapping[Quality.Video.sd] = media
        return q_media_mapping
# `app` is not defined in this file; it has to be supplied by whatever
# executes this script.
# NOTE(review): presumably the FeelUOwn application object - confirm.
APP = app # noqa
provider = BilibiliProvider()
# If a provider is already registered under the same identifier, remove it
# first so that the new instance replaces it.
tmp_provider = APP.library.get(provider.meta.identifier)
if tmp_provider is not None:
    APP.library.deregister(tmp_provider)
APP.library.register(provider)