Skip to content

Commit

Permalink
test for redistributing vcores at the end of map phase
Browse files Browse the repository at this point in the history
  • Loading branch information
ddecap committed Apr 16, 2015
1 parent e69a598 commit e82c955
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 112 deletions.
15 changes: 11 additions & 4 deletions halvade/src/be/ugent/intec/halvade/HalvadeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class HalvadeOptions {
protected int stand_emit_conf = -1;
protected SAMSequenceDictionary dict;
protected String chr = null;
protected int reducersPerContainer = -1;
protected int mapsPerContainer = -1;
protected int reducerContainersPerNode = -1;
protected int mapContainersPerNode = -1;
protected boolean reuseJVM = false;
protected boolean justAlign = false;
protected String exomeBedFile = null;
Expand All @@ -93,6 +93,7 @@ public class HalvadeOptions {
protected boolean useSharedMemory = false;
protected boolean useBamInput = false;
protected boolean setMapContainers = true, setReduceContainers = true;
protected boolean redistribute = false;
protected DecimalFormat onedec;
private static final double REDUCE_TASKS_FACTOR = 1.68*15;
private static final double DEFAULT_COVERAGE = 50;
Expand Down Expand Up @@ -128,6 +129,7 @@ public int GetOptions(String[] args, Configuration hConf) throws IOException, UR
HalvadeConf.setUseIPrep(hConf, useIPrep);
HalvadeConf.setUseUnifiedGenotyper(hConf, useGenotyper);
HalvadeConf.setReuseJVM(hConf, reuseJVM);
HalvadeConf.setRedistribute(hConf, redistribute);
HalvadeConf.setReadGroup(hConf, "ID:" + RGID + " LB:" + RGLB + " PL:" + RGPL + " PU:" + RGPU + " SM:" + RGSM);
HalvadeConf.setkeepChrSplitPairs(hConf, keepChrSplitPairs);
if(STARGenome != null && useSharedMemory) HalvadeConf.setStarDirPass2HDFS(hConf, out);
Expand Down Expand Up @@ -380,6 +382,8 @@ protected void createOptions() {
.create( "shmem" );
Option optBamIn= OptionBuilder.withDescription( "Uses aligned bam as input files instead of unaligned fastq files.")
.create( "bam" );
Option optRedis= OptionBuilder.withDescription( "This will enable Halvade to redistribute resources when possible when not all containers are used.")
.create( "redistribute" );


options.addOption(optIn);
Expand Down Expand Up @@ -423,6 +427,7 @@ protected void createOptions() {
options.addOption(optShmem);
options.addOption(optBamIn);
options.addOption(optCustomArgs);
options.addOption(optRedis);
}

protected boolean parseArguments(String[] args, Configuration halvadeConf) throws ParseException {
Expand Down Expand Up @@ -460,11 +465,11 @@ protected boolean parseArguments(String[] args, Configuration halvadeConf) throw
mem = Double.parseDouble(line.getOptionValue("mem"));
if(line.hasOption("mpn")) {
setMapContainers = false;
mapsPerContainer = Integer.parseInt(line.getOptionValue("mpn"));
mapContainersPerNode = Integer.parseInt(line.getOptionValue("mpn"));
}
if(line.hasOption("rpn")) {
setReduceContainers = false;
reducersPerContainer = Integer.parseInt(line.getOptionValue("rpn"));
reducerContainersPerNode = Integer.parseInt(line.getOptionValue("rpn"));
}
if(line.hasOption("scc"))
stand_call_conf = Integer.parseInt(line.getOptionValue("scc"));
Expand All @@ -474,6 +479,8 @@ protected boolean parseArguments(String[] args, Configuration halvadeConf) throw
reportAll = true;
if(line.hasOption("keep"))
keepFiles = true;
if(line.hasOption("redistribute"))
redistribute = true;
if(line.hasOption("s"))
paired = false;
if(line.hasOption("justalign")) {
Expand Down
20 changes: 11 additions & 9 deletions halvade/src/be/ugent/intec/halvade/HalvadeResourceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class HalvadeResourceManager {
//mapmem, redmem
{MEM_STAR, ALL}, // RNA with shared memory pass1
{MEM_STAR, MEM_REF}, // RNA with shared memory pass2
{MEM_STAR, MEM_REF}, // RNA without shared memory
{MEM_STAR, MEM_REF}, // RNA without shared memory <- to be deleted
{MEM_REF, MEM_REF}, // DNA
{4*1024, 4*1024} // combine
};
Expand All @@ -55,20 +55,22 @@ public static void setJobResources(HalvadeOptions opt, Configuration conf, int t
}

if (opt.setMapContainers)
opt.mapsPerContainer = Math.min(tmpvcores / 2, Math.max(tmpmem / RESOURCE_REQ[type][0],1));
opt.mapContainersPerNode = Math.min(tmpvcores, Math.max(tmpmem / RESOURCE_REQ[type][0],1));
if (opt.setReduceContainers)
opt.reducersPerContainer = Math.min(tmpvcores, Math.max(tmpmem / RESOURCE_REQ[type][1], 1));

opt.maps = Math.max(1,opt.nodes*opt.mapsPerContainer);
opt.mthreads = Math.max(1, tmpvcores/opt.mapsPerContainer);
opt.rthreads = Math.max(1, tmpvcores/opt.reducersPerContainer);
opt.reducerContainersPerNode = Math.min(tmpvcores, Math.max(tmpmem / RESOURCE_REQ[type][1], 1));

opt.maps = Math.max(1,opt.nodes*opt.mapContainersPerNode);
Logger.DEBUG("set # map containers: " + opt.maps);
HalvadeConf.setMapContainerCount(conf, opt.maps);
HalvadeConf.setVcores(conf, opt.vcores);
opt.mthreads = Math.max(1, tmpvcores/opt.mapContainersPerNode);
opt.rthreads = Math.max(1, tmpvcores/opt.reducerContainersPerNode);
int mmem = RESOURCE_REQ[type][0];
int rmem = RESOURCE_REQ[type][1] == ALL ? tmpmem : RESOURCE_REQ[type][1];

Logger.DEBUG("resources set to " + opt.mapsPerContainer + " maps ["
Logger.DEBUG("resources set to " + opt.mapContainersPerNode + " maps ["
+ opt.mthreads + " cpu , " + mmem + " mb] per node and "
+ opt.reducersPerContainer + " reducers ["
+ opt.reducerContainersPerNode + " reducers ["
+ opt.rthreads + " cpu, " + rmem + " mb] per node");

conf.set("mapreduce.map.cpu.vcores", "" + opt.mthreads);
Expand Down
1 change: 0 additions & 1 deletion halvade/src/be/ugent/intec/halvade/MapReduceRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public int run(String[] strings) throws Exception {
protected int runPass1RNAJob(Configuration pass1Conf, String tmpOutDir) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
HalvadeConf.setIsPass2(pass1Conf, false);
HalvadeResourceManager.setJobResources(halvadeOpts, pass1Conf, HalvadeResourceManager.RNA_SHMEM_PASS1, false);

Job pass1Job = Job.getInstance(pass1Conf, "Halvade pass 1 RNA pipeline");
pass1Job.addCacheArchive(new URI(halvadeOpts.halvadeBinaries));
pass1Job.setJarByClass(be.ugent.intec.halvade.hadoop.mapreduce.HalvadeMapper.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ protected String makeRegionFile(Context context, ChromosomeRange r, Preprocessin
Logger.DEBUG("empty region file, no vcf results!!");
return null;
}
HalvadeFileUtils.removeLocalFile(keep, exomebed);
} else
r.writeToPicardRegionFile(region);
return region;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ protected void cleanup(Context context) throws IOException, InterruptedException
HalvadeFileUtils.uploadFileToHDFS(context, fs, mergeFile.getAbsolutePath(), out + mergeFile.getName());

// build new genome ref
Logger.DEBUG("jobid: " + jobId);
String newGenomeDir = refDir + taskId + "-nsg/";
String newGenomeDir = refDir + jobId + "-nsg/";
File starOut = new File(newGenomeDir);
starOut.mkdirs();

Expand All @@ -119,6 +118,7 @@ protected void cleanup(Context context) throws IOException, InterruptedException
File pass2check = new File(newGenomeDir + HalvadeFileUtils.HALVADE_STAR_SUFFIX_P2);
pass2check.createNewFile();
if(requireUploadToHDFS) {
Logger.DEBUG("Uploading STAR genome to parallel filesystem...");
fs.mkdirs(new Path(pass2GenDir));
File[] genFiles = starOut.listFiles();
for(File gen : genFiles) {
Expand All @@ -136,6 +136,7 @@ protected void setup(Context context) throws IOException, InterruptedException {
keyFactors = new ArrayList<>();
tmpDir = HalvadeConf.getScratchTempDir(context.getConfiguration());
refDir = HalvadeConf.getRefDirOnScratch(context.getConfiguration());
requireUploadToHDFS = refDir.startsWith(tmpDir);
out = HalvadeConf.getOutDir(context.getConfiguration());
jobId = context.getJobID().toString();
taskId = context.getTaskAttemptID().toString();
Expand Down
41 changes: 39 additions & 2 deletions halvade/src/be/ugent/intec/halvade/tools/AlignerInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import be.ugent.intec.halvade.utils.HalvadeConf;
import be.ugent.intec.halvade.utils.ProcessBuilderWrapper;
import fi.tkk.ics.hadoop.bam.SAMRecordWritable;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.Arrays;
import net.sf.samtools.SAMFileHeader;
import net.sf.samtools.SAMRecord;
Expand Down Expand Up @@ -58,12 +61,15 @@ public abstract class AlignerInstance {
protected boolean keepChrSplitPairs;
protected boolean keep = false;
protected ChromosomeSplitter splitter;
protected HalvadeHeartBeat hhb;
protected int containerMinusTasksLeft;
protected boolean redistribute;


protected AlignerInstance(Mapper.Context context, String bin) throws IOException {
protected AlignerInstance(Mapper.Context context, String bin) throws IOException, URISyntaxException {
AlignerInstance.context = context;
header = null;
containerMinusTasksLeft = HalvadeConf.lessTasksLeftThanContainers(context.getConfiguration());
redistribute = HalvadeConf.getRedistribute(context.getConfiguration());
writableRecord = new SAMRecordWritable();
writableRegion = new ChromosomeRegion();
writeableCompactRegion = new GenomeSJ();
Expand All @@ -84,6 +90,37 @@ protected AlignerInstance(Mapper.Context context, String bin) throws IOException
keep = HalvadeConf.getKeepFiles(context.getConfiguration());
}

/**
 * Raises {@code threads} to the number of cores that appear to be idle.
 *
 * <p>The total core count is read from the job configuration through
 * {@code HalvadeConf.getVcores}. The number of cores in use is estimated by
 * listing the command line of every process with {@code ps -eo args} and
 * inspecting the lines that mention {@code bwa}:
 * <ul>
 *   <li>a {@code sampe} / {@code samse} process counts as one core;</li>
 *   <li>an {@code aln} process counts as the value of its {@code -t} option
 *       (one core when the option is missing or unreadable).</li>
 * </ul>
 * {@code threads} is only ever increased by this method, never lowered.
 *
 * @param context mapper context whose configuration holds the vcore count
 * @throws IOException if {@code ps} cannot be started or its output cannot be read
 */
protected void getIdleCores(Mapper.Context context) throws IOException {
    int totalCores = HalvadeConf.getVcores(context.getConfiguration());
    int usedCores = 0;
    // Explicit argument list instead of a single command string; stderr is merged
    // into stdout so an unread error stream can never block the child process.
    Process p = new ProcessBuilder("ps", "-eo", "args").redirectErrorStream(true).start();
    try (BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
        String line;
        while ((line = input.readLine()) != null) {
            if (!line.contains("bwa")) {
                continue;
            }
            Logger.DEBUG(line);
            if (line.contains("sampe") || line.contains("samse")) {
                usedCores++;
            } else if (line.contains("aln")) {
                int alnCores = parseAlnThreads(line);
                Logger.DEBUG("Aln used cores: " + alnCores);
                usedCores += alnCores;
            }
        }
    } finally {
        // Release the child process and its remaining streams, also on failure.
        p.destroy();
    }

    threads = Math.max(threads, totalCores - usedCores);
}

/**
 * Extracts the thread count from one {@code ps} command line.
 *
 * @param psLine full command line of a process, tokens separated by whitespace
 * @return the integer following the first {@code -t} token, or 1 when there is no
 *         such token, it has no value, or the value is not a positive integer
 */
private static int parseAlnThreads(String psLine) {
    String[] cmd = psLine.split("\\s+");
    for (int i = 0; i + 1 < cmd.length; i++) {
        if (cmd[i].equals("-t")) {
            try {
                return Math.max(1, Integer.parseInt(cmd[i + 1]));
            } catch (NumberFormatException ex) {
                // An unrelated process may merely look like "bwa aln -t <n>";
                // a malformed value must not abort the map task.
                return 1;
            }
        }
    }
    return 1;
}

protected int feedLine(String line, ProcessBuilderWrapper proc) throws IOException {
if (proc.getState() != 1) {
Logger.DEBUG("writing \'" + line +"\' to process with state " + proc.getState());
Expand Down
49 changes: 31 additions & 18 deletions halvade/src/be/ugent/intec/halvade/tools/BWAAlnInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ public class BWAAlnInstance extends AlignerInstance {
private BufferedWriter fastqFile1;
private BufferedWriter fastqFile2;
private String taskId;
private String alnCustomArgs;

private BWAAlnInstance(Mapper.Context context, String bin) throws IOException, URISyntaxException {
super(context, bin);
taskId = context.getTaskAttemptID().toString();
taskId = taskId.substring(taskId.indexOf("m_"));
ref = HalvadeFileUtils.downloadBWAIndex(context, taskId);
alnCustomArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "bwa", "aln");
}

public int feedLine(String line, int read) throws IOException, InterruptedException {
Expand All @@ -60,7 +62,8 @@ public int feedLine(String line, int read) throws IOException, InterruptedExcept
return feedLine(line, reads1);
} else if (read == 2) {
fastqFile2.write(line + "\n");
return feedLine(line, reads2);
if(threads > 1) return feedLine(line, reads2);
else return 0;
}
return -1;
}
Expand All @@ -87,10 +90,13 @@ protected String getFileName(String dir, String id, boolean isSai, int read) {
protected void startAligner(Mapper.Context context) throws IOException, InterruptedException {
// make command
// use half the threads if paired reads ( 2 instances of aln will run)
if(redistribute && containerMinusTasksLeft > 0) {
getIdleCores(context);
Logger.DEBUG("Redistributing cores: using " + threads);
}
int threadsToUse = threads;
if (isPaired && threadsToUse > 1 ) threadsToUse /= 2;
String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "bwa", "aln");
String[] command1 = CommandGenerator.bwaAln(bin, ref, "/dev/stdin", getFileName(tmpdir, taskId, true, 1), threadsToUse, customArgs);
String[] command1 = CommandGenerator.bwaAln(bin, ref, "/dev/stdin", getFileName(tmpdir, taskId, true, 1), threadsToUse, alnCustomArgs);
reads1 = new ProcessBuilderWrapper(command1, bin);
reads1.setThreads(threadsToUse);
reads1.startProcess(null, System.err);
Expand All @@ -104,21 +110,21 @@ protected void startAligner(Mapper.Context context) throws IOException, Interrup
}
fastqFile1 = new BufferedWriter(new FileWriter(file1.getAbsoluteFile()));
if(isPaired) {
String[] command2 = CommandGenerator.bwaAln(bin, ref, "/dev/stdin", getFileName(tmpdir, taskId, true, 2), threadsToUse, customArgs);
reads2 = new ProcessBuilderWrapper(command2, bin);
reads2.setThreads(threadsToUse);
reads2.startProcess(null, System.err);
if(!reads2.isAlive())
throw new ProcessException("BWA aln", reads2.getExitState());
if(threads > 1) {
String[] command2 = CommandGenerator.bwaAln(bin, ref, "/dev/stdin", getFileName(tmpdir, taskId, true, 2), threadsToUse, alnCustomArgs);
reads2 = new ProcessBuilderWrapper(command2, bin);
reads2.setThreads(threadsToUse);
reads2.startProcess(null, System.err);
if(!reads2.isAlive())
throw new ProcessException("BWA aln", reads2.getExitState());
}
File file2 = new File(getFileName(tmpdir, taskId,false, 2));
if (!file2.exists()) {
file2.createNewFile();
}
fastqFile2 = new BufferedWriter(new FileWriter(file2.getAbsoluteFile()));
}

// hhb = new HalvadeHeartBeat(context);
// hhb.start();
}

/**
Expand All @@ -127,7 +133,7 @@ protected void startAligner(Mapper.Context context) throws IOException, Interrup
*/
@Override
public int getState() {
if (isPaired)
if (isPaired & threads > 1)
return reads1.getState() & reads2.getState();
else
return reads1.getState();
Expand All @@ -140,24 +146,31 @@ private void closeBWAAln() throws InterruptedException {
reads1.getSTDINWriter().close();
fastqFile1.close();
if(isPaired) {
reads2.getSTDINWriter().flush();
reads2.getSTDINWriter().close();
fastqFile2.close();
if(threads > 1) {
reads2.getSTDINWriter().flush();
reads2.getSTDINWriter().close();
}
}
} catch (IOException ex) {
// hhb.jobFinished();
// hhb.join();
Logger.EXCEPTION(ex);
throw new ProcessException("BWA aln", -1);
}

int error = reads1.waitForCompletion();
// hhb.jobFinished();
// hhb.join();
if(error != 0)
throw new ProcessException("BWA aln", error);
context.getCounter(HalvadeCounters.TIME_BWA_ALN).increment(reads1.getExecutionTime());
if(isPaired) {
if(threads == 1) {
String[] command2 = CommandGenerator.bwaAln(bin, ref, getFileName(tmpdir, taskId,false, 2), getFileName(tmpdir, taskId, true, 2),
threads, alnCustomArgs);
reads2 = new ProcessBuilderWrapper(command2, bin);
reads2.setThreads(threads);
reads2.startProcess(null, System.err);
if(!reads2.isAlive())
throw new ProcessException("BWA aln", reads2.getExitState());
}
error = reads2.waitForCompletion();
if(error != 0)
throw new ProcessException("BWA aln", error);
Expand Down
5 changes: 0 additions & 5 deletions halvade/src/be/ugent/intec/halvade/tools/GATKTools.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ public void runBaseRecalibrator(String input, String table, String ref, String[]
*/

ArrayList<String> command = new ArrayList<>();
String[] covString = {
"-cov", "ReadGroupCovariate",
"-cov", "QualityScoreCovariate",
"-cov", "ContextCovariate"};
String[] gatkcmd = {
java, mem, "-jar", gatk,
"-T", "BaseRecalibrator",
Expand All @@ -128,7 +124,6 @@ public void runBaseRecalibrator(String input, String table, String ref, String[]
"-L", region,
DISABLE_VCF_LOCKING};
command.addAll(Arrays.asList(gatkcmd));
command.addAll(Arrays.asList(covString));
for(String knownSite : knownSites) {
command.add("-knownSites");
command.add(knownSite);
Expand Down
Loading

0 comments on commit e82c955

Please sign in to comment.