Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register existing duf iles #180

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigjob/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

#print("Set logging level: %s"%(logging_level))
logging.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p',
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
format='%(asctime)s - %(name)s - %(levelname)s - %(filename)s: %(lineno)d - %(message)s')
logger = logging.getLogger(name='bigjob')

#logger.basicConfig(datefmt='%m/%d/%Y %I:%M:%S %p',
Expand Down
25 changes: 16 additions & 9 deletions bigjob/bigjob_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(self, args):
self.freenodes = []
self.busynodes = []
self.restarted = {}
self.duUrls = {}

# read config file
conf_file = os.path.dirname(os.path.abspath( __file__ )) + "/../" + CONFIG_FILE
Expand Down Expand Up @@ -879,19 +880,25 @@ def __stage_in_data_units(self, input_data=[], target_directory="."):
for i in input_data:
if type(i) is dict:
for du_url,all_files in i.iteritems():
logger.debug("Get files: " + str(all_files))
du = DataUnit(du_url=du_url)
logger.debug("Get files: " + str(all_files))
if du_url in self.duUrls:
du= self.duUrls[du_url]
else:
du = DataUnit(du_url=du_url)
logger.debug("Restored DU... call get state()")
logger.debug("DU State: " + du.get_state())
du.wait()
logger.debug("Reconnected to DU. Exporting it now...")
du.export(target_directory, all_files)
else:
if i in self.duUrls:
du= self.duUrls[i]
else:
du = DataUnit(du_url=i)
logger.debug("Restored DU... call get state()")
logger.debug("DU State: " + du.get_state())
du.wait()
logger.debug("Reconnected to DU. Exporting it now...")
du.export(target_directory, all_files)
else:
du = DataUnit(du_url=i)
logger.debug("Restored DU... call get state()")
logger.debug("DU State: " + du.get_state())
du.wait()
logger.debug("Reconnected to DU. Exporting it now...")
du.export(target_directory)

except:
Expand Down
40 changes: 22 additions & 18 deletions pilot/coordination/redis_adaptor.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from pilot import *
from bigjob import logger
from redis.client import Lock
import redis

try:
import json
except ImportError:
import simplejson as json


from saga import Url as SAGAUrl

class RedisCoordinationAdaptor:
Expand All @@ -21,6 +23,8 @@ class RedisCoordinationAdaptor:
PILOT_DATA_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR + "pds"
DATA_UNIT_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR +"dus"
COMPUTE_DATA_SERVICE_PATH = PILOT_DATA_PATH + SEPARATOR + "cds"

redis_client = None

###########################################################################
# Construct a base url for an application
Expand Down Expand Up @@ -228,23 +232,23 @@ def get_cds_url(cls, application_url, cds_id):
# internal Redis-related methods
@classmethod
def __get_redis_api_client(cls):
import redis
''' Initialize Redis API Client '''
saga_url = SAGAUrl(RedisCoordinationAdaptor.BASE_URL)
username = saga_url.username
server = saga_url.host
server_port = saga_url.port
if username==None or username=="":
redis_client = redis.Redis(host=server, port=server_port, db=0)
else:
redis_client = redis.Redis(host=server, port=server_port, password=username, db=0)

try:
redis_client.ping()
except:
logger.error("Please start Redis server!")
raise Exception("Please start Redis server!")
return redis_client
if cls.redis_client == None:
saga_url = SAGAUrl(RedisCoordinationAdaptor.BASE_URL)
username = saga_url.username
server = saga_url.host
server_port = saga_url.port
if username==None or username=="":
cls.redis_client = redis.Redis(host=server, port=server_port, db=0)
else:
cls.redis_client = redis.Redis(host=server, port=server_port, password=username, db=0)

try:
cls.redis_client.ping()
except:
logger.error("Please start Redis server!")
raise Exception("Please start Redis server!")
return cls.redis_client


@classmethod
Expand Down Expand Up @@ -296,6 +300,6 @@ def __retrieve_entry(cls, entry_url):
redis_client = cls.__get_redis_api_client()
content = redis_client.hgetall(entry_url)

logger.debug("Retrieve Redis entry at: " + entry_url
+ " Content: " + str(json.dumps(content)))
#logger.debug("Retrieve Redis entry at: " + entry_url
# + " Content: " + str(json.dumps(content)))
return content
17 changes: 9 additions & 8 deletions pilot/impl/pilotdata_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ def __init__(self, pilot_data=None, data_unit_description=None, du_url=None):
else:
self.id = DataUnit._get_du_id(du_url)
self.url = du_url
logger.debug("Restore du: %s"%self.id)
logger.debug("Restore du: %s"%self.id)
self.__restore_state()

self.transfer_threads=[]
Expand All @@ -521,17 +521,18 @@ def cancel(self):
CoordinationAdaptor.update_du(self)


def add_files(self, file_url_list=[]):
def add_files(self, file_url_list=[], exists=False):
"""Add files referenced in list to Data Unit"""
self._update_state(State.Pending)
item_list = DataUnitItem.create_data_unit_from_urls(None, file_url_list)
for i in item_list:
self.data_unit_items.append(i)
CoordinationAdaptor.update_du(self)
if len(self.pilot_data) > 0:
for i in self.pilot_data:
logger.debug("Update Pilot Data %s"%(i.get_url()))
i.put_du(self)
CoordinationAdaptor.update_du(self)
if not exists:
if len(self.pilot_data) > 0:
for i in self.pilot_data:
logger.debug("Update Pilot Data %s"%(i.get_url()))
i.put_du(self)
self._update_state(State.Running)
CoordinationAdaptor.update_du(self)

Expand Down Expand Up @@ -626,7 +627,7 @@ def export(self, target_url, all_files=None):
"""
if self.get_state()!=State.Running:
self.wait()
self.__restore_state()
self.__restore_state()

if len(self.pilot_data) > 0:
# Search for PD that is close to local machine
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def get_version():

install_requires=['setuptools>2.0', 'uuid', 'threadpool', 'virtualenv', 'redis',
'radical.utils', 'saga-python', 'google-api-python-client', 'python-hostlist',
'globusonline-transfer-api-client', 'boto>=2.2,<2.3', 'simplejson<2.1', 'pexpect', 'tldextract'],
'globusonline-transfer-api-client', 'boto>=2.2,<2.3', 'simplejson', 'pexpect', 'tldextract'],
entry_points = {
'console_scripts': [
'test-bigjob = examples.example_local_single:main',
Expand Down