This repository has been archived by the owner on Feb 16, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 41
/
server.py
129 lines (110 loc) · 4.21 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# -*- coding: utf-8 -*-
import http.server
import urllib.parse
from urllib.parse import urlparse
from helpers import FILE_DIR
from smarthome import *
# import cgi
reqHandler = SmartHomeReqHandler()
get_mappings = {}
post_mappings = {}
mappings = {}
def addGetMappings(path, func):
get_mappings[path] = func
def addPostMappings(path, func):
post_mappings[path] = func
class AogServer(http.server.BaseHTTPRequestHandler):
global reqHandler
global mappings
# Handler for the GET requests
def do_GET(self):
# print(self.headers)
self.process_Headers()
try:
get_mappings[self.only_path](reqHandler, self)
except Exception as error:
self.send_message(404, "Page not found!: %s" % error)
# Handler for the POST requests
def do_POST(self):
# print(self.headers)
self.form = {}
self.body = self.rfile.read(int(self.headers['content-length'])).decode('utf-8')
try:
if 'x-www-form-urlencoded' in self.headers['content-type']:
un = urllib.parse.unquote(self.body)
self.form = dict(qc.split("=") for qc in un.split("&"))
except (ValueError, Exception):
pass
self.process_Headers()
# self.form = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ={'REQUEST_METHOD': 'POST'})
try:
post_mappings[self.only_path](reqHandler, self)
except Exception as error:
self.send_message(404, "Page not found!: %s" % error)
def send_message(self, code, msg, headers=None, b=False):
if self.path.endswith(".jpg"):
f = open(FILE_DIR + self.path, 'rb')
self.send_response(200)
self.send_header('Content-type', 'image/jpg')
self.end_headers()
self.wfile.write(f.read())
f.close()
return
elif self.path.endswith(".png"):
f = open(FILE_DIR + self.path, 'rb')
self.send_response(200)
self.send_header('Content-type', 'image/png')
self.end_headers()
self.wfile.write(f.read())
f.close()
return
elif self.path.endswith(".js"):
f = open(FILE_DIR + self.path, 'rb')
self.send_response(200)
self.send_header('Content-type', 'text/javascript')
self.end_headers()
self.wfile.write(f.read())
f.close()
return
elif self.path.endswith(".css"):
f = open(FILE_DIR + self.path, 'rb')
self.send_response(200)
self.send_header('Content-type', 'text/css')
self.end_headers()
self.wfile.write(f.read())
f.close()
return
self.send_response(code)
self.send_header('Content-type', 'text/html; charset=utf-8')
if headers is not None:
for k, v in headers.items():
self.send_header(k, v)
if self.isNewCookie:
self.send_header('Set-Cookie', self.cookie.OutputString())
self.end_headers()
if not b:
self.wfile.write(bytes(msg, "utf-8"))
else:
self.wfile.write(msg)
def send_json(self, code, json_req, b=False):
self.send_response(code)
self.send_header('Content-type', 'application/json; charset=utf-8')
self.end_headers()
if not b:
self.wfile.write(bytes(json_req, "utf-8"))
else:
self.wfile.write(json_req)
def redirect(self, path, httptype=307):
self.send_response(httptype) # 307 Temporary redirect
self.send_header('Location', path)
self.end_headers()
def process_Headers(self):
self.isNewCookie, self.cookie = reqHandler._session_cookie(self)
self.url = urlparse(self.path)
self.only_path = self.url.path
try:
self.query = self.url.query
self.query_components = dict(qc.split("=") for qc in self.query.split("&"))
except (ValueError, Exception):
self.query = ""
self.query_components = {}