-
Notifications
You must be signed in to change notification settings - Fork 2
/
gmail.py
288 lines (253 loc) · 12.6 KB
/
gmail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import getpass # for password
import os
import time

from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Click offset (x, y) handed to ActionChains.move_by_offset() after the pointer
# has been moved onto the target element; both default to 0.
xpos, ypos = 0, 0
# query = {"query_term": "", "no_of_times": 0}
# tabs = {"Primary": 0, "Social": 0, "Promotions": 0, "Forums": 0}
# is_otp_enabled = false
# skip_first_mail = 0
# Function that initializes the Chrome driver
def init_driver(chromedriver=None, timeout=30):
    """Create a Chrome WebDriver and attach a shared explicit wait to it.

    Args:
        chromedriver: Path to the chromedriver executable. When omitted, the
            path this script has always used is taken.
        timeout: Seconds used for both the explicit wait stored on
            ``driver.wait`` and the driver's implicit wait (default 30).

    Returns:
        The Chrome driver, with a ``WebDriverWait`` available as
        ``driver.wait``.
    """
    if chromedriver is None:
        chromedriver = "/home/siddharth/python_scripts/gmail_cleaner/chromedriver"
    os.environ["webdriver.chrome.driver"] = chromedriver
    driver = webdriver.Chrome(chromedriver)
    # Explicit wait reused by the other functions through driver.wait.until(...).
    driver.wait = WebDriverWait(driver, timeout)
    # Implicit wait applied to plain find_element(s) lookups.
    # NOTE(review): Selenium's documentation advises against mixing implicit
    # and explicit waits; kept here because the checkbox lookup in
    # action_selectall_mails has no explicit wait of its own.
    driver.implicitly_wait(timeout)
    return driver
# Function to take in user preferences for settings
def set_preferences(driver):
    """Prompt for credentials and a clean-up mode, then log in and run it.

    All answers are collected and validated before the browser is touched, so
    a mistyped number aborts the run before any login attempt.

    Args:
        driver: Selenium driver handed on to run_login() and then to
            run_lookup() or run_tabs().

    Returns:
        The value returned by run_lookup() (mode 'Q') or run_tabs() (mode 'T').

    Raises:
        ValueError: If the number of pages to skip or any repeat count is not
            an integer.
        SystemExit: If the mode answer is neither 'Q' nor 'T'; the driver is
            quit first.
    """
    # User account details
    username = input("Enter gmail id:")
    password = getpass.getpass("Enter password:")

    ansotp = input("Is 2 step verification enabled using OTP for this site (Y/N)")
    is_otp_enabled = ansotp == 'Y'  # only an exact 'Y' enables the OTP step

    # int() raises ValueError (with the offending text) on non-numeric input.
    skip_first_mail = int(input("How many initial mail pages to skip?"))

    ans = input("Would you like to delete Tabs(T) or Query(Q)")
    if ans == 'Q':
        query_term = input("Enter Search Query: ")
        no_of_times = int(input("Enter Number of times to repeat action: "))
        query = {"query_term": query_term, "no_of_times": no_of_times}
        driver = run_login(driver, username, password, is_otp_enabled)
        return run_lookup(driver, query, skip_first_mail)

    if ans == 'T':
        # tabs maps each tab label to the number of select-all/delete rounds
        # to perform on it; 0 means "leave this tab alone".
        tabs = {}
        for label in ("Primary", "Social", "Promotions", "Forums", "Updates"):
            choice = input("Would you like to delete mails in " + label + " tab (Y/N)? :")
            if choice == 'Y':
                # Convert here so a bad count fails before login rather than
                # later inside run_tabs().
                tabs[label] = int(
                    input("Enter Number of times to repeat action for " + label + " tab :"))
            else:
                tabs[label] = 0
        print("Executing Tabs:", tabs)
        driver = run_login(driver, username, password, is_otp_enabled)
        return run_tabs(driver, tabs, skip_first_mail)

    print("ERROR: Incorrect response")
    time.sleep(3)  # Give the user time to read the message
    # quit() ends the whole WebDriver session; close() would only close the
    # current window.
    driver.quit()
    raise SystemExit
# ACTION DEFINITIONS
# Perform skip first few pages of mails
def action_skip_few_mails(driver, skip_first_mail):
    """Click the "Older" pager button ``skip_first_mail`` times.

    Args:
        driver: Selenium driver carrying a ``wait`` attribute (WebDriverWait).
        skip_first_mail: Number of pages to skip; zero or a negative number
            performs no clicks.

    Returns:
        The same ``driver`` object.

    Raises:
        selenium.common.exceptions.TimeoutException: If the "Older" button
            does not appear within the wait's timeout. A message is printed
            before the exception is re-raised.
    """
    for _ in range(skip_first_mail):
        try:
            older_button = driver.wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//div[@role='button' and @aria-label='Older']")))
        except TimeoutException:
            # WebDriverWait.until() signals a timeout with Selenium's own
            # TimeoutException, not the builtin TimeoutError.
            print("Could not perform skip mails action")
            raise
        ActionChains(driver).move_to_element(older_button).move_by_offset(
            xpos, ypos).click().perform()
        print("Click on Older mail button has been performed!")
        time.sleep(3)  # Wait for page to reload
    return driver
# Perform select all action
def action_selectall_mails(driver):
    """Click the last ``role='checkbox'`` span on the page to select all mails.

    Args:
        driver: Selenium driver positioned on a mail listing.

    Returns:
        The same ``driver`` object.

    Raises:
        IndexError: If no checkbox element is found. A message is printed
            first.
    """
    # find_elements(By.XPATH, ...) replaces find_elements_by_xpath(), which
    # newer Selenium releases no longer provide. One lookup is enough.
    checkboxes = driver.find_elements(By.XPATH, "//span[@role='checkbox']")
    print("size:", len(checkboxes))
    if not checkboxes:
        print("Couldn't perform select_all action")
        raise IndexError("no span with role='checkbox' found on the page")
    # The last match is the one this script treats as the "select all" box.
    # NOTE(review): confirm this still holds for the current Gmail markup.
    checkboxes[-1].click()
    print("Click on select all mails has been performed!")
    return driver
# Perform delete button click action
def action_delete_mails(driver):
    """Wait for the Delete button and click it.

    Args:
        driver: Selenium driver carrying a ``wait`` attribute (WebDriverWait).

    Returns:
        The same ``driver`` object.

    Raises:
        selenium.common.exceptions.TimeoutException: If the Delete button does
            not appear within the wait's timeout. A message is printed before
            the exception is re-raised.
    """
    try:
        delete_button = driver.wait.until(
            EC.presence_of_element_located((By.XPATH, "//div[@aria-label='Delete']")))
    except TimeoutException:
        # WebDriverWait.until() raises Selenium's TimeoutException; catching
        # the builtin TimeoutError would never match it.
        print("Couldn't perform delete action")
        raise
    ActionChains(driver).move_to_element(delete_button).move_by_offset(
        xpos, ypos).click().perform()
    print("Click on Delete button has been performed!")
    return driver
# RUN DEFINITIONS
# Function to login into account
def run_login(driver, username, password, is_otp_enabled):
    """Open Gmail and sign in, optionally completing an OTP challenge.

    Args:
        driver: Selenium driver carrying a ``wait`` attribute (WebDriverWait).
        username: Text typed into the "identifier" field.
        password: Text typed into the "password" field.
        is_otp_enabled: When truthy, prompt on the console for an OTP and
            submit it through the "idvPin" field.

    Returns:
        The same ``driver`` object.

    Raises:
        Any Selenium exception raised while waiting for or interacting with a
        field propagates unchanged (for example a timeout from
        ``driver.wait.until``).
    """
    # Loading website over HTTPS so the first request is not sent in clear text.
    driver.get("https://mail.google.com")

    identifier_field = driver.wait.until(
        EC.presence_of_element_located((By.NAME, "identifier")))
    identifier_field.send_keys(username)
    print("Username entered")

    next_button = driver.wait.until(
        EC.presence_of_element_located((By.ID, "identifierNext")))
    next_button.click()
    print("Next button pressed")

    password_field = driver.wait.until(
        EC.presence_of_element_located((By.NAME, "password")))
    password_field.send_keys(password)
    print("Password Entered")
    password_field.send_keys(Keys.RETURN)
    print("Login button pressed")

    if is_otp_enabled:
        otp = input("Please enter OTP: ")
        otp_field = driver.wait.until(
            EC.presence_of_element_located((By.NAME, "idvPin")))
        otp_field.send_keys(otp)
        otp_field.send_keys(Keys.RETURN)
        print("OK button pressed")
    return driver
# Function to lookup a search term and delete mails from results returned
def run_lookup(driver, query, skip_first_mail):
    """Search for ``query["query_term"]`` and delete the results repeatedly.

    Args:
        driver: Selenium driver carrying a ``wait`` attribute (WebDriverWait).
        query: Dict with "query_term" (search text) and "no_of_times" (number
            of select-all/delete rounds, an int).
        skip_first_mail: Number of result pages to skip before deleting.

    Returns:
        The driver as returned by the last action helper.

    Raises:
        Exceptions from Selenium or from the action helpers propagate
        unchanged.
    """
    search_box = driver.wait.until(
        EC.presence_of_element_located((By.XPATH, "//input[@aria-label='Search']")))
    search_box.send_keys(query["query_term"])
    search_box.send_keys(Keys.RETURN)
    print("Search Query has been executed!")
    time.sleep(10)  # Wait for the result list to load

    driver = action_skip_few_mails(driver, skip_first_mail)  # skip first mail action
    time.sleep(3)
    # switch_to.default_content() is the supported spelling; the older
    # switch_to_default_content() method is gone from current Selenium.
    driver.switch_to.default_content()

    for _ in range(query["no_of_times"]):
        driver = action_selectall_mails(driver)  # select all click action
        time.sleep(5)
        driver = action_delete_mails(driver)  # delete button click action
        time.sleep(5)  # Let the list refresh before the next round, as run_tabs does
    return driver
# Function to delete mails from under particular tabs
def run_tabs(driver, tabs, skip_first_mail):
    """Delete mails from each inbox tab whose repeat count is positive.

    Args:
        driver: Selenium driver carrying a ``wait`` attribute (WebDriverWait).
        tabs: Mapping of tab label to repeat count; counts may be ints or
            numeric strings and are converted with int().
        skip_first_mail: Number of pages to skip on each processed tab.

    Returns:
        The driver as returned by the last action helper, or the ``driver``
        argument itself when no tab has a positive count.

    Raises:
        ValueError: If a count cannot be converted to int.
        Exceptions from Selenium or from the action helpers propagate
        unchanged.
    """
    # Labels for which a tab click is performed (matched on aria-label).
    clickable_tabs = ("Social", "Promotions", "Updates", "Forums", "Primary")
    for label in tabs.keys():
        repeats = int(tabs[label])
        if repeats <= 0:
            continue
        if label in clickable_tabs:
            tab_elem = driver.wait.until(
                EC.presence_of_element_located(
                    (By.XPATH, "//div[@aria-label='" + label + "']")))
            ActionChains(driver).move_to_element(tab_elem).move_by_offset(
                xpos, ypos).click().perform()
            print("Click on " + label + " Tab has been performed!")
        # NOTE(review): a label outside clickable_tabs gets no click, yet the
        # skip/delete rounds below still run on whatever is displayed. This
        # matches the previous behaviour; confirm it is intended.
        driver = action_skip_few_mails(driver, skip_first_mail)  # skip first mail action
        time.sleep(2)
        for _ in range(repeats):
            driver = action_selectall_mails(driver)  # select all mail action
            time.sleep(2)
            driver = action_delete_mails(driver)  # delete mail action
            time.sleep(5)  # Wait for page to load
    return driver
if __name__ == "__main__":
    # Script entry point: start Chrome, run the interactive clean-up, and make
    # sure the browser session is ended however the run finishes.
    driver = init_driver()
    try:
        set_preferences(driver)
        time.sleep(5)  # Leave the final page visible briefly before closing
    finally:
        # Runs on normal completion, on an exception, and on SystemExit, so
        # the browser session is not left running.
        driver.quit()