BigJob Tutorial Part 8: Automated Data Staging Example
This page is part of the BigJob Tutorial.
This example demonstrates how input data is staged together with an executable so that the executable can run successfully. The Pilot-API moves the required files into the working directory of the compute unit, which is useful when the executable has input file dependencies.
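The staging is controlled by the input_data and output_data fields of the compute unit description. The following excerpt from the full script below shows just these two fields (input_du and output_du are data units created earlier in the script):

compute_unit_description = {
    # (other fields as in the full script below)
    "input_data": [input_du.get_url()],               # stage the files of the input data unit into the compute unit's working directory
    "output_data": [{output_du.get_url(): ['std*']}]  # collect files matching std* back into the output data unit
}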
Create a test.txt file in your $HOME directory:
cat /etc/motd > $HOME/test.txt
In your $HOME directory, open a new file compute_data.py with your favorite editor (e.g., vim) and paste the following content:
import sys
import os
import time
import logging

from pilot import PilotComputeService, PilotDataService, ComputeDataService, State

COORDINATION_URL = "redis://[email protected]:6379"

if __name__ == "__main__":

    # create pilot compute service and initiate a pilot job
    pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)

    pilot_compute_description = {
        "service_url": 'pbs://localhost',
        "number_of_processes": 12,
        "working_directory": os.getenv("HOME") + "/agent",
        "walltime": 10,
        'affinity_datacenter_label': "eu-de-south",
        'affinity_machine_label': "mymachine-1"
    }

    pilotjob = pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

    # create pilot data service (factory for data pilots (physical, distributed storage))
    # and pilot data
    pilot_data_service = PilotDataService(coordination_url=COORDINATION_URL)

    pilot_data_description = {
        "service_url": "ssh://localhost/" + os.getenv("HOME") + "/pilotdata",
        "size": 100,
        "affinity_datacenter_label": "eu-de-south",
        "affinity_machine_label": "mymachine-1"
    }

    ps = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description)

    # the compute data service brokers compute and data units across the pilots
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
    compute_data_service.add_pilot_data_service(pilot_data_service)

    # input data unit referencing the test.txt file created earlier
    input_data_unit_description = {
        "file_urls": ["ssh://localhost" + os.path.join(os.getcwd(), "test.txt")],
        "affinity_datacenter_label": "eu-de-south",
        "affinity_machine_label": "mymachine-1"
    }

    # submit the input data unit to a pilot store
    input_du = compute_data_service.submit_data_unit(input_data_unit_description)
    input_du.wait()

    # create an (initially empty) output data unit to store the output files
    output_data_unit_description = {
        "file_urls": [],
        "affinity_datacenter_label": input_du.data_unit_description['affinity_datacenter_label'],
        "affinity_machine_label": input_du.data_unit_description['affinity_machine_label']
    }

    # submit the output data unit to a pilot store
    output_du = compute_data_service.submit_data_unit(output_data_unit_description)
    output_du.wait()

    # start compute unit
    compute_unit_description = {
        "executable": "/bin/cat",
        "arguments": ["test.txt"],
        "number_of_processes": 1,
        "output": "stdout.txt",
        "error": "stderr.txt",
        "input_data": [input_du.get_url()],               # stage the content of the input data unit to the compute unit's working directory
        "output_data": [{output_du.get_url(): ['std*']}]  # stage files matching std* back into the output data unit
    }

    compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)

    logging.debug("Finished setup of Pilot Compute Service and Pilot Data Service. Waiting for scheduling of compute/data units")
    compute_data_service.wait()

    # export the output files to the local directory
    output_du.export(os.getcwd())

    logging.debug("Terminate Pilot Compute/Data Service")
    compute_data_service.cancel()
    pilot_data_service.cancel()
    pilot_compute_service.cancel()
Execute the script using the command:
python compute_data.py
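After the script finishes, the exported output files stdout.txt and stderr.txt should appear in the directory from which you started the script (output_du.export(os.getcwd()) copies them there); stdout.txt contains the contents of test.txt.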
Can you extend the script to use multiple Pilot-Jobs and observe how data is moved along with the compute units?
Hint: use the multiple_pilotjobs.py example. A minimal sketch of such an extension is shown below.
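The sketch below assumes both pilots are submitted to the same local PBS resource; the second working directory (~/agent2) and the affinity label "mymachine-2" are placeholders to adapt to your own machines. The data units and compute units are created and submitted exactly as in the script above.

# Sketch: start two pilot compute jobs and register them with the same ComputeDataService.
pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)

pilot_compute_description_1 = {
    "service_url": "pbs://localhost",
    "number_of_processes": 12,
    "working_directory": os.getenv("HOME") + "/agent",
    "walltime": 10,
    "affinity_datacenter_label": "eu-de-south",
    "affinity_machine_label": "mymachine-1"
}
pilot_compute_description_2 = {
    "service_url": "pbs://localhost",          # placeholder; point this at a second resource
    "number_of_processes": 12,
    "working_directory": os.getenv("HOME") + "/agent2",
    "walltime": 10,
    "affinity_datacenter_label": "eu-de-south",
    "affinity_machine_label": "mymachine-2"    # placeholder affinity label for the second pilot
}

pilotjob_1 = pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description_1)
pilotjob_2 = pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description_2)

compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
compute_data_service.add_pilot_data_service(pilot_data_service)

# Data units and compute units are submitted exactly as before; the ComputeDataService
# decides which pilot runs each compute unit and stages the data units accordingly,
# guided by the affinity labels.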