# Creating new ZIPPY modules
Want to make a new ZIPPY module? Awesome! Here's what you need to know:
- ZIPPY modules must be defined in a file of your choice that is importable from Python. To tell ZIPPY that you are using custom modules, you must set 'imports' in your parameter json file to a list of module names for Python to import.
- ZIPPY modules are subclasses of ModularRunner.
- ZIPPY module names must end in `Runner`.
- ZIPPY modules have three important parts:
  - workflow. The `workflow` function is where the heavy lifting happens. It is passed a pyflow workflowRunner object, which should be used to execute any jobs needed to perform the module's task.
  - dependencies. The module must call `collect_dependencies(sample)` to find the list of dependencies that sample depends on. These dependencies are used for the initial set of tasks added to the workflowRunner. Similarly, the module must expose all dependencies that consumers of its output will require. By default, setting `self.task[sample]` to a list of all tasks that the sample depends on is sufficient. If something more complicated is needed, the function `get_dependencies` can be overridden. Note that the dependency format is inherited from pyflow; see the pyflow documentation for more details.
  - output. The module must define the function `get_output(self, sample)` to return the file or files it produces as output.
- A module can find its input samples using `self.collect_samples()`. This returns a list of sample objects. The sample object is a namedtuple with fields `sample.id` and `sample.name`.
- The params object contains all the parameters for the class that need to be chosen by the user. There are two parts of the namespace, plus support for optional parameters (see the short sketch after this list):
  - global. For example, `params.genome` refers to the genome fasta file, which is the same for all pipeline modules.
  - local. Some parameters vary per instance of the module, such as the output directory. We do some magic so that these parameters can be accessed using the self keyword, e.g., `params.self.output_dir`.
  - optional. Parameters can also be defined as optional. To do so, use the syntax `params.optional.my_optional_param`. Optional parameters must be tagged optional at every use, otherwise they will be considered required. Optional parameters may be in either the global or the local namespace.
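To make the namespaces concrete, here is a short sketch of parameter access inside a module. The names `genome`, `output_dir`, and `my_optional_param` come from the examples above; how an unset optional parameter is reported back is an assumption here, so check ModularRunner.py for the exact behavior.

```python
# Inside a module method, e.g. workflow():
genome_fasta = self.params.genome          # global: shared by every stage in the pipeline
out_dir = self.params.self.output_dir      # local: specific to this instance of the module

# Optional parameters go through the optional namespace at every use.
# We assume an unset optional parameter comes back false-y (an assumption,
# not confirmed by this page), so a fallback can be applied:
threshold = self.params.optional.my_optional_param
if not threshold:
    threshold = 0.5  # hypothetical default when the user omits the parameter
```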
Let's look at a simple runner:
```python
import os
import random
from collections import defaultdict

# ModularRunner is provided by ZIPPY (see ModularRunner.py)

class SubsampleBAMRunner(ModularRunner):
    def get_output(self, sample):
        return {'bam': os.path.join(self.params.self.output_dir, sample.name + ".sub.bam")}

    def workflow(self, workflowRunner):
        self.task = defaultdict(list)
        if not os.path.exists(self.params.self.output_dir):
            os.makedirs(self.params.self.output_dir)
        for sample in self.collect_samples():
            sample_name = sample.name
            dependencies = self.collect_dependencies(sample)
            bam_file = self.collect_input(sample, 'bam')
            new_bam_file = os.path.join(self.params.self.output_dir, sample_name + ".sub.bam")
            # samtools view -s takes SEED.FRACTION, so the random integer seed is
            # prepended to the subsample fraction (e.g. "12345" + "0.1" -> "-s 123450.1")
            subsample_command = 'samtools view -s {3}{0} -b {1} > {2}'.format(
                self.params.self.subsample_fraction, bam_file, new_bam_file, random.randint(0, 100000))
            self.task[sample].append(workflowRunner.addTask(
                '{}_{}'.format(self.identifier, sample.id),
                subsample_command, dependencies=dependencies))
```
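Although it is only implied above, the dictionary a stage returns from `get_output` is what downstream stages see through `collect_input`. A short sketch of a consumer of this runner's output (the consuming module itself is hypothetical):

```python
# In a downstream module's workflow(), with SubsampleBAMRunner wired in as its input:
for sample in self.collect_samples():
    dependencies = self.collect_dependencies(sample)    # tasks stored in SubsampleBAMRunner's self.task[sample]
    subsampled_bam = self.collect_input(sample, 'bam')  # path from SubsampleBAMRunner.get_output(sample)['bam']
```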
There are a few interesting things to note about this runner.
- It uses two instance-specific parameters: `params.self.output_dir` and `params.self.subsample_fraction`. This allows it to be instantiated multiple times with different subsample fractions in the same pipeline.
- You will often follow this useful pattern to instantiate a workflow:

  ```python
  self.task = defaultdict(list)
  if not os.path.exists(self.params.self.output_dir):
      os.makedirs(self.params.self.output_dir)
  for sample in self.sample_sheet.get_sample_ids():
      sample_name = sample.name
      dependencies = self.collect_dependencies(sample)
      bam_file = self.collect_input(sample, 'bam')
  ```

  This pattern lets you set up your output directory, iterate through samples, get the sample name, and gather your pipeline dependencies and input file. Note the built-in helper functions `collect_dependencies` and `collect_input`, which return the relevant dependencies and input for each sample. You're now all set to set up your command.
- You will often follow this other useful pattern to close out a workflow:

  ```python
  self.task[sample].append(workflowRunner.addTask(
      '{}_{}'.format(self.identifier, sample.id), subsample_command, dependencies=dependencies))
  ```

  Stages should store the dependencies they create for their children in the `self.task` object. In the usual case, you will want to set up a defaultdict from sample to a task or list of tasks. This pattern adds your command to the workflow, mindful of dependencies, and sets up `self.task[sample]` to contain this sample's new dependencies for this stage. By default, this stage's `get_dependencies` will return `self.task[sample]`.
- Your output should be per-sample, and in the form of a Python dictionary. The map should be from file_type to file_name. If you are using a common file type, make sure that your file_type descriptor matches what other modules expect! Current common file types are:
  - fastq
  - bam (by convention, output bams should be sorted and indexed, as many other tools require that)
  - vcf
- Our example here covers the usual case of one sample in -> one sample out. In more complicated cases, you will have to override a bit more functionality, as in the sketch below.
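As one illustration of what overriding "a bit more functionality" can look like, here is a rough sketch, not taken from ModularRunner.py, of a many-samples-in, one-output-out stage that overrides `get_output` and `get_dependencies` instead of relying on the per-sample defaults. The class name, the `samtools merge` command, and the `merge_task` attribute are hypothetical, and the exact override signatures should be checked against ModularRunner.py.

```python
class MergeAllBAMsRunner(ModularRunner):
    '''Hypothetical stage that merges every sample's bam into a single output.'''

    def get_output(self, sample):
        # every downstream consumer, regardless of sample, sees the same merged bam
        return {'bam': os.path.join(self.params.self.output_dir, 'merged.bam')}

    def get_dependencies(self, sample):
        # all downstream work waits on the single merge task
        return self.merge_task

    def workflow(self, workflowRunner):
        if not os.path.exists(self.params.self.output_dir):
            os.makedirs(self.params.self.output_dir)
        bams = []
        dependencies = []
        for sample in self.collect_samples():
            bams.append(self.collect_input(sample, 'bam'))
            dependencies.extend(self.collect_dependencies(sample))
        merged_bam = os.path.join(self.params.self.output_dir, 'merged.bam')
        merge_command = 'samtools merge {} {}'.format(merged_bam, ' '.join(bams))
        self.merge_task = workflowRunner.addTask(
            '{}_merge'.format(self.identifier), merge_command, dependencies=dependencies)
```

The key difference from the one-in, one-out case is that the dependency this stage hands to its children is a single merge task rather than a per-sample list.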
There are many other examples in ModularRunner.py that show a variety of ways to implement your pipeline stage!