-
Notifications
You must be signed in to change notification settings - Fork 11
/
loadtest.py
89 lines (67 loc) · 2.71 KB
/
loadtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import os
import sys
curr_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.split(curr_dir)[0])
import time
import argparse
import asyncio as aio
from asyncio_pool import AioPool, getres
async def loadtest_spawn(tasks, pool_size, duration):
    """Schedule `tasks` sleep coroutines one at a time via `pool.spawn`.

    Each `pool.spawn(...)` call is awaited before the next one is issued,
    and whatever it returns is collected. After the pool context has been
    left, every collected item is passed through `getres.flat`.

    Args:
        tasks: number of sleep coroutines to schedule.
        pool_size: value passed to `AioPool(size=...)`.
        duration: seconds each coroutine sleeps.

    Returns:
        A list with one `getres.flat(...)` value per scheduled coroutine,
        in scheduling order.
    """
    # NOTE(review): presumably leaving the `async with` block waits for all
    # spawned work to finish -- confirm against the asyncio_pool docs.
    async with AioPool(size=pool_size) as pool:
        spawned = [
            await pool.spawn(aio.sleep(duration))
            for _ in range(tasks)
        ]

    return [getres.flat(item) for item in spawned]
async def loadtest_spawn_n(tasks, pool_size, duration):
    """Schedule `tasks` sleep coroutines via the non-awaited `pool.spawn_n`.

    Unlike `loadtest_spawn`, the scheduling call is not awaited; its return
    value is collected directly. After the pool context has been left,
    every collected item is passed through `getres.flat`.

    Args:
        tasks: number of sleep coroutines to schedule.
        pool_size: value passed to `AioPool(size=...)`.
        duration: seconds each coroutine sleeps.

    Returns:
        A list with one `getres.flat(...)` value per scheduled coroutine,
        in scheduling order.
    """
    # NOTE(review): presumably leaving the `async with` block waits for all
    # scheduled work to finish -- confirm against the asyncio_pool docs.
    async with AioPool(size=pool_size) as pool:
        scheduled = [
            pool.spawn_n(aio.sleep(duration))
            for _ in range(tasks)
        ]

    return [getres.flat(item) for item in scheduled]
async def loadtest_map(tasks, pool_size, duration):
    """Run `tasks` sleep jobs through `pool.map` and return its result.

    Args:
        tasks: number of jobs; `range(tasks)` is the iterable handed to
            `pool.map`.
        pool_size: value passed to `AioPool(size=...)`.
        duration: seconds each job sleeps.

    Returns:
        Whatever `await pool.map(...)` produces.
    """
    async def _sleep_once(_item):
        # The mapped value is ignored; every job just sleeps.
        await aio.sleep(duration)

    async with AioPool(size=pool_size) as pool:
        outcome = await pool.map(_sleep_once, range(tasks))
        return outcome
async def loadtest_itermap(tasks, pool_size, duration):
    """Run `tasks` sleep jobs through `pool.itermap`, gathering each result.

    Args:
        tasks: number of jobs; `range(tasks)` is the iterable handed to
            `pool.itermap`.
        pool_size: value passed to `AioPool(size=...)`.
        duration: seconds each job sleeps.

    Returns:
        A list of the items yielded by `pool.itermap(...)`, in the order
        they were yielded.
    """
    async def _sleep_once(_item):
        # The mapped value is ignored; every job just sleeps.
        await aio.sleep(duration)

    async with AioPool(size=pool_size) as pool:
        collected = [
            item
            async for item in pool.itermap(_sleep_once, range(tasks))
        ]

    return collected
def print_stats(args, exec_time):
    """Print a timing report comparing `exec_time` with the ideal run time.

    The ideal time is `task_duration * (tasks / pool_size)`; everything
    above it is reported as overhead (total, per task, and as a percentage
    of the ideal time).

    Args:
        args: object exposing `tasks`, `pool_size` and `task_duration`
            attributes (the parsed command-line namespace).
        exec_time: measured wall-clock run time, in seconds.

    Raises:
        ZeroDivisionError: if `args.pool_size` is 0.
    """
    # True division: a partially filled final batch counts proportionally.
    ideal = args.task_duration * (args.tasks / args.pool_size)
    overhead = exec_time - ideal

    # With zero tasks there is nothing to average over; report 0 rather
    # than crashing after the measurement has already been taken.
    per_task = overhead / args.tasks if args.tasks else 0.0

    # A zero ideal time (zero-length tasks or no tasks) makes the relative
    # overhead undefined, so show a placeholder of the same width.
    if ideal:
        overhead_perc = ((exec_time / ideal) - 1) * 100
        perc_text = f'{overhead_perc:13.3f}%'
    else:
        perc_text = 'n/a'.rjust(14)

    print(f'{ideal:15.5f}s -- ideal result')
    print(f'{exec_time:15.5f}s -- total executing time')
    print(f'{overhead:15.5f}s -- total overhead')
    print(f'{per_task:15.5f}s -- overhead per task')
    print(f'{perc_text} -- overhead total percent')
if __name__ == "__main__":
    # Command-line entry point: choose one load-test scenario, run it to
    # completion on an event loop, then print timing statistics.
    methods = {
        'spawn': loadtest_spawn,
        'spawn_n': loadtest_spawn_n,
        'map': loadtest_map,
        'itermap': loadtest_itermap,
    }

    p = argparse.ArgumentParser()
    p.add_argument('method', choices=methods.keys())
    p.add_argument('--tasks', '-t', type=int, default=10**5)
    p.add_argument('--task-duration', '-d', type=float, default=0.2)
    p.add_argument('--pool-size', '-p', type=int, default=10**3)
    args = p.parse_args()

    # The run-time estimate below divides by the pool size, so reject
    # non-positive sizes with a usage error instead of a ZeroDivisionError.
    if args.pool_size <= 0:
        p.error('--pool-size must be a positive integer')

    print('>>> Running %d tasks in pool of size=%s, each task takes %.3f sec.' %
          (args.tasks, args.pool_size, args.task_duration))
    print('>>> This will run more than %.5f seconds' %
          (args.task_duration * (args.tasks / args.pool_size)))

    ts_start = time.perf_counter()
    # `choices=` above guarantees the key exists, so index directly.
    m = methods[args.method](args.tasks, args.pool_size, args.task_duration)
    # asyncio.run() creates and closes its own event loop; relying on
    # get_event_loop() to create one implicitly is deprecated.
    aio.run(m)
    exec_time = time.perf_counter() - ts_start
    print_stats(args, exec_time)