JP-2980 Documentation improvement for multiprocessing: a script which spawns processes on import will cause system failure #8408

Merged · 19 commits · Apr 22, 2024
2 changes: 2 additions & 0 deletions CHANGES.rst
@@ -27,6 +27,8 @@ documentation

- Added docs for the NIRSpec MSA metadata file to the data products area of RTD.
[#8399]

- Added documentation for multiprocessing. [#8408]

extract_1d
----------
163 changes: 163 additions & 0 deletions docs/jwst/user_documentation/running_pipeline_python.rst
@@ -529,3 +529,166 @@ individual step parameter must be set when using this method, or else the coded
defaults will be used, which may be inappropriate for the dataset being processed.

See :ref:`call_examples` for more information.


.. _multiprocessing:

Multiprocessing
===============

Multiprocessing is supported to speed up certain computationally intensive steps
in the pipeline, including the :ref:`jump detection <jump_step>`,
:ref:`ramp fitting <ramp_fitting_step>`, and
:ref:`WFSS contamination correction <wfss_contam_step>` steps. The examples below
show how multiprocessing can be enabled for these steps, as well as how to set up
multiprocessing to run the entire pipeline on multiple observations simultaneously.

Python's ``multiprocessing`` module re-imports and re-executes the main script in
each worker process when the ``spawn`` start method is used (the default on macOS
and Windows). Any process-spawning code at the top level of the script is
therefore run again by every worker, spawning processes recursively, and Python
will crash. Hence, the multiprocessing code must be enclosed in a ``__main__``
block like this:


::

    import sys

    def main():
        ...  # code used in multiprocessing

    if __name__ == '__main__':
        sys.exit(main())

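The default start method differs between platforms: ``spawn`` on macOS and
Windows, ``fork`` on Linux. To make a script behave the same way everywhere,
the start method can be selected explicitly; a minimal sketch (the explicit
choice of ``spawn`` here is illustrative, not a pipeline requirement):

::

    import multiprocessing

    def main():
        ...  # code used in multiprocessing

    if __name__ == '__main__':
        # 'spawn' is already the default on macOS and Windows;
        # setting it explicitly makes Linux behave the same way
        multiprocessing.set_start_method('spawn')
        main()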

There are two ways to use multiprocessing with the pipeline:

1. Multiprocessing within a pipeline step. At the moment, the steps that
   support this are the :ref:`jump <jump_step>`,
   :ref:`ramp_fitting <ramp_fitting_step>`,
   and :ref:`wfss_contam <wfss_contam_step>` steps. Multiprocessing in these
   steps is enabled with the optional ``maximum_cores`` parameter, which can
   be set either to a numerical value given as a string or to one of the
   words ``quarter``, ``half``, ``all``, or ``none``, which is the default.

   The following example turns on a step's multiprocessing option. Notice
   that only one of the steps has multiprocessing turned on; we do not
   recommend enabling multiprocessing in more than one step at a time, as
   this will likely exhaust the system memory.



::

    # SampleScript1

    import sys
    from jwst.pipeline import Detector1Pipeline

    uncal_file = 'jw0000_0000_uncal.fits'
    output_dir = '/my_project'

    def main():
        # enable multiprocessing only in the ramp_fit step
        parameter_dict = {"ramp_fit": {"maximum_cores": 'all'}}
        Detector1Pipeline.call(uncal_file, save_results=True,
                               steps=parameter_dict, output_dir=output_dir)

    if __name__ == '__main__':
        sys.exit(main())
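
The same setting can also be applied as a step attribute when running the
pipeline with the ``run()`` method described earlier on this page; a minimal
sketch, reusing the hypothetical file and directory names of ``SampleScript1``:

::

    import sys
    from jwst.pipeline import Detector1Pipeline

    def main():
        det1 = Detector1Pipeline()
        # 'half' requests half of the available cores; a numerical
        # value given as a string, e.g. '4', also works
        det1.ramp_fit.maximum_cores = 'half'
        det1.output_dir = '/my_project'
        det1.save_results = True
        det1.run('jw0000_0000_uncal.fits')

    if __name__ == '__main__':
        sys.exit(main())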


2. Calling the pipeline with multiprocessing. The following example uses this
   option, setting up a log file for each run of the pipeline and a text file
   with the full traceback in case there is a crash. Notice that the
   ``import`` statement for the pipeline is inside the function that each
   worker executes; this avoids a known memory leak.


::

    # SampleScript2

    import os
    import sys
    import traceback
    import configparser
    import multiprocessing
    from glob import glob

    def mk_stpipe_log_cfg(output_dir, log_name):
        """
        Create a configuration file that directs all pipeline log
        output to the file log_name.
        Args:
            output_dir: str, path of the output directory
            log_name: str, name of the log file to record screen output
        Returns:
            pipe_log_config: str, path of the log configuration file
        """
        config = configparser.ConfigParser()
        config.add_section("*")
        config.set("*", "handler", "file:" + log_name)
        config.set("*", "level", "INFO")
        # give each worker its own config file so that parallel
        # workers do not overwrite each other's configuration
        pipe_log_config = os.path.join(output_dir, log_name + ".cfg")
        with open(pipe_log_config, "w") as f:
            config.write(f)
        return pipe_log_config

    def run_det1(uncal_file, output_dir):
        """
        Run the Detector1 pipeline on the given file.
        Args:
            uncal_file: str, name of uncalibrated file to run
            output_dir: str, path of the output directory
        Returns:
            nothing
        """
        log_name = os.path.basename(uncal_file).replace('.fits', '')
        pipe_log_config = mk_stpipe_log_cfg(output_dir, log_name + '.log')
        # import the pipeline inside the worker to avoid a known memory leak
        from jwst.pipeline.calwebb_detector1 import Detector1Pipeline
        try:
            Detector1Pipeline.call(uncal_file, output_dir=output_dir,
                                   logcfg=pipe_log_config, save_results=True)
            print('\n * Pipeline finished for file: ', uncal_file, ' \n')
        except Exception:
            print('\n *** OH NO! The detector1 pipeline crashed! *** \n')
            # format_exc (not print_exc) returns the traceback as a string
            pipe_crash_msg = traceback.format_exc()
            print('Printing file with full traceback')
            with open(log_name + '_pipecrash.txt', 'w') as crashfile:
                print(pipe_crash_msg, file=crashfile)

    def main():
        input_data_dir = '/my_project_dir'
        output_dir = input_data_dir

        # get the files to run
        files_to_run = glob(os.path.join(input_data_dir, '*_uncal.fits'))
        print('Will run the pipeline on {} files'.format(len(files_to_run)))

        # the output list should be the same length as the files to run
        outptd = [output_dir] * len(files_to_run)

        # use half of all available cores
        cores2use = os.cpu_count() // 2
        print('* Using ', cores2use, ' cores for multiprocessing.')

        # set up the pool and run in parallel
        with multiprocessing.Pool(cores2use) as pool:
            pool.starmap(run_det1, zip(files_to_run, outptd))

        print('\n * Finished multiprocessing! \n')

    if __name__ == '__main__':
        sys.exit(main())
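
If memory growth over many pipeline runs is a concern, ``multiprocessing.Pool``
also accepts a ``maxtasksperchild`` argument, which replaces each worker
process after it has completed the given number of tasks; a minimal variation
on the pool setup of ``SampleScript2``:

::

    # replace each worker after a single pipeline run so that its
    # memory is returned to the operating system between tasks
    with multiprocessing.Pool(cores2use, maxtasksperchild=1) as pool:
        pool.starmap(run_det1, zip(files_to_run, outptd))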


.. warning::
   Although it is technically possible to call the pipeline with
   multiprocessing while also enabling the multiprocessing option in a step,
   we strongly recommend against this. That scenario would be the same as
   ``SampleScript2``, except adding and calling the parameter dictionary
   ``parameter_dict`` of ``SampleScript1``. However, Python will crash if
   both multiprocessing options are set to use all of the cores, or even
   fewer, because a worker in a ``multiprocessing.Pool`` is not permitted
   to have child processes of its own. We recommend not enabling step
   multiprocessing for parallel pipeline runs, to avoid potentially
   running out of memory.