Skip to content

Commit

Permalink
add manyuser branch
Browse files Browse the repository at this point in the history
support udp over tcp
support chacha20 & salsa20 (base on libsodium)
  • Loading branch information
breakwa11 committed Jun 8, 2015
1 parent e001f18 commit f2efed9
Show file tree
Hide file tree
Showing 18 changed files with 1,327 additions and 57 deletions.
12 changes: 12 additions & 0 deletions Config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#Config
MYSQL_HOST = 'mdss.mengsky.net'
MYSQL_PORT = 3306
MYSQL_USER = 'ss'
MYSQL_PASS = 'ss'
MYSQL_DB = 'shadowsocks'

MANAGE_PASS = 'ss233333333'
#if you want manage in other server you should set this value to global ip
MANAGE_BIND_IP = '127.0.0.1'
#make sure this port is idle
MANAGE_PORT = 23333
1 change: 1 addition & 0 deletions __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#!/usr/bin/python
100 changes: 100 additions & 0 deletions asyncmgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (c) 2014 clowwindy
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import time
import os
import socket
import struct
import re
import logging
from shadowsocks import common
from shadowsocks import lru_cache
from shadowsocks import eventloop
import server_pool
import Config

class ServerMgr(object):

def __init__(self):
self._loop = None
self._request_id = 1
self._hosts = {}
self._hostname_status = {}
self._hostname_to_cb = {}
self._cb_to_hostname = {}
self._last_time = time.time()
self._sock = None
self._servers = None

def add_to_loop(self, loop):
if self._loop:
raise Exception('already add to loop')
self._loop = loop
# TODO when dns server is IPv6
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.SOL_UDP)
self._sock.bind((Config.MANAGE_BIND_IP, Config.MANAGE_PORT))
self._sock.setblocking(False)
loop.add(self._sock, eventloop.POLL_IN)
loop.add_handler(self.handle_events)

def _handle_data(self, sock):
data, addr = sock.recvfrom(128)
#manage pwd:port:passwd:action
args = data.split(':')
if len(args) < 4:
return
if args[0] == Config.MANAGE_PASS:
if args[3] == '0':
server_pool.ServerPool.get_instance().cb_del_server(args[1])
elif args[3] == '1':
server_pool.ServerPool.get_instance().new_server(args[1], args[2])

def handle_events(self, events):
for sock, fd, event in events:
if sock != self._sock:
continue
if event & eventloop.POLL_ERR:
logging.error('mgr socket err')
self._loop.remove(self._sock)
self._sock.close()
# TODO when dns server is IPv6
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.SOL_UDP)
self._sock.setblocking(False)
self._loop.add(self._sock, eventloop.POLL_IN)
else:
self._handle_data(sock)
break

def close(self):
if self._sock:
self._sock.close()
self._sock = None


def test():
pass

if __name__ == '__main__':
test()
12 changes: 12 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"server":"0.0.0.0",
"server_ipv6": "::",
"server_port":8388,
"local_address": "127.0.0.1",
"local_port":1080,
"password":"m",
"timeout":300,
"method":"aes-256-cfb",
"fast_open": false,
"workers": 1
}
148 changes: 148 additions & 0 deletions db_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import logging
import cymysql
import time
import sys
from server_pool import ServerPool
import Config

class DbTransfer(object):

instance = None

def __init__(self):
self.last_get_transfer = {}

@staticmethod
def get_instance():
if DbTransfer.instance is None:
DbTransfer.instance = DbTransfer()
return DbTransfer.instance

def push_db_all_user(self):
#更新用户流量到数据库
last_transfer = self.last_get_transfer
curr_transfer = ServerPool.get_instance().get_servers_transfer()
#上次和本次的增量
dt_transfer = {}
for id in curr_transfer.keys():
if id in last_transfer:
if last_transfer[id][0] == curr_transfer[id][0] and last_transfer[id][1] == curr_transfer[id][1]:
continue
elif curr_transfer[id][0] == 0 and curr_transfer[id][1] == 0:
continue
elif last_transfer[id][0] <= curr_transfer[id][0] and \
last_transfer[id][1] <= curr_transfer[id][1]:
dt_transfer[id] = [curr_transfer[id][0] - last_transfer[id][0],
curr_transfer[id][1] - last_transfer[id][1]]
else:
dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]]
else:
if curr_transfer[id][0] == 0 and curr_transfer[id][1] == 0:
continue
dt_transfer[id] = [curr_transfer[id][0], curr_transfer[id][1]]

self.last_get_transfer = curr_transfer
query_head = 'UPDATE user'
query_sub_when = ''
query_sub_when2 = ''
query_sub_in = None
last_time = time.time()
for id in dt_transfer.keys():
query_sub_when += ' WHEN %s THEN u+%s' % (id, dt_transfer[id][0])
query_sub_when2 += ' WHEN %s THEN d+%s' % (id, dt_transfer[id][1])
if query_sub_in is not None:
query_sub_in += ',%s' % id
else:
query_sub_in = '%s' % id
if query_sub_when == '':
return
query_sql = query_head + ' SET u = CASE port' + query_sub_when + \
' END, d = CASE port' + query_sub_when2 + \
' END, t = ' + str(int(last_time)) + \
' WHERE port IN (%s)' % query_sub_in
#print query_sql
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER,
passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
cur = conn.cursor()
cur.execute(query_sql)
cur.close()
conn.commit()
conn.close()

@staticmethod
def pull_db_all_user():
#数据库所有用户信息
conn = cymysql.connect(host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER,
passwd=Config.MYSQL_PASS, db=Config.MYSQL_DB, charset='utf8')
cur = conn.cursor()
cur.execute("SELECT port, u, d, transfer_enable, passwd, switch, enable, plan FROM user")
rows = []
for r in cur.fetchall():
rows.append(list(r))
cur.close()
conn.close()
return rows

@staticmethod
def del_server_out_of_bound_safe(last_rows, rows):
#停止超流量的服务
#启动没超流量的服务
#需要动态载入switchrule,以便实时修改规则
cur_servers = {}
for row in rows:
try:
import switchrule
allow = switchrule.isTurnOn(row[7], row[5]) and row[6] == 1 and row[1] + row[2] < row[3]
except Exception, e:
allow = False

cur_servers[row[0]] = row[4]

if ServerPool.get_instance().server_is_run(row[0]) > 0:
if not allow:
logging.info('db stop server at port [%s]' % (row[0]))
ServerPool.get_instance().del_server(row[0])
elif (row[0] in ServerPool.get_instance().tcp_servers_pool and ServerPool.get_instance().tcp_servers_pool[row[0]]._config['password'] != row[4]) \
or (row[0] in ServerPool.get_instance().tcp_ipv6_servers_pool and ServerPool.get_instance().tcp_ipv6_servers_pool[row[0]]._config['password'] != row[4]):
#password changed
logging.info('db stop server at port [%s] reason: password changed' % (row[0]))
ServerPool.get_instance().del_server(row[0])
elif ServerPool.get_instance().server_run_status(row[0]) is False:
if allow:
logging.info('db start server at port [%s] pass [%s]' % (row[0], row[4]))
ServerPool.get_instance().new_server(row[0], row[4])

for row in last_rows:
if row[0] in cur_servers:
if row[4] == cur_servers[row[0]]:
pass
else:
logging.info('db stop server at port [%s] reason: port not exist' % (row[0]))
ServerPool.get_instance().del_server(row[0])

@staticmethod
def thread_db():
import socket
import time
timeout = 60
socket.setdefaulttimeout(timeout)
last_rows = []
while True:
#logging.warn('db loop')

try:
DbTransfer.get_instance().push_db_all_user()
rows = DbTransfer.get_instance().pull_db_all_user()
DbTransfer.del_server_out_of_bound_safe(last_rows, rows)
last_rows = rows
except Exception as e:
logging.warn('db thread except:%s' % e)
finally:
time.sleep(15)


#SQLData.pull_db_all_user()
#print DbTransfer.get_instance().test()
22 changes: 22 additions & 0 deletions server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import sys
import thread
import os
os.chdir(os.path.split(os.path.realpath(__file__))[0])

import server_pool
import db_transfer

#def test():
# thread.start_new_thread(DbTransfer.thread_db, ())
# Api.web_server()

if __name__ == '__main__':
#server_pool.ServerPool.get_instance()
#server_pool.ServerPool.get_instance().new_server(2333, '2333')
thread.start_new_thread(db_transfer.DbTransfer.thread_db, ())
while True:
time.sleep(99999)
Loading

0 comments on commit f2efed9

Please sign in to comment.