BigJob Tutorial Part 8: Automated Data Staging Example


This page is part of the BigJob Tutorial.


Compute-Data

This example demonstrates how input data is staged together with an executable so that the executable can run successfully. The Pilot-API moves the required files into the working directory of the compute unit, which is useful when the executable has input file dependencies.

Create a test.txt file in your $HOME directory:

cat /etc/motd > $HOME/test.txt

In your $HOME directory, open a new file compute_data.py with your favorite editor (e.g., vim) and paste the following content:

import sys
import os
import time
import logging
from pilot import PilotComputeService, PilotDataService, ComputeDataService, State

COORDINATION_URL = "redis://localhost:6379"  # adjust to your redis endpoint, e.g. redis://<password>@<host>:6379

if __name__ == "__main__":

    pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)

    # create pilot job service and initiate a pilot job
    pilot_compute_description = {
                             "service_url": 'pbs://localhost',
                             "number_of_processes": 12,
                             "working_directory": os.getenv("HOME")+"/agent",
                             "walltime":10,
                             'affinity_datacenter_label':"eu-de-south",
                             'affinity_machine_label': "mymachine-1"
                             }

    pilotjob = pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)
    # create pilot data service (factory for data pilots (physical,distributed storage))
    # and pilot data
    pilot_data_service = PilotDataService(coordination_url=COORDINATION_URL)
    pilot_data_description={
                                "service_url":"ssh://localhost/"+os.getenv("HOME")+"/pilotdata",
                                "size": 100,
                                "affinity_datacenter_label":"eu-de-south",
                                "affinity_machine_label":"mymachine-1"
                           }
    ps = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description)

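    # the ComputeDataService brokers compute and data units across the registered pilot services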
    compute_data_service = ComputeDataService()
    compute_data_service.add_pilot_compute_service(pilot_compute_service)
    compute_data_service.add_pilot_data_service(pilot_data_service)

    input_data_unit_description = { "file_urls": ["ssh://localhost" + os.path.join(os.getenv("HOME"), "test.txt")],
                                    "affinity_datacenter_label":"eu-de-south",
                                    "affinity_machine_label": "mymachine-1"}

    # submit the input data unit to a pilot store
    input_du = compute_data_service.submit_data_unit(input_data_unit_description)
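    # block until the input file has been staged into the pilot data location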
    input_du.wait()


    ### Create Output DU to store the output files.
    output_data_unit_description = { "file_urls": [],
                                     "affinity_datacenter_label":input_du.data_unit_description['affinity_datacenter_label'],
                                     "affinity_machine_label":input_du.data_unit_description['affinity_machine_label']}

    # submit the output data unit to a pilot store
    output_du = compute_data_service.submit_data_unit(output_data_unit_description)
    output_du.wait()


    # start compute unit
    compute_unit_description = {
            "executable": "/bin/cat",
            "arguments": ["test.txt"],
            "number_of_processes": 1,
            "output": "stdout.txt",
            "error": "stderr.txt",
            "input_data": [input_du.get_url()],  # stage the content of the input data unit to the working directory of the compute unit
            "output_data": [{output_du.get_url(): ['std*']}]  # stage files matching std* back into the output data unit
    }

    compute_unit = compute_data_service.submit_compute_unit(compute_unit_description)

    logging.debug("Finished setup of Pilot Compute and Data Services. Waiting for the compute unit to complete.")
    compute_data_service.wait()  # blocks until all submitted compute and data units have completed

    # export the output files to the local directory
    output_du.export(os.getcwd())

    logging.debug("Terminate Pilot Compute/Data Service")
    compute_data_service.cancel()
    pilot_data_service.cancel()
    pilot_compute_service.cancel()

Execute the script using the command:

python compute_data.py
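
Once the run completes, output_du.export(os.getcwd()) copies the staged stdout.txt and stderr.txt into the directory the script was started from; stdout.txt should contain the contents of test.txt. You can check it with:

cat stdout.txt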

Can you extend the script to use multiple Pilot-Jobs and see how data is moved along with the compute units? Hint: use the multiple_pilotjobs.py example; a minimal sketch follows below.
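
As a starting point, here is a minimal sketch that replaces the single create_pilot call in the script above with a loop over several pilots. The second machine label ("mymachine-2") is a hypothetical placeholder; adapt service_url, working_directory, and the affinity labels to your own resources:

# a minimal sketch, assuming a second (hypothetical) machine label "mymachine-2"
pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)

for machine_label in ["mymachine-1", "mymachine-2"]:
    pilot_compute_description = {
        "service_url": "pbs://localhost",
        "number_of_processes": 12,
        "working_directory": os.getenv("HOME") + "/agent",
        "walltime": 10,
        "affinity_datacenter_label": "eu-de-south",
        "affinity_machine_label": machine_label,
    }
    pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

# registering the service makes all of its pilots available for scheduling
compute_data_service.add_pilot_compute_service(pilot_compute_service)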