Convert to Python3 #181

Open · wants to merge 1 commit into master
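
The single commit applies the mechanical Python 2 → 3 conversions throughout: the print statement becomes the print() function, the ConfigParser module is renamed configparser, d.has_key(k) becomes k in d, types.ListType becomes the builtin list, and dict.keys() is wrapped in list() where list behavior is expected. A side-by-side sketch of the idioms (illustrative only, not taken from the diff):

# Python 2                        # Python 3
print "set to DEBUG"              print("set to DEBUG")
import ConfigParser               import configparser
d.has_key("cpr")                  "cpr" in d
type(x) == types.ListType         type(x) == list
d.keys()   # a list               list(d.keys())   # keys() is a view
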
6 changes: 3 additions & 3 deletions bigjob/__init__.py
@@ -9,7 +9,7 @@
 #READ config
 
 try:
-    import ConfigParser
+    import configparser
     _CONFIG_FILE="bigjob.conf"
     _conf_file = os.path.dirname(os.path.abspath( __file__ )) + "/../" + _CONFIG_FILE
     if not os.path.exists(_conf_file):
@@ -18,7 +18,7 @@
 
    #print "using conf file: " + str(_conf_file)
 
-    _config = ConfigParser.ConfigParser()
+    _config = configparser.ConfigParser()
     _config.read(_conf_file)
     default_dict = _config.defaults()
 
@@ -41,7 +41,7 @@
     else:
         # 4 = DEBUG + INFO + WARNING + ERROR
         if BIGJOB_VERBOSE >= 4:
-            print "set to DEBUG"
+            print("set to DEBUG")
             logging_level = logging.DEBUG
         # 3 = INFO + WARNING + ERROR
         elif BIGJOB_VERBOSE == 3:
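
The BIGJOB_VERBOSE branch above maps an integer (1–4) onto a logging level. A table-driven sketch of the same mapping (illustrative only; the names verbosity_to_level and _LEVELS are not part of BigJob):

import logging

# 4+ = DEBUG, 3 = INFO, 2 = WARNING, 1 = ERROR, anything else = default
_LEVELS = {1: logging.ERROR, 2: logging.WARNING, 3: logging.INFO}

def verbosity_to_level(verbose, default=logging.FATAL):
    if verbose >= 4:          # matches the `if BIGJOB_VERBOSE >= 4` branch
        return logging.DEBUG
    return _LEVELS.get(verbose, default)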
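
Note that import configparser works only on Python 3; the old ConfigParser name is gone entirely, so this change drops Python 2 compatibility for the module. If both interpreters had to be supported, a guarded import would be the usual pattern (a sketch of the alternative, not what this PR does):

try:
    import configparser                      # Python 3
except ImportError:
    import ConfigParser as configparser      # Python 2 fallback
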
95 changes: 95 additions & 0 deletions bigjob/__init__.py.bak
@@ -0,0 +1,95 @@
+import os
+import sys
+import logging
+import traceback
+version = "latest"
+
+#from pkg_resources import Requirement, resource_filename
+
+#READ config
+
+try:
+    import ConfigParser
+    _CONFIG_FILE="bigjob.conf"
+    _conf_file = os.path.dirname(os.path.abspath( __file__ )) + "/../" + _CONFIG_FILE
+    if not os.path.exists(_conf_file):
+        _conf_file = os.path.join(sys.prefix, _CONFIG_FILE)
+
+
+    #print "using conf file: " + str(_conf_file)
+
+    _config = ConfigParser.ConfigParser()
+    _config.read(_conf_file)
+    default_dict = _config.defaults()
+
+    ####################################################
+    # logging
+    logging_level = logging.FATAL
+    BIGJOB_VERBOSE=None
+    try:
+        BIGJOB_VERBOSE = int(os.getenv('BIGJOB_VERBOSE'))
+        #print("BIGJOB_VERBOSE: %d"%BIGJOB_VERBOSE)
+    except Exception:
+        pass
+
+    if BIGJOB_VERBOSE==None: # use logging level defined in config file
+        #print "Read log level from bigjob.conf"
+        level = default_dict["logging.level"]
+        #print("Logging level: %s"%level)
+        if level.startswith("logging."):
+            logging_level = eval(level)
+    else:
+        # 4 = DEBUG + INFO + WARNING + ERROR
+        if BIGJOB_VERBOSE >= 4:
+            print "set to DEBUG"
+            logging_level = logging.DEBUG
+        # 3 = INFO + WARNING + ERROR
+        elif BIGJOB_VERBOSE == 3:
+            logging_level = logging.INFO
+        # 2 = WARNING + ERROR
+        elif BIGJOB_VERBOSE == 2:
+            logging_level = logging.WARNING
+        # 1 = ERROR ONLY
+        elif BIGJOB_VERBOSE == 1:
+            logging_level = logging.ERROR
+
+
+    #print("Set logging level: %s"%(logging_level))
+    logging.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p',
+                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger(name='bigjob')
+
+    #logger.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p',
+    #                   format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger.setLevel(logging_level)
+
+    paramiko_logger = logging.getLogger(name="paramiko.transport")
+    paramiko_logger.setLevel(logging.ERROR)
+    #logging.basicConfig(level=logging_level)
+
+except:
+    print("bigjob.conf could not be read")
+    exc_type, exc_value, exc_traceback = sys.exc_info()
+    traceback.print_exc(limit=1, file=sys.stdout)
+
+import socket
+try:
+    fn = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", 'VERSION')
+    if not os.path.exists(fn):
+        fn = os.path.join(sys.prefix, 'VERSION')
+    #print "Open Version file: " + str(fn)
+    version = open(fn).read().strip()
+    logger.info("Loading BigJob version: " + version + " on " + socket.gethostname())
+except IOError:
+    pass
+
+
+
+# define external-facing API
+from bigjob.bigjob_manager import bigjob
+from bigjob.bigjob_manager import subjob
+try:
+    from bigjob.bigjob_manager import description
+except:
+    pass
+
64 changes: 32 additions & 32 deletions bigjob/bigjob_agent.py
@@ -12,7 +12,7 @@
 import time
 import pdb
 import traceback
-import ConfigParser
+import configparser
 import types
 import logging
 import shutil
@@ -83,26 +83,26 @@ def __init__(self, args):
         if not os.path.exists(conf_file):
             conf_file = os.path.join(sys.prefix, CONFIG_FILE)
         logging.debug ("read configfile: " + conf_file)
-        config = ConfigParser.ConfigParser()
+        config = configparser.ConfigParser()
         config.read(conf_file)
         default_dict = config.defaults()
         self.CPR=False
-        if default_dict.has_key("cpr"):
+        if "cpr" in default_dict:
             self.CPR = default_dict["cpr"]
         self.SHELL="/bin/bash"
-        if default_dict.has_key("shell"):
+        if "shell" in default_dict:
             self.SHELL=default_dict["shell"]
         self.MPIRUN="mpirun"
         # On TACC resources the default MPICH is
         # linked under mpirun_rsh
-        if default_dict.has_key("mpirun"):
+        if "mpirun" in default_dict:
             self.MPIRUN=default_dict["mpirun"]
 
-        if default_dict.has_key("number_executor_threads"):
+        if "number_executor_threads" in default_dict:
             THREAD_POOL_SIZE=int(default_dict["number_executor_threads"])
 
         self.OUTPUT_TAR=False
-        if default_dict.has_key("create_output_tar"):
+        if "create_output_tar" in default_dict:
             self.OUTPUT_TAR=eval(default_dict["create_output_tar"])
         logger.debug("Create output tar: %r", self.OUTPUT_TAR)
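
dict.has_key() was removed in Python 3, so every has_key(k) call becomes k in d. The in form also works on Python 2, so these lines stay compatible with both interpreters. A minimal sketch of the equivalence (hypothetical values):

default_dict = {"cpr": "False", "shell": "/bin/bash"}

# Python 2 only:  default_dict.has_key("cpr")  -> AttributeError on Python 3
# Python 2 and 3:
if "cpr" in default_dict:
    print(default_dict["cpr"])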

@@ -181,7 +181,7 @@ def __init__(self, args):
         ############################################################################
         # Detect launch method
         self.LAUNCH_METHOD="ssh"
-        if default_dict.has_key("launch_method"):
+        if "launch_method" in default_dict:
             self.LAUNCH_METHOD=default_dict["launch_method"]
 
         self.LAUNCH_METHOD=self.__get_launch_method(self.LAUNCH_METHOD)
@@ -194,7 +194,7 @@ def __init__(self, args):
         ##############################################################################
         # start background thread for polling new jobs and monitoring current jobs
         # check whether user requested a certain threadpool size
-        if self.pilot_description!=None and self.pilot_description.has_key("number_executor_threads"):
+        if self.pilot_description!=None and "number_executor_threads" in self.pilot_description:
             THREAD_POOL_SIZE=int(self.pilot_description["number_executor_threads"])
         logger.debug("Creating executor thread pool of size: %d"%(THREAD_POOL_SIZE))
         self.resource_lock=threading.RLock()
@@ -306,7 +306,7 @@ def init_pbs(self):
 
         # get number of requested slots from pilot description
         number_of_requested_processes = self.pilot_description["number_of_processes"]
-        if os.environ.has_key("PBS_NNODES"):
+        if "PBS_NNODES" in os.environ:
             # use PBS assigned node count if available
             number_nodes = os.environ.get("PBS_NNODES")
         else:
@@ -333,7 +333,7 @@ def init_pbs(self):
                 node_dict[i] = num_cpus
 
         self.freenodes=[]
-        for i in node_dict.keys():
+        for i in list(node_dict.keys()):
             logger.debug("host: " + i + " nodes: " + str(node_dict[i]))
             for j in range(0, node_dict[i]):
                 logger.debug("add host: " + i.strip())
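
In Python 3, dict.keys() returns a view instead of a list, so wrapping it in list() preserves the Python 2 behavior. The loop here only reads node_dict, so the bare view would also work; the list() copy matters when the dict is mutated during iteration, e.g. (hypothetical data):

node_dict = {"node1": 8, "node2": 0}

# iterate over a snapshot while mutating the dict
for host in list(node_dict.keys()):
    if node_dict[host] == 0:
        del node_dict[host]   # a live view here would raise RuntimeError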
@@ -362,22 +362,22 @@ def execute_job(self, job_url, job_dict):
         logger.debug("Start job id %s specification %s: "%(job_id, str(job_dict)))
         numberofprocesses = "1"
         try:
-            if (job_dict.has_key("NumberOfProcesses") == True):
+            if (("NumberOfProcesses" in job_dict) == True):
                 numberofprocesses = job_dict["NumberOfProcesses"]
         except:
             pass # ignore in particular if Bliss is used
 
         spmdvariation="single"
         try:
-            if (job_dict.has_key("SPMDVariation") == True):
+            if (("SPMDVariation" in job_dict) == True):
                 spmdvariation = job_dict["SPMDVariation"]
         except:
             pass # ignore in particular if Bliss is used
 
         arguments = ""
-        if (job_dict.has_key("Arguments") == True):
+        if (("Arguments" in job_dict) == True):
             arguments_raw = job_dict['Arguments'];
-            if type(arguments_raw) == types.ListType:
+            if type(arguments_raw) == list:
                 arguments_list = arguments_raw
             else:
                 arguments_list = eval(job_dict["Arguments"])
@@ -387,9 +387,9 @@ def execute_job(self, job_url, job_dict):
         environment = os.environ
         envi = ""
         self.number_subjobs=1
-        if (job_dict.has_key("Environment") == True):
+        if (("Environment" in job_dict) == True):
             env_raw = job_dict['Environment']
-            if type(env_raw) == types.ListType:
+            if type(env_raw) == list:
                 env_list = env_raw
             else:
                 env_list = eval(job_dict["Environment"])
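
types.ListType was only an alias for list and no longer exists in Python 3, so the comparison target becomes the builtin. type(env_raw) == list is a faithful, minimal translation; isinstance(env_raw, list) would be the more idiomatic test since it also accepts list subclasses. A sketch (hypothetical value):

env_raw = ["A=1", "B=2"]

exact = type(env_raw) == list          # as in the converted code
idiomatic = isinstance(env_raw, list)  # also true for list subclasses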
@@ -411,7 +411,7 @@ def execute_job(self, job_url, job_dict):
         executable = self.__expand_directory(executable)
 
         workingdirectory = os.path.join(os.getcwd(), job_id)
-        if (job_dict.has_key("WorkingDirectory") == True):
+        if (("WorkingDirectory" in job_dict) == True):
             workingdirectory = job_dict["WorkingDirectory"]
         workingdirectory = self.__expand_directory(workingdirectory)
         try:
@@ -421,13 +421,13 @@ def execute_job(self, job_url, job_dict):
         logging.debug("Sub-Job: %s, Working_directory: %s"%(job_id, workingdirectory))
 
         output="stdout"
-        if (job_dict.has_key("Output") == True):
+        if (("Output" in job_dict) == True):
             output = job_dict["Output"]
         if not os.path.isabs(output):
             output=os.path.join(workingdirectory, output)
 
         error=os.path.join(workingdirectory,"stderr")
-        if (job_dict.has_key("Error") == True):
+        if (("Error" in job_dict) == True):
             error = job_dict["Error"]
         if not os.path.isabs(error):
             error=os.path.join(workingdirectory, error)
@@ -457,22 +457,22 @@ def execute_job(self, job_url, job_dict):
 
         #######################################################################################################
         # File Stage-In of dependent data units
-        if job_dict.has_key("InputData"):
+        if "InputData" in job_dict:
             self.coordination.set_job_state(job_url, str(bigjob.state.Staging))
             self.__stage_in_data_units(eval(job_dict["InputData"]), workingdirectory)
 
         # File Stage-In - Move pilot-level files to working directory of sub-job
         if self.pilot_description!=None:
             try:
-                if self.pilot_description.has_key("description"):
+                if "description" in self.pilot_description:
                     file_list = eval(self.pilot_description["description"])
                     if file_list != None and len(file_list)>0:
                         logger.debug("Copy %d files to SJ work dir"%len(file_list)>0)
                         for i in file_list:
                             logger.debug("Process file: %s"%i)
                             if i.find(">")>0:
                                 base_filename = os.path.basename(i[:i.index(">")].strip())
-                                if environment.has_key("_CONDOR_SCRATCH_DIR"):
+                                if "_CONDOR_SCRATCH_DIR" in environment:
                                     source_filename = os.path.join(environment["_CONDOR_SCRATCH_DIR"], base_filename)
                                 else:
                                     source_filename = os.path.join(self.work_dir, base_filename)
@@ -598,7 +598,7 @@ def setup_charmpp_nodefile(self, allocated_nodes):
 
         nodefile_string=""
         for i in allocated_nodes:
-            if i.has_key("private_hostname"):
+            if "private_hostname" in i:
                 nodefile_string=nodefile_string + "host "+ i["private_hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"
             else:
                 nodefile_string=nodefile_string + "host "+ i["hostname"] + " ++cpus " + str(i["cpu_count"]) + " ++shell ssh\n"
@@ -728,7 +728,7 @@ def monitor_jobs(self):
         #pdb.set_trace()
         logger.debug("Monitor jobs - # current jobs: %d"%len(self.jobs))
         for i in self.jobs:
-            if self.processes.has_key(i): # only if job has already been starteds
+            if i in self.processes: # only if job has already been started
                 p = self.processes[i]
                 p_state = p.poll()
                 logger.debug(self.print_job(i) + " state: " + str(p_state) + " return code: " + str(p.returncode))
@@ -738,9 +738,9 @@ def monitor_jobs(self):
                     # Handle stage-out
                     self.update_output_file() # for Condor case
                     job_dict = self.coordination.get_job(i) # for Pilot Data case
-                    if job_dict.has_key("OutputData"):
+                    if "OutputData" in job_dict:
                         workingdirectory = os.path.join(os.getcwd(), job_dict["job-id"])
-                        if (job_dict.has_key("WorkingDirectory") == True):
+                        if (("WorkingDirectory" in job_dict) == True):
                             workingdirectory = job_dict["WorkingDirectory"]
                             workingdirectory = self.__expand_directory(workingdirectory)
                         self.__stage_out_data_units(eval(job_dict["OutputData"]), workingdirectory)
@@ -816,7 +816,7 @@ def is_stopped(self, base_url):
         except:
             pass
         logger.debug("Pilot State: " + str(state))
-        if state==None or state.has_key("stopped")==False or state["stopped"]==True:
+        if state==None or ("stopped" in state)==False or state["stopped"]==True:
             return True
         else:
             return False
@@ -863,7 +863,7 @@ def __stage_out_data_units(self, output_data=[], workingdirectory=None):
         try:
             for data_unit_dict in output_data:
                 logger.debug("Process: " + str(data_unit_dict))
-                for du_url in data_unit_dict.keys(): # go through all dicts (each representing 1 PD)
+                for du_url in list(data_unit_dict.keys()): # go through all dicts (each representing 1 PD)
                     #pd_url = self.__get_pd_url(du_url)
                     #pilot_data = PilotData(pd_url=pd_url)
                     #du = pilot_data.get_du(du_url)
@@ -977,9 +977,9 @@ def __get_launch_method(self, requested_method):
 
     def __print_traceback(self):
         exc_type, exc_value, exc_traceback = sys.exc_info()
-        print "*** print_tb:"
+        print("*** print_tb:")
         traceback.print_tb(exc_traceback, limit=1, file=sys.stderr)
-        print "*** print_exception:"
+        print("*** print_exception:")
         traceback.print_exception(exc_type, exc_value, exc_traceback,
                                   limit=2, file=sys.stderr)
@@ -990,7 +990,7 @@ def __print_traceback(self):
     args = sys.argv
     num_args = len(args)
     if (num_args<3):
-        print "Usage: \n " + args[0] + " <coordination host url> <coordination namespace url>"
+        print("Usage: \n " + args[0] + " <coordination host url> <coordination namespace url>")
         sys.exit(1)
 
     bigjob_agent = bigjob_agent(args)
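
All print conversions in this file pass a single argument, where adding parentheses is behavior-preserving even on Python 2 (print(x) parses as printing a parenthesized expression). Multi-argument prints are the case that actually diverges between interpreters; a sketch of the gotcha (illustrative, not from this diff):

# Python 2: print "state:", 0    ->  state: 0
# Python 2: print("state:", 0)   ->  ('state:', 0)   a tuple is printed
# Python 3: print("state:", 0)   ->  state: 0
print("state:", 0)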