-
Notifications
You must be signed in to change notification settings - Fork 0
/
proxy_queue.py
86 lines (69 loc) · 2.79 KB
/
proxy_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# -*- coding: utf-8 -*-
from proxy_util import logger
import redis
from ipproxy import IPProxy
from proxy_util import proxy_to_dict, proxy_from_dict, _is_proxy_available
from settings import PROXIES_REDIS_EXISTED, PROXIES_REDIS_FORMATTER, MAX_CONTINUOUS_TIMES, PROXY_CHECK_BEFOREADD
"""
Proxy Queue Base Class
"""
class BaseQueue(object):
def __init__(self, server):
"""Initialize the proxy queue instance
Parameters
----------
server : StrictRedis
Redis client instance
serializer : object
Serialization and deserialization by 'serialize' and 'deserialize' methods
"""
self.server = server
def _serialize_proxy(self, proxy):
"""Serialize proxy instance"""
return proxy_to_dict(proxy)
def _deserialize_proxy(self, serialized_proxy):
"""deserialize proxy instance"""
return proxy_from_dict(eval(serialized_proxy))
def __len__(self, schema='http'):
"""Return the length of the queue"""
raise NotImplementedError
def push(self, proxy, need_check):
"""Push a proxy"""
raise NotImplementedError
def pop(self, schema='http', timeout=0):
"""Pop a proxy"""
raise NotImplementedError
class FifoQueue(BaseQueue):
"""First in first out queue"""
def __len__(self, schema='http'):
"""Return the length of the queue"""
return self.server.llen(PROXIES_REDIS_FORMATTER.format(schema))
def push(self, proxy, need_check=PROXY_CHECK_BEFOREADD):
"""Push a proxy"""
if need_check and not _is_proxy_available(proxy):
return
elif proxy.continuous_failed < MAX_CONTINUOUS_TIMES and not self._is_existed(proxy):
key = PROXIES_REDIS_FORMATTER.format(proxy.schema)
self.server.rpush(key, self._serialize_proxy(proxy))
def pop(self, schema='http', timeout=0):
"""Pop a proxy"""
if timeout > 0:
p = self.server.blpop(PROXIES_REDIS_FORMATTER.format(schema.lower()), timeout)
if isinstance(p, tuple):
p = p[1]
else:
p = self.server.lpop(PROXIES_REDIS_FORMATTER.format(schema.lower()))
if p:
p = self._deserialize_proxy(p)
self.server.srem(PROXIES_REDIS_EXISTED, p._get_url())
return p
def _is_existed(self, proxy):
added = self.server.sadd(PROXIES_REDIS_EXISTED, proxy._get_url())
return added == 0
if __name__ == '__main__':
r = redis.StrictRedis(host='localhost', port=6379 ,password='root')
queue = FifoQueue(r)
# proxy = IPProxy('http', '218.66.253.144', '80')
# queue.push(proxy)
proxy = queue.pop(schema='http')
print(proxy._get_url())