This repository has been archived by the owner on Mar 28, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
music.py
79 lines (69 loc) · 2.44 KB
/
music.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import discord
import youtube_dl
from asyncio import Queue
from random import shuffle
# Options handed to youtube_dl every time a player is created; this dict is
# passed as ``ytdl_options`` to ``voice_client.create_ytdl_player``.
ytdl_format_options = {
    # Format selection and output naming.
    "format": "bestaudio/best",
    "extractaudio": True,
    "outtmpl": "%(extractor)s-%(id)s-%(title)s.%(ext)s",
    "restrictfilenames": True,
    # Extraction behaviour.
    "noplaylist": True,
    "nocheckcertificate": True,
    "ignoreerrors": True,
    # Logging / console output.
    "logtostderr": False,
    "quiet": True,
    "verbose": False,
    "no_warnings": True,
    # Input handling and networking.
    "default_search": "auto",
    "source_address": "0.0.0.0",
}
class Playlist:
    """FIFO playlist of songs waiting to be played.

    Songs are kept in an ``asyncio.Queue``. Each entry is a dict with the
    keys ``"player"``, ``"url"``, ``"title"``, ``"uploader"`` and ``"user"``.
    """

    def __init__(self):
        """Create an empty playlist."""
        self.__queue = Queue()

    def _drain(self):
        """Remove every queued song and return them as a list in queue order."""
        songs = []
        while not self.empty():
            songs.append(self.__queue.get_nowait())
        return songs

    def _refill(self, songs):
        """Append ``songs`` to the queue, preserving the given order."""
        for song in songs:
            self.__queue.put_nowait(song)

    async def add(self, song_url: str, voice_client: "discord.VoiceClient", user=None):
        """Look up ``song_url`` and append the resulting song to the queue.

        Args:
            song_url: URL (or search text) handed to
                ``voice_client.create_ytdl_player``.
            voice_client: Voice client used to create the youtube_dl player.
            user: Optional requester; when given, its ``display_name`` is
                stored with the song.

        Returns:
            The song dict that was queued. If creating the player raises one
            of the youtube_dl errors handled below, nothing is queued and the
            matching exception *class* is returned (not raised) instead.
        """
        # One clause per error type: callers receive exactly the class that
        # was matched, so the clauses are intentionally not merged.
        try:
            song_player = await voice_client.create_ytdl_player(
                song_url, ytdl_options=ytdl_format_options
            )
        except youtube_dl.DownloadError:
            return youtube_dl.DownloadError
        except youtube_dl.SameFileError:
            return youtube_dl.SameFileError
        except youtube_dl.utils.ExtractorError:
            return youtube_dl.utils.ExtractorError
        except youtube_dl.utils.UnavailableVideoError:
            return youtube_dl.utils.UnavailableVideoError

        # NOTE(review): this player is only used for its metadata and is never
        # started or stopped here; pop() creates a fresh one. Confirm against
        # discord.py whether an unused player holds resources that need
        # releasing.
        user_name = "********" if user is None else user.display_name
        song = {
            "player": None,
            "url": song_player.url,
            "title": song_player.title,
            "uploader": song_player.uploader,
            "user": user_name,
        }
        await self.__queue.put(song)
        return song

    async def pop(self, voice_client: "discord.VoiceClient"):
        """Wait for the next song, remove it and attach a fresh player.

        Args:
            voice_client: Voice client used to create the youtube_dl player
                for the song's stored ``"url"``.

        Returns:
            The song dict with its ``"player"`` entry set.
        """
        song = await self.__queue.get()
        # The song has already left the queue at this point, so if player
        # creation raises, the song is not put back.
        song["player"] = await voice_client.create_ytdl_player(
            song["url"], ytdl_options=ytdl_format_options
        )
        return song

    def empty(self):
        """Return True when no songs are queued."""
        return self.__queue.empty()

    def get(self):
        """Return a list of all queued songs in order, leaving the queue intact.

        The underlying queue cannot be iterated, so it is drained and then
        refilled in the same order.
        """
        songs = self._drain()
        self._refill(songs)
        return songs

    def __len__(self):
        """Return the number of queued songs."""
        return self.__queue.qsize()

    def shuffle(self):
        """Randomly reorder all queued songs in place."""
        songs = self._drain()
        # Resolves to the module-level ``random.shuffle`` import, not to this
        # method: method bodies do not look names up in the class scope.
        shuffle(songs)
        self._refill(songs)