diff --git a/bigjob/__init__.py b/bigjob/__init__.py index c0747128..af15df8b 100644 --- a/bigjob/__init__.py +++ b/bigjob/__init__.py @@ -56,7 +56,7 @@ #print("Set logging level: %s"%(logging_level)) logging.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p', - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + format='%(asctime)s - %(name)s - %(levelname)s - %(filename)s: %(lineno)d - %(message)s') logger = logging.getLogger(name='bigjob') #logger.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p', diff --git a/bigjob/bigjob_agent.py b/bigjob/bigjob_agent.py index 9144c896..18eabb21 100644 --- a/bigjob/bigjob_agent.py +++ b/bigjob/bigjob_agent.py @@ -77,6 +77,7 @@ def __init__(self, args): self.freenodes = [] self.busynodes = [] self.restarted = {} + self.duUrls = {} # read config file conf_file = os.path.dirname(os.path.abspath( __file__ )) + "/../" + CONFIG_FILE @@ -879,19 +880,25 @@ def __stage_in_data_units(self, input_data=[], target_directory="."): for i in input_data: if type(i) is dict: for du_url,all_files in i.iteritems(): - logger.debug("Get files: " + str(all_files)) - du = DataUnit(du_url=du_url) + logger.debug("Get files: " + str(all_files)) + if du_url in self.duUrls: + du= self.duUrls[du_url] + else: + du = DataUnit(du_url=du_url) + logger.debug("Restored DU... call get state()") + logger.debug("DU State: " + du.get_state()) + du.wait() + logger.debug("Reconnected to DU. Exporting it now...") + du.export(target_directory, all_files) + else: + if i in self.duUrls: + du= self.duUrls[i] + else: + du = DataUnit(du_url=i) logger.debug("Restored DU... call get state()") logger.debug("DU State: " + du.get_state()) du.wait() logger.debug("Reconnected to DU. Exporting it now...") - du.export(target_directory, all_files) - else: - du = DataUnit(du_url=i) - logger.debug("Restored DU... call get state()") - logger.debug("DU State: " + du.get_state()) - du.wait() - logger.debug("Reconnected to DU. Exporting it now...") du.export(target_directory) except: diff --git a/pilot/coordination/redis_adaptor.py b/pilot/coordination/redis_adaptor.py index 2284a5d1..0f6638bb 100644 --- a/pilot/coordination/redis_adaptor.py +++ b/pilot/coordination/redis_adaptor.py @@ -1,12 +1,14 @@ from pilot import * from bigjob import logger from redis.client import Lock +import redis try: import json except ImportError: import simplejson as json + from saga import Url as SAGAUrl class RedisCoordinationAdaptor: @@ -21,6 +23,8 @@ class RedisCoordinationAdaptor: PILOT_DATA_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR + "pds" DATA_UNIT_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR +"dus" COMPUTE_DATA_SERVICE_PATH = PILOT_DATA_PATH + SEPARATOR + "cds" + + redis_client = None ########################################################################### # Construct a base url for an application @@ -228,23 +232,23 @@ def get_cds_url(cls, application_url, cds_id): # internal Redis-related methods @classmethod def __get_redis_api_client(cls): - import redis ''' Initialize Redis API Client ''' - saga_url = SAGAUrl(RedisCoordinationAdaptor.BASE_URL) - username = saga_url.username - server = saga_url.host - server_port = saga_url.port - if username==None or username=="": - redis_client = redis.Redis(host=server, port=server_port, db=0) - else: - redis_client = redis.Redis(host=server, port=server_port, password=username, db=0) - - try: - redis_client.ping() - except: - logger.error("Please start Redis server!") - raise Exception("Please start Redis server!") - return redis_client + if cls.redis_client == None: + saga_url = SAGAUrl(RedisCoordinationAdaptor.BASE_URL) + username = saga_url.username + server = saga_url.host + server_port = saga_url.port + if username==None or username=="": + cls.redis_client = redis.Redis(host=server, port=server_port, db=0) + else: + cls.redis_client = redis.Redis(host=server, port=server_port, password=username, db=0) + + try: + cls.redis_client.ping() + except: + logger.error("Please start Redis server!") + raise Exception("Please start Redis server!") + return cls.redis_client @classmethod @@ -296,6 +300,6 @@ def __retrieve_entry(cls, entry_url): redis_client = cls.__get_redis_api_client() content = redis_client.hgetall(entry_url) - logger.debug("Retrieve Redis entry at: " + entry_url - + " Content: " + str(json.dumps(content))) + #logger.debug("Retrieve Redis entry at: " + entry_url + # + " Content: " + str(json.dumps(content))) return content diff --git a/pilot/impl/pilotdata_manager.py b/pilot/impl/pilotdata_manager.py index 730e50a4..08cf9edd 100644 --- a/pilot/impl/pilotdata_manager.py +++ b/pilot/impl/pilotdata_manager.py @@ -508,7 +508,7 @@ def __init__(self, pilot_data=None, data_unit_description=None, du_url=None): else: self.id = DataUnit._get_du_id(du_url) self.url = du_url - logger.debug("Restore du: %s"%self.id) + logger.debug("Restore du: %s"%self.id) self.__restore_state() self.transfer_threads=[] @@ -521,17 +521,18 @@ def cancel(self): CoordinationAdaptor.update_du(self) - def add_files(self, file_url_list=[]): + def add_files(self, file_url_list=[], exists=False): """Add files referenced in list to Data Unit""" self._update_state(State.Pending) item_list = DataUnitItem.create_data_unit_from_urls(None, file_url_list) for i in item_list: self.data_unit_items.append(i) - CoordinationAdaptor.update_du(self) - if len(self.pilot_data) > 0: - for i in self.pilot_data: - logger.debug("Update Pilot Data %s"%(i.get_url())) - i.put_du(self) + CoordinationAdaptor.update_du(self) + if not exists: + if len(self.pilot_data) > 0: + for i in self.pilot_data: + logger.debug("Update Pilot Data %s"%(i.get_url())) + i.put_du(self) self._update_state(State.Running) CoordinationAdaptor.update_du(self) @@ -626,7 +627,7 @@ def export(self, target_url, all_files=None): """ if self.get_state()!=State.Running: self.wait() - self.__restore_state() + self.__restore_state() if len(self.pilot_data) > 0: # Search for PD that is close to local machine diff --git a/setup.py b/setup.py index f7e99fa7..e5cfc500 100644 --- a/setup.py +++ b/setup.py @@ -85,7 +85,7 @@ def get_version(): install_requires=['setuptools>2.0', 'uuid', 'threadpool', 'virtualenv', 'redis', 'radical.utils', 'saga-python', 'google-api-python-client', 'python-hostlist', - 'globusonline-transfer-api-client', 'boto>=2.2,<2.3', 'simplejson<2.1', 'pexpect', 'tldextract'], + 'globusonline-transfer-api-client', 'boto>=2.2,<2.3', 'simplejson', 'pexpect', 'tldextract'], entry_points = { 'console_scripts': [ 'test-bigjob = examples.example_local_single:main',