forked from pawel02/music_bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
music_cog.py
185 lines (156 loc) · 5.65 KB
/
music_cog.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
from ast import alias
import discord
from discord.ext import commands
from yt_dlp import YoutubeDL
class music_cog(commands.Cog):
def __init__(self, bot):
self.bot = bot
# all the music related stuff
self.is_playing = False
self.is_paused = False
# 2d array containing [song, channel]
self.music_queue = []
self.YDL_OPTIONS = {
"format": "bestaudio",
"quiet": "true",
"noplaylist": "True",
"audioquality": "0",
"ignoreerrors": "True",
"no_warnings": "True",
"default_search": "auto"
}
self.FFMPEG_OPTIONS = {
"before_options": "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5 -reconnect_at_eof 1 -reconnect_on_network_error 1 -multiple_requests 1",
"options": "-vn",
}
self.vc = None
# searching the item on youtube
def search_yt(self, item):
with YoutubeDL(self.YDL_OPTIONS) as ydl:
try:
info = ydl.extract_info("ytsearch:%s" % item, download=False)[
"entries"
][0]
except Exception:
return False
return {"source": info["url"], "title": info["fulltitle"]}
def play_next(self):
if len(self.music_queue) > 0:
self.is_playing = True
# get the first url
m_url = self.music_queue[0][0]["source"]
# remove the first element as you are currently playing it
self.music_queue.pop(0)
self.vc.play(
discord.FFmpegPCMAudio(m_url, **self.FFMPEG_OPTIONS),
after=lambda e: self.play_next(),
)
else:
self.is_playing = False
# infinite loop checking
async def play_music(self, ctx):
if len(self.music_queue) > 0:
self.is_playing = True
m_url = self.music_queue[0][0]["source"]
# try to connect to voice channel if you are not already connected
if self.vc == None or not self.vc.is_connected():
self.vc = await self.music_queue[0][1].connect()
# in case we fail to connect
if self.vc == None:
await ctx.send("Could not connect to the voice channel")
return
else:
await self.vc.move_to(self.music_queue[0][1])
# remove the first element as you are currently playing it
self.music_queue.pop(0)
self.vc.play(
discord.FFmpegPCMAudio(m_url, **self.FFMPEG_OPTIONS),
after=lambda e: self.play_next(),
)
else:
self.is_playing = False
@commands.command(
name="play",
aliases=["p", "playing"],
help="Plays a selected song from youtube",
)
async def play(self, ctx, *args):
query = " ".join(args)
voice_channel = ctx.author.voice.channel
if voice_channel is None:
# you need to be connected so that the bot knows where to go
await ctx.send("Dude, I can't play music until you CONNECT TO A VOICE CHANNEL!")
elif self.is_paused:
self.vc.resume()
else:
song = self.search_yt(query)
if type(song) == type(True):
await ctx.send(
"Could not download the song. Incorrect format try another keyword. This could be due to playlist or a livestream format."
)
else:
await ctx.send("Song added to the queue")
self.music_queue.append([song, voice_channel])
if self.is_playing == False:
await self.play_music(ctx)
@commands.command(
name="pause", help="Pauses the current song being played"
)
async def pause(self, ctx, *args):
if self.is_playing:
self.is_playing = False
self.is_paused = True
self.vc.pause()
elif self.is_paused:
self.is_paused = False
self.is_playing = True
self.vc.resume()
@commands.command(
name="resume",
aliases=["r"],
help="Resumes playing with the discord bot",
)
async def resume(self, ctx, *args):
if self.is_paused:
self.is_paused = False
self.is_playing = True
self.vc.resume()
@commands.command(
name="skip", aliases=["s"], help="Skips the current song being played"
)
async def skip(self, ctx):
if self.vc != None and self.vc:
self.vc.stop()
@commands.command(
name="queue", aliases=["q"], help="Displays the current songs in queue"
)
async def queue(self, ctx):
retval = ""
for i in range(0, len(self.music_queue)):
# display a max of 50 songs in the current queue
if i > 50:
break
retval += self.music_queue[i][0]["title"] + "\n"
if retval != "":
await ctx.send(retval)
else:
await ctx.send("No music in queue")
@commands.command(
name="clear",
aliases=["c", "bin"],
help="Stops the music and clears the queue",
)
async def clear(self, ctx):
if self.vc != None and self.is_playing:
self.vc.stop()
self.music_queue = []
await ctx.send("Music queue cleared")
@commands.command(
name="leave",
aliases=["disconnect", "l", "d"],
help="Kick the bot from VC",
)
async def dc(self, ctx):
self.is_playing = False
self.is_paused = False
await self.vc.disconnect()