2.2 So, you've made it to the date gap filler

Cove Sturtevant edited this page Jun 28, 2023 · 5 revisions

The date gap filler module creates placeholder data for days and/or locations that were supposed to produce data but didn't. Additionally, the module restricts the data making it through the pipeline to those locations that are supposed to be producing data. How do we know which locations are supposed to be producing data for a given source type and date? This is controlled by the active periods for each named location (you populated this information in Steps 4 & 5 of the Workflow for product creation).

Some extra care is needed in setting up the <SOURCE_TYPE>_date_gap_filler pipeline (or the combined version with regularization, <SOURCE_TYPE>_fill_date_gaps_and_regularize) because we only want a little bit of test data when we're developing, but in normal operation this set of pipelines will fill in ALL locations for ALL days that were supposed to be producing data. Processing all of that data in development would really slow us down, so we're going to adjust things a bit.

Here's the architecture of what feeds into the <SOURCE_TYPE>_date_gap_filler:

Date Gap Filler Architecture

Note that you should have already stood up <SOURCE_TYPE>_merge_data_by_location by the time you are reading this (or the combined module <SOURCE_TYPE>_location_group_and_restructure), and location_loader_<SOURCE_TYPE> (blue) should also be stood up. Note that the <SOURCE_TYPE>_cron_daily_and_date_control pipeline controls the days that will be populated. Below are instructions for setting up the <SOURCE_TYPE>_location_active_dates_assignment pipeline, populating the empty_files_<SOURCE_TYPE> repo for your source type, and finally standing up the <SOURCE_TYPE>_date_gap_filler pipeline for development.

Follow these steps:

Set up location_active_dates_assignment

Use a template for the <SOURCE_TYPE>_location_active_dates_assignment pipeline spec from another pipeline that has a similar structure to yours (pipelines for almost all source types are very similar at this stage). Modify the name and contents for your source type, which probably means simply finding and replacing the source type throughout the pipeline spec. Deploy the pipeline. It will take a while to finish.

If all went well, you should see at least one year of files in <SOURCE_TYPE>_location_active_dates_assignment, for example:

$ pachctl glob file prt_location_active_dates_assignment@master:/**
NAME                                                            TYPE SIZE
/prt/2019/01/01/CFGLOC100238/location/CFGLOC100238.json         file 955B
/prt/2019/01/01/CFGLOC100241/location/CFGLOC100241.json         file 955B
/prt/2019/01/01/CFGLOC100244/location/CFGLOC100244.json         file 955B
/prt/2019/01/02/CFGLOC100238/location/CFGLOC100238.json         file 955B
/prt/2019/01/02/CFGLOC100241/location/CFGLOC100241.json         file 955B
/prt/2019/01/02/CFGLOC100244/location/CFGLOC100244.json         file 955B
/prt/2019/01/03/CFGLOC100238/location/CFGLOC100238.json         file 955B
/prt/2019/01/03/CFGLOC100241/location/CFGLOC100241.json         file 955B
/prt/2019/01/03/CFGLOC100244/location/CFGLOC100244.json         file 955B
...
/prt/2019/01/27/CFGLOC100238/location/CFGLOC100238.json         file 955B
/prt/2019/01/27/CFGLOC100241/location/CFGLOC100241.json         file 955B
/prt/2019/01/27/CFGLOC100244/location/CFGLOC100244.json         file 955B
/prt/2019/01/28/CFGLOC100238/location/CFGLOC100238.json         file 955B
/prt/2019/01/28/CFGLOC100241/location/CFGLOC100241.json         file 955B
/prt/2019/01/28/CFGLOC100244/location/CFGLOC100244.json         file 955B
...
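
If you'd rather not eyeball the listing, one way to check it is to count the distinct days per named location. This is a minimal sketch, assuming you have captured the pachctl glob file output as text in the format shown above; days_per_location is a hypothetical helper and not part of the pipeline.

```python
from collections import defaultdict
from datetime import date


def days_per_location(listing: str) -> dict:
    """Map each named location to the set of days it appears in the listing.

    Assumes paths of the form /<source_type>/<year>/<month>/<day>/<location>/...
    as in the example output above.
    """
    days = defaultdict(set)
    for line in listing.splitlines():
        fields = line.split()
        # Skip the header row, blank lines, and "..." elisions.
        if not fields or not fields[0].startswith("/"):
            continue
        parts = fields[0].strip("/").split("/")
        if len(parts) < 5:
            continue
        _source_type, year, month, day, location = parts[:5]
        days[location].add(date(int(year), int(month), int(day)))
    return dict(days)


sample = """\
NAME                                                            TYPE SIZE
/prt/2019/01/01/CFGLOC100238/location/CFGLOC100238.json         file 955B
/prt/2019/01/01/CFGLOC100241/location/CFGLOC100241.json         file 955B
/prt/2019/01/02/CFGLOC100238/location/CFGLOC100238.json         file 955B
...
"""

# Print the number of days and the first and last day seen for each location.
for location, days in sorted(days_per_location(sample).items()):
    print(location, len(days), min(days), max(days))
```

A location with far fewer days than the others, or a date range shorter than you expected, is a hint to revisit its active periods.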

Create empty files

The empty_files_<SOURCE_TYPE> repo contains template files for the pipeline to use when there is no data. They are empty files (who knew?) with the columns you need in your output data. When data are missing, the pipeline will use the file to generate a table of all NAs that is in the correct format for future pipeline components to use.

Within the empty files repo, the structure is very simple (Hierarchical view):

/empty_files_<SOURCE_TYPE>
  |
  +- /<SOURCE_TYPE>
      |
      +- /data/
      |   |
      |   +<SOURCE_TYPE>_location_year-month-day.parquet
      |
      +- /flags/
      |   |
      |   +<SOURCE_TYPE>_location_year-month-day_flagsCal.parquet
      | 
      +- /uncertainty_data/
      |   |
      |   +<SOURCE_TYPE>_location_year-month-day_uncertaintyData.parquet
      |
      +- /uncertainty_coef/
          |
          +<SOURCE_TYPE>_location_year-month-day_uncertaintyCoef.json

There should be a directory for the data, flags, uncertainty data (if applicable), and uncertainty coef (if applicable), each containing one template .parquet or .json file. Note the file names: the year-month-day in each filename is populated with real dates as needed in the output of the pipeline.
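
The layout rule above (one directory per data type, one template file in each) can be checked with a few lines of Python before you load anything into Pachyderm. This is only a sketch: check_empty_files is a hypothetical helper, and the list of subdirectories is copied from the tree above, so trim it if the uncertainty directories do not apply to your source type.

```python
import tempfile
from pathlib import Path

# Subdirectories taken from the tree above; drop the uncertainty entries
# if they do not apply to your source type.
EXPECTED_SUBDIRS = ("data", "flags", "uncertainty_data", "uncertainty_coef")


def check_empty_files(root: Path, source_type: str, subdirs=EXPECTED_SUBDIRS) -> list:
    """Return a list of problems found under <root>/<source_type>/ (empty if none)."""
    problems = []
    for name in subdirs:
        directory = root / source_type / name
        if not directory.is_dir():
            problems.append(f"missing directory: {directory}")
            continue
        templates = [p for p in directory.iterdir() if p.suffix in (".parquet", ".json")]
        if len(templates) != 1:
            problems.append(f"{directory}: expected 1 template file, found {len(templates)}")
    return problems


# Demo on a throwaway directory: 'data' is set up correctly, 'flags' is missing.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "prt" / "data").mkdir(parents=True)
    (root / "prt" / "data" / "prt_location_year-month-day.parquet").touch()
    for problem in check_empty_files(root, "prt", subdirs=("data", "flags")):
        print(problem)
```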

To see what files you need in your empty_files_<SOURCE_TYPE>, execute the following command in your terminal (filling in a valid year, month, day, and location ID). This assumes you are using the combined module approach:

pachctl glob file <SOURCE_TYPE>_location_group_and_restructure@master:/<SOURCE_TYPE>/<YEAR>/<MONTH>/<DAY>/<LOCATION>

This displays the subdirectories needed in your empty_files_<SOURCE_TYPE>/<SOURCE_TYPE>/ directory. Note that you never make an empty file for the location directory.

The source of truth for empty files is the NEON-IS-avro-schemas Git repo. Go ahead and make a directory for your sensor. The 'easiest' way to make your files is to:

  1. grab real data from Pachyderm for data, flags, calibration_coef, etc., and export it to your local environment (e.g. the som server)
  2. load each file into R or Python as preferred, remove all the rows from the data frame (leaving the columns), and re-save the empty files locally. An example of how to do this is in /utilities/R_coding/flow.make.file.empt.R.
  3. move the empty files to the NEON-IS-avro-schemas Git repo
  4. load the empty files into Pachyderm.
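
For step 2, a Python version might look like the sketch below. It assumes pandas is available; the column names are made up for illustration, and empty_like is a hypothetical helper rather than a translation of the R utility mentioned above.

```python
import pandas as pd


def empty_like(df: pd.DataFrame) -> pd.DataFrame:
    """Return a zero-row copy of df that keeps column names, order, and dtypes."""
    return df.iloc[0:0].copy()


# Stand-in for a real data file exported from Pachyderm; column names are made up.
real = pd.DataFrame(
    {
        "source_id": ["12345"],
        "readout_time": pd.to_datetime(["2019-01-01T00:00:00Z"]),
        "resistance": [100.1],
    }
)
template = empty_like(real)
print(template.dtypes)

# With a parquet engine such as pyarrow installed, the round trip would be:
#   real = pd.read_parquet("exported_file.parquet")
#   empty_like(real).to_parquet("prt_location_year-month-day.parquet", index=False)
```

Slicing with iloc[0:0] rather than building a new frame from the column names keeps the column dtypes, which matters if later pipeline components expect the same schema as real data.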

Create the <SOURCE_TYPE>_date_gap_filler pipeline

This is a critical step to pare down the locations that make it through the <SOURCE_TYPE>_date_gap_filler so that only a few test locations are propagated to further testing. Use a template for the <SOURCE_TYPE>_date_gap_filler pipeline spec from another pipeline that has a similar structure to yours. Search for and replace your source type throughout the pipeline spec. Now make sure the OUTPUT_DIRECTORIES environment variable in the env portion of the pipeline spec contains all the subdirectories that are present for each named location in the <SOURCE_TYPE>_merge_data_by_location output of your pipeline. The env section looks like this (note the OUTPUT_DIRECTORIES line):

  env:
    LOG_LEVEL: DEBUG
    OUT_PATH: /pfs/out
    OUTPUT_DIRECTORIES: data,location,uncertainty_data,uncertainty_coef,flags
    DATA_SOURCE_TYPE_INDEX: '3'
    DATA_YEAR_INDEX: '4'
    DATA_MONTH_INDEX: '5'
    DATA_DAY_INDEX: '6'
    DATA_LOCATION_INDEX: '7'
    DATA_TYPE_INDEX: '8'
    LOCATION_SOURCE_TYPE_INDEX: '3'
    LOCATION_YEAR_INDEX: '4'
    LOCATION_MONTH_INDEX: '5'
    LOCATION_DAY_INDEX: '6'
    LOCATION_INDEX: '7'
    EMPTY_FILE_TYPE_INDEX: '4'
    LINK_TYPE: SYMLINK # options are COPY or SYMLINK
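
The *_INDEX values look like positions within each input path. Here is a minimal sketch of that reading, assuming the module splits paths the way Python's pathlib does and that Pachyderm mounts each input at /pfs/<input name>/. The helper is illustrative only and is not the date_gap_filler code.

```python
from pathlib import PurePosixPath

# Values copied from the env section above.
ENV = {
    "DATA_SOURCE_TYPE_INDEX": "3",
    "DATA_YEAR_INDEX": "4",
    "DATA_MONTH_INDEX": "5",
    "DATA_DAY_INDEX": "6",
    "DATA_LOCATION_INDEX": "7",
    "DATA_TYPE_INDEX": "8",
}


def parse_data_path(path: str, env: dict = ENV) -> dict:
    """Pick path components by position (hypothetical helper, for illustration)."""
    parts = PurePosixPath(path).parts  # parts[0] is '/', parts[1] is 'pfs', ...
    return {
        "source_type": parts[int(env["DATA_SOURCE_TYPE_INDEX"])],
        "year": parts[int(env["DATA_YEAR_INDEX"])],
        "month": parts[int(env["DATA_MONTH_INDEX"])],
        "day": parts[int(env["DATA_DAY_INDEX"])],
        "location": parts[int(env["DATA_LOCATION_INDEX"])],
        "data_type": parts[int(env["DATA_TYPE_INDEX"])],
    }


# 'example.parquet' is a made-up file name.
parsed = parse_data_path("/pfs/DATA_PATH/prt/2019/01/01/CFGLOC100238/data/example.parquet")
print(parsed)
```

Under the same assumption, EMPTY_FILE_TYPE_INDEX: '4' would select the data type directory in /pfs/EMPTY_FILE_PATH/<SOURCE_TYPE>/<type>/. If your input repos have more or fewer directory levels than the template you copied, the indices would need to shift to match.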

Now comes the part where we limit the named locations that make it through the <SOURCE_TYPE>_date_gap_filler for development purposes. Look for the pfs inputs with name: DATA_PATH and name: LOCATION_PATH. In production, it's going to look something like this (example is for the prt pipeline):

  - group:
    - pfs:
        name: DATA_PATH
        repo: prt_merge_data_by_location
        glob: /prt/(*/*/*) 
        group_by: $1
        empty_files: false 
    - join:
      - pfs:
          name: LOCATION_PATH
          repo: prt_location_active_dates_assignment
          glob: /prt/(*/*/*) 
          joinOn: $1
          group_by: $1
          empty_files: false 

The glob pattern above sets each day as a datum (e.g. /prt/2019/01/01). The parentheses around (*/*/*) form a capture group, which is used for both the join operation and the grouping operation (see the Pachyderm reference for detailed descriptions of these concepts). In short, their combined effect is to make sure data for each day in all the input repos of the pipeline spec are sent into the container for processing. For development, we can extend the glob pattern with an alternation that matches only certain locations, like this:

  - group:
    - pfs:
        name: DATA_PATH
        repo: prt_merge_data_by_location
        glob: /prt/(*/*/*/(CFGLOC100471|CFGLOC100474|CFGLOC100480))
        group_by: $1
        empty_files: false
    - join:
      - pfs:
          name: LOCATION_PATH
          repo: prt_location_active_dates_assignment
          glob: /prt/((*/*/*)/(CFGLOC100471|CFGLOC100474|CFGLOC100480))
          joinOn: $1
          group_by: $1
          empty_files: false

Replace the location identifiers shown above with those you want to test your pipeline with. Which named locations should you include? Check those in your `<SOURCE_TYPE>_merge_data_by_location` pipeline and include them all or just a subset. It's a good idea to include a named location that is in the `<SOURCE_TYPE>_location_active_dates_assignment` repo but not in `<SOURCE_TYPE>_merge_data_by_location` so that you can test the behavior of the rest of the pipeline when only quality flags should be produced (because there is no sensor data from the location, a common occurrence).
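
To get a feel for what the capture group and the location alternation do, here is a rough emulation in Python. It is a sketch only: glob_to_regex handles just the glob features used on this page (`*`, parentheses, and `|`) and is not Pachyderm's glob engine, and the paths are examples.

```python
import re
from collections import defaultdict


def glob_to_regex(glob: str) -> "re.Pattern":
    """Translate the small glob subset used here into a regex.

    Only '*' (any run of non-'/' characters), '(...)' capture groups, and '|'
    alternation are handled; everything else is matched literally.
    """
    out = []
    for ch in glob:
        if ch == "*":
            out.append("[^/]*")
        elif ch in "()|":
            out.append(ch)
        else:
            out.append(re.escape(ch))
    return re.compile("".join(out))


def group_by_first_capture(glob: str, paths: list) -> dict:
    """Group the paths that match the glob by the text of capture group $1."""
    pattern = glob_to_regex(glob)
    groups = defaultdict(list)
    for path in paths:
        match = pattern.fullmatch(path)
        if match:
            groups[match.group(1)].append(path)
    return dict(groups)


# Production-style glob: one group per day directory.
day_paths = ["/prt/2019/01/01", "/prt/2019/01/02"]
print(group_by_first_capture("/prt/(*/*/*)", day_paths))

# Development-style glob: only the listed locations get through.
location_paths = [
    "/prt/2019/01/01/CFGLOC100471",
    "/prt/2019/01/01/CFGLOC100238",
    "/prt/2019/01/02/CFGLOC100474",
]
dev_glob = "/prt/(*/*/*/(CFGLOC100471|CFGLOC100474|CFGLOC100480))"
print(group_by_first_capture(dev_glob, location_paths))
```

In this emulation CFGLOC100238 never matches the development glob, so it would not reach the container. That is the paring-down effect we want for testing.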

The full pipeline spec for the `prt_date_gap_filler` pipeline (for development) looks like this:

pipeline:
  name: prt_date_gap_filler
transform:
  image_pull_secrets:
  - battelleecology-quay-read-all-pull-secret
  image: quay.io/battelleecology/date_gap_filler:12
  cmd:
  - /bin/bash
  stdin:
  - '#!/bin/bash'
  - python3 -m date_gap_filler.date_gap_filler_main
  env:
    LOG_LEVEL: DEBUG
    OUT_PATH: /pfs/out
    OUTPUT_DIRECTORIES: data,location,uncertainty_data,uncertainty_coef,flags
    DATA_SOURCE_TYPE_INDEX: '3'
    DATA_YEAR_INDEX: '4'
    DATA_MONTH_INDEX: '5'
    DATA_DAY_INDEX: '6'
    DATA_LOCATION_INDEX: '7'
    DATA_TYPE_INDEX: '8'
    LOCATION_SOURCE_TYPE_INDEX: '3'
    LOCATION_YEAR_INDEX: '4'
    LOCATION_MONTH_INDEX: '5'
    LOCATION_DAY_INDEX: '6'
    LOCATION_INDEX: '7'
    EMPTY_FILE_TYPE_INDEX: '4'
    LINK_TYPE: SYMLINK # options are COPY or SYMLINK
input:
  cross:
  - pfs:
      name: EMPTY_FILE_PATH
      repo: empty_files_prt
      glob: /prt
      empty_files: false
  - group:
    - pfs:
        name: DATA_PATH
        repo: prt_merge_data_by_location
        glob: /prt/(*/*/*/(CFGLOC100471|CFGLOC100474|CFGLOC100480))
        group_by: $1
        empty_files: false
    - join:
      - pfs:
          name: LOCATION_PATH
          repo: prt_location_active_dates_assignment
          glob: /prt/((*/*/*)/(CFGLOC100471|CFGLOC100474|CFGLOC100480))
          joinOn: $1
          group_by: $1
          empty_files: false
      - pfs:
          name: DATE_LIMITER_PATH
          repo: prt_date_gap_filler_limiter
          glob: /(*/*/*)
          joinOn: $1
          group_by: $1
          empty_files: true
parallelism_spec:
  constant: 4
resource_requests:
  memory: 400M
  cpu: 0.5
pod_patch: '[{"op":"replace","path":"/containers/1/resources/requests/memory","value":"2G"},{"op":"replace","path":"/containers/1/resources/requests/cpu","value":"1.5"}]'
scheduling_spec:
  node_selector:
    cloud.google.com/machine-family: n2d
autoscaling: true

Stand up the <SOURCE_TYPE>_date_gap_filler pipeline

Do it.