Skip to content

Commit

Permalink
fixed snakemake multithreading
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Feb 25, 2024
1 parent 57f0517 commit 57be315
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 35 deletions.
6 changes: 4 additions & 2 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#Path for input genomes
GENOMES: "/datasets/gzipped_birds_new"
#Reference tree (optional, default set as null)
REFERENCE: reference_trees/birds_reftree_363_very_latest.nwk
REFERENCE: NULL
#Length of each of the genes
LENGTH: 500
#Number of genes per iteration
GENE_COUNT: 250
GENE_COUNT: 4000
#Minimum % uppercase for sampling valid genes
UPPER_CASE: 0.90
#ROADIES output directory (current iteration output in --converge option)
Expand All @@ -31,3 +31,5 @@ FILTERFRAGMENTS: 0.50
MASKSITES: 0.02
#Threshold for high support (only used in --converge option)
SUPPORT_THRESHOLD: 0.95
#Number of instances to run in parallel
NUM_INSTANCES: 4
6 changes: 3 additions & 3 deletions workflow/rules/align.smk
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rule lastz:
config["OUT_DIR"]+"/alignments/{sample}.maf"
benchmark:
config["OUT_DIR"]+"/benchmarks/{sample}.lastz.txt"
threads: 4
threads: lambda wildcards: int(config['num_threads'])
params:
species = "{sample}",
identity = config['IDENTITY'],
Expand Down Expand Up @@ -64,7 +64,7 @@ rule pasta:
suffix = "fa.aln"
benchmark:
config["OUT_DIR"]+"/benchmarks/{id}.pasta.txt"
threads: 4
threads: lambda wildcards: int(config['num_threads'])
conda:
"../envs/msa.yaml"
shell:
Expand All @@ -91,7 +91,7 @@ rule pasta:
if [ "$all_matched" = true ]; then
cp "$input_file" "$output_file"
else
python pasta/run_pasta.py -i {input} -j {params.prefix} --alignment-suffix={params.suffix} --num-cpus 4
python pasta/run_pasta.py -i {input} -j {params.prefix} --alignment-suffix={params.suffix} --num-cpus {threads}
fi
fi
touch {output}
Expand Down
1 change: 0 additions & 1 deletion workflow/rules/fasttree.smk
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ rule fasttree:
params:
m = config["MIN_ALIGN"],
max_len = int(100*config["LENGTH"]/config["IDENTITY"])
threads: 1
benchmark:
config["OUT_DIR"]+"/benchmarks/{id}.fasttree.txt"
shell:
Expand Down
7 changes: 3 additions & 4 deletions workflow/rules/mashtree.smk
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ rule sequence_select:
config["OUT_DIR"]+"/benchmarks/{sample}.sample.txt"
output:
config["OUT_DIR"]+"/samples/{sample}_temp.fa"
threads:workflow.cores
shell:
'''
echo "We are starting to sample {input}"
Expand Down Expand Up @@ -72,7 +71,7 @@ rule lastz:
config["OUT_DIR"]+"/alignments/{sample}.maf"
benchmark:
config["OUT_DIR"]+"/benchmarks/{sample}.lastz.txt"
threads: 4
threads: lambda wildcards: int(config['num_threads'])
params:
species = "{sample}",
identity = config['IDENTITY'],
Expand Down Expand Up @@ -127,12 +126,12 @@ rule mashtree:
gene_dir = config["OUT_DIR"] + "/genes/gene_{id}"
benchmark:
config["OUT_DIR"]+"/benchmarks/{id}.mashtree.txt"
threads: 8
threads: lambda wildcards: int(config['num_threads'])
shell:
'''
if [[ `grep -n '>' {input} | wc -l` -gt {params.m} ]] || [[ `awk 'BEGIN{{l=0;n=0;st=0}}{{if (substr($0,1,1) == ">") {{st=1}} else {{st=2}}; if(st==1) {{n+=1}} else if(st==2) {{l+=length($0)}}}} END{{if (n>0) {{print int((l+n-1)/n)}} else {{print 0}} }}' {input}` -gt {params.max_len} ]]
then
mashtree --mindepth 0 --numcpus 8 --kmerlength 10 {params.gene_dir}/*.fa > {output}
mashtree --mindepth 0 --numcpus {threads} --kmerlength 10 {params.gene_dir}/*.fa > {output}
fi
touch {output}
'''
Expand Down
1 change: 0 additions & 1 deletion workflow/rules/sampling.smk
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ rule sequence_select:
config["OUT_DIR"]+"/benchmarks/{sample}.sample.txt"
output:
config["OUT_DIR"]+"/samples/{sample}_temp.fa"
threads:workflow.cores
shell:
'''
echo "We are starting to sample {input}"
Expand Down
4 changes: 2 additions & 2 deletions workflow/rules/tree.smk
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ rule raxmlng:
params:
m = config["MIN_ALIGN"],
max_len = int(100*config["LENGTH"]/config["IDENTITY"])
threads: 4
threads: lambda wildcards: int(config['num_threads'])
benchmark:
config["OUT_DIR"]+"/benchmarks/{id}.iqtree.txt"
shell:
'''
if [[ `grep -n '>' {input.msa} | wc -l` -gt {params.m} ]] && [[ `awk 'BEGIN{{l=0;n=0;st=0}}{{if (substr($0,1,1) == ">") {{st=1}} else {{st=2}}; if(st==1) {{n+=1}} else if(st==2) {{l+=length($0)}}}} END{{if (n>0) {{print int((l+n-1)/n)}} else {{print 0}} }}' {input.msa}` -lt {params.max_len} ]]
then
raxml-ng --msa {input.msa} --model GTR+G+F --redo --threads auto{{4}} --blopt nr_safe
raxml-ng --msa {input.msa} --model GTR+G+F --redo --threads auto{{threads}} --blopt nr_safe
else
touch {output.gene_tree}
fi
Expand Down
28 changes: 16 additions & 12 deletions workflow/scripts/converge.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,19 @@ def read_initial_gene_count(config_path):
return config['GENE_COUNT']

# function to run snakemake with settings and add to run folder
def run_snakemake(cores, mode, out_dir, run, roadies_dir, config_path):
def run_snakemake(cores, mode, out_dir, run, roadies_dir, config_path, fixed_parallel_instances):

# Set threads per instance dynamically
num_threads = cores/fixed_parallel_instances

cmd = [
"snakemake",
"--cores",
str(cores),
"--jobs",
str(cores),
"--config",
"mode=" + str(mode),
"config_path=" + str(config_path),
"num_threads=" + str(num_threads),
"--use-conda",
"--rerun-incomplete",
]
Expand All @@ -71,7 +74,7 @@ def run_snakemake(cores, mode, out_dir, run, roadies_dir, config_path):


# function to combine gene trees and mapping files from all iterations
def combine_iter(out_dir, run):
def combine_iter(out_dir, run, cores):
os.system(
"cat {0}/{1}/gene_tree_merged.nwk >> {0}/master_gt.nwk".format(out_dir, run)
)
Expand All @@ -81,13 +84,13 @@ def combine_iter(out_dir, run):

# open both files and get lines, each line is a separate gene tree
os.system(
"ASTER-Linux/bin/astral-pro -t 16 -i {0}/master_gt.nwk -o {0}/{1}.nwk -a {0}/master_map.txt".format(
out_dir, run
"ASTER-Linux/bin/astral-pro -t {2} -i {0}/master_gt.nwk -o {0}/{1}.nwk -a {0}/master_map.txt".format(
out_dir, run, cores
)
)
os.system(
"ASTER-Linux/bin/astral-pro -t 16 -u 3 -i {0}/master_gt.nwk -o {0}/{1}_stats.nwk -a {0}/master_map.txt".format(
out_dir, run
"ASTER-Linux/bin/astral-pro -t {2} -u 3 -i {0}/master_gt.nwk -o {0}/{1}_stats.nwk -a {0}/master_map.txt".format(
out_dir, run, cores
)
)
# open both master files and get gene trees and mapping
Expand All @@ -98,7 +101,7 @@ def combine_iter(out_dir, run):


# function for convergence run
def converge_run(iteration, cores, mode, out_dir, ref_exist, roadies_dir, support_thr, config_path):
def converge_run(iteration, cores, mode, out_dir, ref_exist, roadies_dir, support_thr, config_path, fixed_parallel_instances):
os.system("rm -r {0}".format(roadies_dir))
os.system("mkdir {0}".format(roadies_dir))
run = "iteration_"
Expand All @@ -111,9 +114,9 @@ def converge_run(iteration, cores, mode, out_dir, ref_exist, roadies_dir, suppor
if iteration >= 2:
base_gene_count = read_initial_gene_count(config_path) # Read initial GENE_COUNT value
update_config(config_path, iteration, base_gene_count)
run_snakemake(cores, mode, out_dir, run, roadies_dir, config_path)
run_snakemake(cores, mode, out_dir, run, roadies_dir, config_path, fixed_parallel_instances)
# merging gene trees and mapping files
gene_trees = combine_iter(out_dir, run)
gene_trees = combine_iter(out_dir, run, cores)
t = Tree(out_dir + "/" + run + ".nwk")
# add species tree to tree list
if ref_exist:
Expand Down Expand Up @@ -177,6 +180,7 @@ def converge_run(iteration, cores, mode, out_dir, ref_exist, roadies_dir, suppor
LENGTH = config["LENGTH"]
support_thr = config["SUPPORT_THRESHOLD"]
roadies_dir = config["OUT_DIR"]
fixed_parallel_instances = config["NUM_INSTANCES"]
master_gt = out_dir + "/master_gt.nwk"
master_map = out_dir + "/master_map.txt"
os.system("rm -r {0}".format(out_dir))
Expand All @@ -198,7 +202,7 @@ def converge_run(iteration, cores, mode, out_dir, ref_exist, roadies_dir, suppor
t_out.write("Start time: " + str(start_time_l) + "\n")
while True:
percent_high_support, num_gt, outputtree = converge_run(
iteration, CORES, MODE, out_dir, ref_exist, roadies_dir, support_thr, config_path
iteration, CORES, MODE, out_dir, ref_exist, roadies_dir, support_thr, config_path, fixed_parallel_instances
)
curr_time = time.time()
curr_time_l = time.asctime(time.localtime(time.time()))
Expand Down
24 changes: 14 additions & 10 deletions workflow/scripts/noconverge.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ def comp_tree(t1, t2):


# function to run snakemake with settings and add to run folder
def run_snakemake(cores, mode, config_path):
def run_snakemake(cores, mode, config_path, fixed_parallel_instances):

# Set threads per instance dynamically
num_threads = cores/fixed_parallel_instances

cmd = [
"snakemake",
"--cores",
str(cores),
"--jobs",
str(cores),
"--config",
"mode=" + str(mode),
"config_path=" + str(config_path),
"num_threads=" + str(num_threads),
"--use-conda",
"--rerun-incomplete",
]
Expand All @@ -49,18 +52,18 @@ def run_snakemake(cores, mode, config_path):


# function for convergence run
def converge_run(cores, mode, ref_exist, trees, roadies_dir, config_path):
def converge_run(cores, mode, ref_exist, trees, roadies_dir, config_path, fixed_parallel_instances):
# run snakemake with specificed gene number and length
run_snakemake(cores, mode, config_path)
run_snakemake(cores, mode, config_path, fixed_parallel_instances)
# merging gene trees and mapping files
os.system(
"ASTER-Linux/bin/astral-pro -t 16 -i {0}/genetrees/gene_tree_merged.nwk -o {0}/roadies.nwk -a {0}/genes/mapping.txt".format(
roadies_dir
"ASTER-Linux/bin/astral-pro -t {1} -i {0}/genetrees/gene_tree_merged.nwk -o {0}/roadies.nwk -a {0}/genes/mapping.txt".format(
roadies_dir,cores
)
)
os.system(
"ASTER-Linux/bin/astral-pro -t 16 -u 3 -i {0}/genetrees/gene_tree_merged.nwk -o {0}/roadies_stats.nwk -a {0}/genes/mapping.txt".format(
roadies_dir
"ASTER-Linux/bin/astral-pro -t {1} -u 3 -i {0}/genetrees/gene_tree_merged.nwk -o {0}/roadies_stats.nwk -a {0}/genes/mapping.txt".format(
roadies_dir,cores
)
)
gt = open(roadies_dir + "/genetrees/gene_tree_merged.nwk", "r")
Expand Down Expand Up @@ -112,6 +115,7 @@ def converge_run(cores, mode, ref_exist, trees, roadies_dir, config_path):
NUM_GENES = config["GENE_COUNT"]
LENGTH = config["LENGTH"]
roadies_dir = config["OUT_DIR"]
fixed_parallel_instances = config["NUM_INSTANCES"]
os.system("rm -r {0}".format(roadies_dir))
os.system("mkdir {0}".format(roadies_dir))
sys.setrecursionlimit(2000)
Expand All @@ -127,7 +131,7 @@ def converge_run(cores, mode, ref_exist, trees, roadies_dir, config_path):
time_stamps.append(start_time)
with open(roadies_dir + "/time_stamps.csv", "a") as t_out:
t_out.write("Start time: " + str(start_time_l) + "\n")
num_gt = converge_run(CORES, MODE, ref_exist, trees, roadies_dir, config_path)
num_gt = converge_run(CORES, MODE, ref_exist, trees, roadies_dir, config_path, fixed_parallel_instances)
curr_time = time.time()
curr_time_l = time.asctime(time.localtime(time.time()))
to_previous = curr_time - time_stamps[len(time_stamps) - 1]
Expand Down

0 comments on commit 57be315

Please sign in to comment.