3.1 Combining modules

Cove Sturtevant edited this page Mar 8, 2023 · 2 revisions

One major lesson we've learned in developing the new IS pipeline is that while modularity is fantastic for code reusability, pipeline flexibility, and standardized algorithm implementation, increasing modularity carries performance costs (resource usage, system response times, and pipeline run times). One way to retain code modularity while moderating these costs is to run multiple code modules in a single Pachyderm pipeline.

Combining modules is easy. There are two main aspects to this approach:

  • Create a Docker image with both code modules included. The image can even include both R and Python modules.
  • Run the modules sequentially in the pipeline spec, placing interim outputs into interim directories. Only the last module should place outputs in /pfs/out.

While the approach is simple, there are some rules you need to follow:

  • You CANNOT place interim directories or files in any subdirectory of /pfs. If you do, Pachyderm will continually reprocess the same input datum (the cause is unknown).
  • You CAN create interim directories or files elsewhere in the container directory space so long as the user that the Docker image is run as has the requisite permissions. Users and their permissions are created in the Dockerfile.
  • You CAN create a file in an interim directory and symbolically link it to /pfs/out with success.
  • You CANNOT symbolically link a directory or file from the input repo (i.e. /pfs/<input repo>) to the interim directory and then symbolically link from the interim directory to /pfs/out. No daisy chaining of symbolic links.
  • You CAN copy or symbolically link a directory or file from the input pfs to the interim directory. Then you CAN perform a copy or symbolic link from the interim directory to /pfs/out. But, if one of those is a symbolic link, the other must be a copy.
  • You CAN symbolically link from the input pfs to an interim directory, then perform a straight copy of that interim directory to another interim directory, then symbolically link from the second interim directory to /pfs/out. When copying links to the interim directory, make sure to use the -L flag to break the symbolic links upon copy.
  • Make sure to delete the interim directory or file between datums because (oddly) the interim directory is retained across datums. If you do not delete it between datums, files will accumulate in the interim directory over time and the same file will likely be written to /pfs/out multiple times, which results in an error.
  • If you symbolically link from the interim directory to /pfs/out, you must delete the interim directory at the beginning of the code. If you delete it at the end, you destroy the link before Pachyderm can link to the output repo.
  • When placing data in interim directories, pay attention to what modules expect the repository structure to look like. The two main expectations are:
    • Python modules typically rely on the same path positions among all inputs (e.g. the path index of the year, month, etc.)
    • R modules typically rely on 'pfs' being in the path structure, in the format .../pfs/repo_name/repo_contents...
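
The file-handling rules above can be sketched in Python. This is a minimal illustration, not code from the pipeline: the function names (refresh_interim, stage_input, publish) and the temporary directories standing in for /pfs/<input repo>, the interim directory, and /pfs/out are all hypothetical, and it assumes a POSIX system where creating symbolic links is permitted. It shows the pattern of clearing the interim directory first, copying inputs with links dereferenced, and using a symbolic link only for the final step.

```python
import shutil
import tempfile
from pathlib import Path


def refresh_interim(interim_dir: Path) -> None:
    """Delete and recreate the interim directory at the START of a datum."""
    shutil.rmtree(interim_dir, ignore_errors=True)
    interim_dir.mkdir(parents=True)


def stage_input(input_dir: Path, interim_dir: Path) -> None:
    """Copy the input tree into the interim directory as real files."""
    # symlinks=False makes copytree copy the link targets' contents,
    # which is comparable to copying with a -L flag.
    shutil.copytree(input_dir, interim_dir, symlinks=False, dirs_exist_ok=True)


def publish(interim_dir: Path, out_dir: Path) -> None:
    """Symbolically link every interim file into the output directory."""
    for src in interim_dir.rglob("*"):
        if src.is_file():
            dest = out_dir / src.relative_to(interim_dir)
            dest.parent.mkdir(parents=True, exist_ok=True)
            dest.symlink_to(src)


def demo():
    """Run one hypothetical datum through temporary stand-in directories."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        store = root / "store"              # where the real file lives
        pfs_in = root / "pfs_in" / "hmp155"  # stand-in for /pfs/<input repo>
        interim = root / "interim"          # stand-in for the interim directory
        pfs_out = root / "pfs_out"          # stand-in for /pfs/out
        store.mkdir()
        pfs_in.mkdir(parents=True)
        pfs_out.mkdir()
        (store / "data.txt").write_text("reading")
        # The input arrives as a symbolic link
        (pfs_in / "data.txt").symlink_to(store / "data.txt")

        refresh_interim(interim)               # first, never last
        stage_input(root / "pfs_in", interim)  # copy (breaks the link)
        publish(interim, pfs_out)              # the only link in the chain

        staged = interim / "hmp155" / "data.txt"
        published = pfs_out / "hmp155" / "data.txt"
        return staged.is_symlink(), published.is_symlink(), published.read_text()


print(demo())
```

Because the copy step dereferences links, the single symbolic link in the output points at a real file in the interim directory, so no links are chained.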

Common-sense module sequences have already been combined, such as joining data with calibrations and then performing the calibration conversion in the same pipeline. Using this combined module as an example, let's look at the relevant portions of the pipeline specs for the individual modules for the hmp155 source type, then at the combined pipeline spec. The specs have been annotated to point out important things.

hmp155_data_calibration_group (grouping data with calibrations)

---
pipeline:
  name: hmp155_data_calibration_group                                     
transform:
  image_pull_secrets: [battelleecology-quay-read-all-pull-secret]
  image: quay.io/battelleecology/filter_joiner:11                       <-- image for the filter-joiner module
  cmd: ["/bin/bash"]
  stdin:
  - "#!/bin/bash"
  - "python3 -m filter_joiner.filter_joiner_main"                       <-- Executes the filter-joiner module
  env:
    CONFIG: |                                                           <-- The input configuration for the filter-joiner module (below)
      ---
      # In Pachyderm root will be index 0, 'pfs' index 1.
      # Metadata indices will typically begin at index 3.
      input_paths:
        - path:
            name: DATA_PATH
            # Filter for data directory
            glob_pattern: /pfs/DATA_PATH/hmp155/*/*/*/*/**
            # Join on named location (already joined below by day)
            join_indices: [7]
            outer_join: true
        - path:
            name: CALIBRATION_PATH
            # Filter for data directory
            glob_pattern: /pfs/CALIBRATION_PATH/hmp155/*/*/*/*/**
            # Join on named location (already joined below by day)
            join_indices: [7]
    OUT_PATH: /pfs/out                                                  <-- Put output in /pfs/out 
    LOG_LEVEL: DEBUG
    RELATIVE_PATH_INDEX: "3"
    LINK_TYPE: SYMLINK # options are COPY or SYMLINK                    <-- Symlink files from inputs to output
input:
  join:                                                                 <-- Join between two main inputs (data and calibrations)
  - pfs:
      name: DATA_PATH
      repo: data_source_hmp155_linkmerge
      glob: /hmp155/(*)/(*)/(*)
      joinOn: $1/$2/$3
      outer_join: true
      empty_files: false 
  - pfs:
      name: CALIBRATION_PATH
      repo: hmp155_calibration_assignment
      glob: /hmp155/(*)/(*)/(*)
      joinOn: $1/$2/$3
      empty_files: false 
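
The path indices mentioned in the CONFIG comments (root is index 0, 'pfs' is index 1) can be checked with a short Python sketch. It does not reproduce the filter-joiner's implementation: the helper names join_key and relative_path, the example file paths, and the location name CFGLOC000000 are hypothetical, and the reading of RELATIVE_PATH_INDEX as "keep the path from this index onward" is an assumption.

```python
from pathlib import PurePosixPath


def join_key(path: str, join_indices: list) -> tuple:
    """Return the path components found at the given indices."""
    parts = PurePosixPath(path).parts  # parts[0] is '/', parts[1] is 'pfs'
    return tuple(parts[i] for i in join_indices)


def relative_path(path: str, relative_path_index: int) -> str:
    """Keep the path from the given index onward (assumed behavior)."""
    parts = PurePosixPath(path).parts
    return str(PurePosixPath(*parts[relative_path_index:]))


# Hypothetical files matching the two glob patterns in the CONFIG above
data_file = "/pfs/DATA_PATH/hmp155/2023/03/08/CFGLOC000000/data/data.parquet"
cal_file = "/pfs/CALIBRATION_PATH/hmp155/2023/03/08/CFGLOC000000/calibration/cal.xml"

# Index 7 is the named location in both inputs, so the keys match
print(join_key(data_file, [7]))
print(join_key(cal_file, [7]))

# With index 3 the output path would start at the source type
print(relative_path(data_file, 3))
```

The join only works because the named location sits at the same index in both inputs, which is why the path positions must stay consistent when data is placed in an interim directory.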

hmp155_calibration_conversion (performing calibration conversion)

---
pipeline:
  name: hmp155_calibration_conversion
transform:                
  cmd:                                                                  <-- Run individually, the calibration conversion module uses the cmd block to execute a single R script, followed by its input parameters
  - Rscript
  - ./flow.cal.conv.R
  - DirIn=$DIR_IN
  - DirOut=/pfs/out
  - DirErr=/pfs/out/errored_datums
  - FileSchmData=$FILE_SCHEMA_DATA
  - FileSchmQf=$FILE_SCHEMA_FLAGS
  - TermQf=temperature|relative_humidity|dew_point
  - TermFuncUcrt=temperature:def.ucrt.meas.cnst|relative_humidity:def.ucrt.meas.cnst|dew_point:def.ucrt.meas.rh.dew.frst.pt
  image: quay.io/battelleecology/neon-is-cal-conv-r:v1.0.4              <-- Image for the calibration conversion module
  image_pull_secrets:
  - battelleecology-quay-read-all-pull-secret
  env:
    LOG_LEVEL: INFO
    PARALLELIZATION_INTERNAL: '1'                                       <-- A couple environment variables that the calibration conversion module uses
input:
  cross:                                                                <-- A main data input and two avro schema inputs
  - pfs:
      name: DIR_IN
      repo: hmp155_data_calibration_group
      glob: /hmp155/*/*/*/
  - pfs:
      name: FILE_SCHEMA_DATA
      repo: avro_schemas_hmp155
      glob: /hmp155/hmp155_calibrated.avsc
  - pfs:
      name: FILE_SCHEMA_FLAGS
      repo: avro_schemas_hmp155
      glob: /hmp155/flags_calibration_hmp155.avsc

hmp155_calibration_group_and_convert (combined module joining data with calibrations, then performing calibration conversion)

---
pipeline:
  name: hmp155_calibration_group_and_convert
transform:
  image_pull_secrets: [battelleecology-quay-read-all-pull-secret]
  image: quay.io/battelleecology/neon-is-cal-grp-conv:v1.0.1            <-- The image with both modules in it
  cmd: ["/bin/bash"]
  stdin:                                                                <-- Use the stdin block to execute the modules in sequence
  - "#!/bin/bash"
  - '# Refresh interim directories with each datum '
  - rm -r -f /usr/src/app/pfs/interimData                               <-- Must remove and recreate the interim directory between each datum (first!)
  - mkdir -p /usr/src/app/pfs/interimData
  - '# Run first module - filter-joiner'
  - python3 -m filter_joiner.filter_joiner_main                         <-- Run the filter-joiner first. It uses environment variables below as input parameters
  - '# Run second module - calibration conversion'
  - Rscript ./flow.cal.conv.R                                           <-- Run the calibration conversion module second. 
    DirIn=/usr/src/app/pfs/interimData                                  <-- The input directory to the calibration module is the interim output directory of the filter-joiner
    DirOut=/pfs/out                                                     <-- Output from the calibration module goes directly into /pfs/out
    DirErr=/pfs/out/errored_datums 
    FileSchmData=$FILE_SCHEMA_DATA
    FileSchmQf=$FILE_SCHEMA_FLAGS
    "TermQf=temperature|relative_humidity|dew_point"
    "TermFuncUcrt=temperature:def.ucrt.meas.cnst|relative_humidity:def.ucrt.meas.cnst|dew_point:def.ucrt.meas.rh.dew.frst.pt"
  env:
    # Environment variables for filter-joiner
    CONFIG: |                                                           <-- The input parameters for the filter-joiner are identical to those of the individual module because it is run first
      ---
      # In Pachyderm root will be index 0, 'pfs' index 1.
      # Metadata indices will typically begin at index 3.
      input_paths:
        - path:
            name: DATA_PATH
            # Filter for data directory
            glob_pattern: /pfs/DATA_PATH/hmp155/*/*/*/*/**
            # Join on named location (already joined below by day)
            join_indices: [7]
            outer_join: true
        - path:
            name: CALIBRATION_PATH
            # Filter for data directory
            glob_pattern: /pfs/CALIBRATION_PATH/hmp155/*/*/*/*/**
            # Join on named location (already joined below by day)
            join_indices: [7]
    OUT_PATH: /usr/src/app/pfs/interimData                              <-- The critical parameter for the filter joiner. Place output in an interim directory.
    LOG_LEVEL: INFO
    RELATIVE_PATH_INDEX: "3"
    LINK_TYPE: COPY # options are COPY or SYMLINK                       <-- Change the link type of the filter-joiner to copy so we avoid daisy chaining any links when the calibration conversion module is run afterward.
    # Environment variables for calibration module
    PARALLELIZATION_INTERNAL: '1' # Option for calibration conversion module
input:
  cross:                                                                <-- Note how the input block is a combination of the input blocks of the individual modules.
  - pfs:  
      name: FILE_SCHEMA_DATA
      repo: avro_schemas_hmp155
      glob: /hmp155/hmp155_calibrated.avsc
  - pfs:
      name: FILE_SCHEMA_FLAGS
      repo: avro_schemas_hmp155
      glob: /hmp155/flags_calibration_hmp155.avsc
  - join:
    - pfs:
        name: DATA_PATH
        repo: hmp155_data_source_trino
        glob: /hmp155/(*)/(*)/(*)
        joinOn: $1/$2/$3
        outer_join: true
        empty_files: false 
    - pfs:
        name: CALIBRATION_PATH
        repo: hmp155_calibration_assignment
        glob: /hmp155/(*)/(*)/(*)
        joinOn: $1/$2/$3
        empty_files: false
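
How the combined input block is assembled from the individual ones can be sketched with plain Python dictionaries. This is an illustration under stated assumptions, not a tool from the pipeline: combine_inputs is a hypothetical helper, it assumes the second module's input has a single top-level operator (here cross) holding a list of entries, and only the name and repo fields of each pfs entry are reproduced.

```python
import copy


def combine_inputs(first_input: dict, second_input: dict, first_pipeline: str) -> dict:
    """Swap the entry that reads the first pipeline's output for that pipeline's own input."""
    combined = {}
    for operator, entries in second_input.items():  # e.g. "cross"
        # Drop the entry that read the first module's output repo
        kept = [
            copy.deepcopy(entry)
            for entry in entries
            if entry.get("pfs", {}).get("repo") != first_pipeline
        ]
        # Append the first module's whole input block in its place
        combined[operator] = kept + [copy.deepcopy(first_input)]
    return combined


# Input of the grouping module (fields abbreviated)
group_input = {
    "join": [
        {"pfs": {"name": "DATA_PATH", "repo": "data_source_hmp155_linkmerge"}},
        {"pfs": {"name": "CALIBRATION_PATH", "repo": "hmp155_calibration_assignment"}},
    ]
}

# Input of the calibration conversion module (fields abbreviated)
conversion_input = {
    "cross": [
        {"pfs": {"name": "DIR_IN", "repo": "hmp155_data_calibration_group"}},
        {"pfs": {"name": "FILE_SCHEMA_DATA", "repo": "avro_schemas_hmp155"}},
        {"pfs": {"name": "FILE_SCHEMA_FLAGS", "repo": "avro_schemas_hmp155"}},
    ]
}

combined = combine_inputs(group_input, conversion_input, "hmp155_data_calibration_group")
for entry in combined["cross"]:
    print(entry)
```

The DIR_IN entry disappears because the second module no longer reads a Pachyderm repo; it reads the interim directory written by the first module instead.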