BigJob Tutorial Part 8: Automated Data Staging Example
This page is part of the BigJob Tutorial.
This example demonstrates how input data is staged alongside an executable so that the executable can run successfully. The Pilot-API moves the required files into the compute unit's working directory, which is useful when the executable has input file dependencies.
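The staging itself is expressed in the compute unit description, shown in full in the example below: input_data lists data units whose files are copied into the compute unit's working directory before execution, and output_data lists data units that collect matching output files afterwards. A minimal excerpt of just those fields (input_du and output_du are the data units created in the full script):

compute_unit_description = {
    "executable": "/bin/cat",
    "arguments": ["test.txt"],
    "input_data": [input_du.get_url()],               # stage the data unit's files into the working directory
    "output_data": [{output_du.get_url(): ["std*"]}]  # collect matching output files into the output data unit
}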
Create a test.txt file in your $HOME directory:
cat /etc/motd > $HOME/test.txt
In your $HOME directory, open a new file compute_data.py with your favorite editor (e.g. vim) and paste the following content. The example assumes that you want to run BigJob on a PBS resource, i.e.:
"service_url": 'pbs://localhost'
Please modify this URL to the resource URL that you would like to use, e.g. fork://localhost.
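If you only want to test locally, a pilot compute description along the following lines can be used (a minimal sketch; the number_of_processes value is a placeholder you should adapt to your machine):

pilot_compute_description = {
    "service_url": "fork://localhost",                  # run the pilot on the local machine
    "number_of_processes": 2,                           # placeholder: adjust to your machine
    "working_directory": os.getenv("HOME") + "/agent",
    "walltime": 10
}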
Example:
import sys
import os
import time
import logging
from pilot import PilotComputeService, PilotDataService, ComputeDataService, State

# point this at your Redis coordination server (e.g. redis://password@hostname:6379)
COORDINATION_URL = "redis://localhost:6379"
if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)

    # create pilot job service and initiate a pilot job
    pilot_compute_description = {
        "service_url": "pbs://localhost",
        "number_of_processes": 12,
        "working_directory": os.getenv("HOME") + "/agent",
        "walltime": 10,
        "affinity_datacenter_label": "eu-de-south",
        "affinity_machine_label": "mymachine-1"
    }

    pilotjob = pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

    # create pilot data service (factory for data pilots (physical, distributed storage))
    # and pilot data
    pilot_data_service = PilotDataService(coordination_url=COORDINATION_URL)
    pilot_data_description = {
        "service_url": "ssh://localhost/" + os.getenv("HOME") + "/pilotdata",
        "size": 100,
        "affinity_datacenter_label": "eu-de-south",
        "affinity_machine_label": "mymachine-1"
    }

    ps = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description)

    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
    compute_data_service.add_pilot_data_service(pilot_data_service)

    input_data_unit_description = {
        "file_urls": ["ssh://localhost" + os.path.join(os.getcwd(), "test.txt")],
        "affinity_datacenter_label": "eu-de-south",
        "affinity_machine_label": "mymachine-1"
    }

    # submit pilot data to a pilot store
    input_du = compute_data_service.submit_data_unit(input_data_unit_description)
    input_du.wait()

    # create an output data unit to store the output files
    output_data_unit_description = {
        "file_urls": [],
        "affinity_datacenter_label": input_du.data_unit_description["affinity_datacenter_label"],
        "affinity_machine_label": input_du.data_unit_description["affinity_machine_label"]
    }

    # submit pilot data to a pilot store
    output_du = compute_data_service.submit_data_unit(output_data_unit_description)
    output_du.wait()

    # start compute unit
    compute_unit_description = {
        "executable": "/bin/cat",
        "arguments": ["test.txt"],
        "number_of_processes": 1,
        "output": "stdout.txt",
        "error": "stderr.txt",
        "input_data": [input_du.get_url()],  # stages the content of the data unit to the working directory of the compute unit
        "output_data": [{output_du.get_url(): ["std*"]}]
    }

    compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)

    logging.debug("Finished setup of PSS and PDS. Waiting for scheduling of PD")
    compute_data_service.wait()

    # export the output files to the local directory
    output_du.export(os.getcwd())

    logging.debug("Terminate Pilot Compute/Data Service")
    compute_data_service.cancel()
    pilot_data_service.cancel()
    pilot_compute_service.cancel()
Execute the script with the following command:
python compute_data.py
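When the script completes, the files collected in the output data unit (stdout.txt and stderr.txt, matched by the std* pattern) are exported to your current directory; stdout.txt should contain the contents of test.txt, i.e. your /etc/motd.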
Can you extend the script to use multiple Pilot-Jobs and observe how data is moved along with the compute units?
Hint: see the multiple_pilotjobs.py example.
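One possible way to approach this (a sketch only, not the tutorial's reference solution; the service URLs and process counts below are placeholders) is to create several pilots through the same PilotComputeService and register that service with a single ComputeDataService, which then schedules compute units, and the data they depend on, across the available pilots:

pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)

# create two pilots on (placeholder) resources
for service_url in ["fork://localhost", "pbs://localhost"]:
    pilot_compute_description = {
        "service_url": service_url,
        "number_of_processes": 4,                        # placeholder: adjust per resource
        "working_directory": os.getenv("HOME") + "/agent",
        "walltime": 10
    }
    pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

# one ComputeDataService on top of all pilots
compute_data_service = ComputeDataService()
compute_data_service.add_pilot_compute_service(pilot_compute_service)
compute_data_service.add_pilot_data_service(pilot_data_service)
# ...submit data units and compute units as in compute_data.py...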