Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/walltime sampling #8

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 17 additions & 16 deletions bin/radical-synapse-sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,24 @@ import radical.synapse.utils as rsu

# ------------------------------------------------------------------------------
#
def run_samples (url, cpu_flops, sto_in, sto_out, mem_rss, n_samples):
def run_samples (url, time, cpu_flops, sto_in, sto_out, mem_rss, n_samples):

samples = list()

# assume 1 sample per second -- but that is not interpreted anyway...
for n in range(n_samples):

# assume 1 sample per second -- but that is not interpreted anyway...
time = n+1
# append TIME sample
samples.append(['time', float(n+1), [time]])

# append CPU sample (set efficiency to 1)
samples.append(['cpu', float(time), [cpu_flops, 1]])
samples.append(['cpu', float(n+1), [cpu_flops, 1]])

# append STO sample
samples.append(['sto', float(time), [sto_in, sto_out]])
samples.append(['sto', float(n+1), [sto_in, sto_out]])

# append MEM sample
samples.append(['mem', float(time), [mem_rss]])
samples.append(['mem', float(n+1), [mem_rss]])

info, ret, out = rs.emulate (samples=samples)
pprint.pprint (info)
Expand All @@ -42,8 +43,9 @@ def usage (msg=None, noexit=False):
print "\n Error: %s" % msg

print """
usage : %s -m <mode> [-f <flops>] [-i <input>] [-o <output>]
[-r <memory>] [-s <samples>] [-u <url>]
usage : %s -m <mode> [-t <seconds>] [-f <flops>] [-i <input>]
[-o <output>] [-r <memory>] [-s <samples>]
[-u <url>]

examples : %s -m sample -f 10000000

Expand All @@ -55,6 +57,7 @@ def usage (msg=None, noexit=False):

arguments :

-t : number of seconds to spend sleeping
-f : number of flops to emulate
-i : number of bytes to read from disk
-o : number of bytes to write to disk
Expand All @@ -67,7 +70,7 @@ def usage (msg=None, noexit=False):

The default mode is 'sample'.

""" % (sys.argv[0], sys.argv[0], sys.argv[0], sys.argv[0])
""" % (sys.argv[0], sys.argv[0])

if msg:
sys.exit (1)
Expand All @@ -84,6 +87,7 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser (add_help=False)

parser.add_argument('-m', '--mode', dest='mode')
parser.add_argument('-t', '--time', dest='time')
parser.add_argument('-f', '--flops', dest='cpu_flops')
parser.add_argument('-i', '--input', dest='sto_in')
parser.add_argument('-o', '--output', dest='sto_out')
Expand All @@ -96,18 +100,21 @@ if __name__ == '__main__':

mode = arguments.mode
url = arguments.url
time = arguments.time
cpu_flops = arguments.cpu_flops
sto_in = arguments.sto_in
sto_out = arguments.sto_out
mem_rss = arguments.mem_rss
samples = arguments.samples

if not time : time = 0.0
if not cpu_flops : cpu_flops = 0.0
if not sto_in : sto_in = 0.0
if not sto_out : sto_out = 0.0
if not mem_rss : mem_rss = 0.0
if not samples : samples = 1

time = float(time )
cpu_flops = float(cpu_flops)
sto_in = float(sto_in )
sto_out = float(sto_out )
Expand All @@ -126,13 +133,7 @@ if __name__ == '__main__':
if not url:
url = os.environ.get ('RADICAL_SYNAPSE_DBURL')

if not cpu_flops : cpu_flops = 0.0
if not sto_in : sto_in = 0.0
if not sto_out : sto_out = 0.0
if not mem_rss : mem_rss = 0.0
if not samples : samples = 1

if mode == 'sample' : run_samples(url, cpu_flops, sto_in, sto_out, mem_rss, samples)
if mode == 'sample' : run_samples(url, time, cpu_flops, sto_in, sto_out, mem_rss, samples)
elif mode == 'help' : usage(noexit=True)
else : usage("unknown mode '%s'" % mode)

Expand Down
24 changes: 13 additions & 11 deletions src/radical/synapse/atoms/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@


from constants import UNKNOWN, COMPUTE, MEMORY, STORAGE, NETWORK
from ._atoms import atom_compute_asm
from ._atoms import atom_compute
from ._atoms import atom_time
from ._atoms import atom_memory
from ._atoms import atom_storage
from ._atoms import atom_network

from base import AtomBase
from compute import Compute
from memory import Memory
from storage import Storage
from network import Network
from .constants import UNKNOWN, TIME, COMPUTE, MEMORY, STORAGE, NETWORK

from _atoms import atom_compute_asm
from _atoms import atom_compute
from _atoms import atom_memory
from _atoms import atom_storage
from _atoms import atom_network
from .base import AtomBase
from .timer import Time
from .compute import Compute
from .memory import Memory
from .storage import Storage
from .network import Network

17 changes: 17 additions & 0 deletions src/radical/synapse/atoms/atoms.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ static char module_docstring[] =
/* Available functions */
static PyObject *atom_compute_asm (PyObject * self, PyObject * args);
static PyObject *atom_compute (PyObject * self, PyObject * args);
static PyObject *atom_time (PyObject * self, PyObject * args);
static PyObject *atom_memory (PyObject * self, PyObject * args);
static PyObject *atom_storage (PyObject * self, PyObject * args);
static PyObject *atom_network (PyObject * self, PyObject * args);
Expand All @@ -27,6 +28,7 @@ static PyObject *atom_network (PyObject * self, PyObject * args);
static PyMethodDef module_methods[] = {
{"atom_compute_asm", atom_compute_asm, METH_VARARGS, NULL},
{"atom_compute", atom_compute , METH_VARARGS, NULL},
{"atom_time", atom_time , METH_VARARGS, NULL},
{"atom_memory", atom_memory , METH_VARARGS, NULL},
{"atom_storage", atom_storage , METH_VARARGS, NULL},
{"atom_network", atom_network , METH_VARARGS, NULL},
Expand Down Expand Up @@ -76,6 +78,21 @@ atom_compute (PyObject * self, PyObject * args)
}


/* -----------------------------------------------------------------------------
*/
static PyObject *
atom_time (PyObject * self, PyObject * args)
{
double time = 0.0;

if ( ! PyArg_ParseTuple (args, "d", &time) )
return NULL;

_atom_time(time);
Py_RETURN_NONE;
}


/* -----------------------------------------------------------------------------
*/
static PyObject *
Expand Down
1 change: 1 addition & 0 deletions src/radical/synapse/atoms/atoms.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

int _atom_compute_asm (long flops);
int _atom_compute (long flops);
int _atom_time (double time);
int _atom_memory (long size);
int _atom_storage (const char* src,
long rsize,
Expand Down
2 changes: 0 additions & 2 deletions src/radical/synapse/atoms/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import radical.utils.logger as rul
import radical.utils.signatures as rus

from constants import UNKNOWN, COMPUTE, STORAGE, NETWORK


# ------------------------------------------------------------------------------
#
Expand Down
1 change: 1 addition & 0 deletions src/radical/synapse/atoms/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# Atom Types
#
UNKNOWN = 'unknown'
TIME = 'time'
COMPUTE = 'compute'
COMPUTE = 'compute_asm'
MEMORY = 'memory'
Expand Down
52 changes: 52 additions & 0 deletions src/radical/synapse/atoms/synapse_time.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/resource.h>

#define PROFILE 0

int _atom_time(double time)
{
usleep(time * 1000 * 1000); /* usleep wants microseconds */

if ( PROFILE )
{
struct rusage ru;

if ( 0 != getrusage (RUSAGE_SELF, &ru) )
{
fprintf (stderr, "no ru: %s", strerror (errno));
return (1);
}

fprintf (stdout, "ru.utime : %ld.%ld\n", ru.ru_utime.tv_sec,
ru.ru_utime.tv_usec ); /* user CPU time used */
fprintf (stdout, "ru.stime : %ld.%ld\n", ru.ru_stime,
ru.ru_stime.tv_usec ); /* system CPU time used */
fprintf (stdout, "ru.maxrss : %ld\n", ru.ru_maxrss*1024 ); /* maximum resident set size */
fprintf (stdout, "ru.ixrss : %ld\n", ru.ru_ixrss ); /* integral shared memory size */
fprintf (stdout, "ru.idrss : %ld\n", ru.ru_idrss ); /* integral unshared data size */
fprintf (stdout, "ru.isrss : %ld\n", ru.ru_isrss ); /* integral unshared stack size */
fprintf (stdout, "ru.minflt : %ld\n", ru.ru_minflt ); /* page reclaims (soft page faults) */
fprintf (stdout, "ru.majflt : %ld\n", ru.ru_majflt ); /* page faults (hard page faults) */
fprintf (stdout, "ru.nswap : %ld\n", ru.ru_nswap ); /* swaps */
fprintf (stdout, "ru.inblock : %ld\n", ru.ru_inblock ); /* block input operations */
fprintf (stdout, "ru.outblock : %ld\n", ru.ru_oublock ); /* block output operations */
fprintf (stdout, "ru.msgsnd : %ld\n", ru.ru_msgsnd ); /* IPC messages sent */
fprintf (stdout, "ru.msgrcv : %ld\n", ru.ru_msgrcv ); /* IPC messages received */
fprintf (stdout, "ru.nsignals : %ld\n", ru.ru_nsignals ); /* signals received */
fprintf (stdout, "ru.nvcsw : %ld\n", ru.ru_nvcsw ); /* voluntary context switches */
fprintf (stdout, "ru.nivcsw : %ld\n", ru.ru_nivcsw ); /* involuntary context switches */
}

return 0;
}

51 changes: 51 additions & 0 deletions src/radical/synapse/atoms/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@

__author__ = "Andre Merzky"
__copyright__ = "Copyright 2013, The SAGA Project"
__license__ = "LGPL.v3"


import radical.utils.signatures as rus

from _atoms import atom_time
from base import AtomBase
from constants import TIME

# ------------------------------------------------------------------------------
#
class Time(AtomBase):
"""
This Time Synapse emulates a an applications walltime behavior without
consuming resources.
"""

# --------------------------------------------------------------------------
#
@rus.takes ('Time')
@rus.returns(rus.nothing)
def __init__(self):

AtomBase.__init__(self, TIME)


# --------------------------------------------------------------------------
#
@rus.takes ('Time', list)
@rus.returns(rus.nothing)
def emulate (self, vals):

print 'time: %s' % vals

self._run(vals[0])


# --------------------------------------------------------------------------
#
@rus.takes ('Time', float)
@rus.returns(rus.nothing)
def _emulate(self, ops):

atom_time(ops)


#-------------------------------------------------------------------------------

16 changes: 11 additions & 5 deletions src/radical/synapse/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def profile (command, *args, **kwargs) :
_CPU = 'cpu'
_MEM = 'mem'
_STO = 'sto'
_TIM = 'time'

_TYPE = 0
_TIME = 1
Expand All @@ -213,6 +214,7 @@ def _emulator (samples) :
state = dict() # there is at most one atom for each type in 'state'

# create atoms for all sample types
atoms[_TIM] = rsa.Time ()
atoms[_CPU] = rsa.Compute ()
atoms[_MEM] = rsa.Memory ()
atoms[_STO] = rsa.Storage ()
Expand Down Expand Up @@ -297,12 +299,16 @@ def emulate(command=None, samples=None):
pprint.pprint (prof)

# get time series to emulate (all types of operations are mixed)
# FIXME: we should also sample walltime for _TIM. As it is, mixing
# time and other samples will yield incorrect results due to
# mismatch in granularity.
samples = list()
samples += [[_CPU, x[0], [x[1].get('ops', 0),
x[1].get('efficiency', 0)]] for x in prof['cpu']['sequence']]
samples += [[_MEM, x[0], [x[1].get('size', 0)]] for x in prof['mem']['sequence']]
samples += [[_STO, x[0], [x[1].get('read', 0),
x[1].get('write', 0)]] for x in prof['sto']['sequence']]
samples += [[_TIM, x[0], [x[1].get('real', 0.0)]] for x in prof['time']]
samples += [[_CPU, x[0], [x[1].get('ops', 0) ,
x[1].get('efficiency', 0) ]] for x in prof['cpu']['sequence']]
samples += [[_MEM, x[0], [x[1].get('size', 0) ]] for x in prof['mem']['sequence']]
samples += [[_STO, x[0], [x[1].get('read', 0) ,
x[1].get('write', 0) ]] for x in prof['sto']['sequence']]


# sort samples by time
Expand Down