update to make faster
vasudeva8 committed Jul 2, 2024
1 parent c553f40 commit 9ce3917
Showing 4 changed files with 435 additions and 377 deletions.
197 changes: 115 additions & 82 deletions samples/DEMO.md
@@ -766,7 +766,9 @@
The array aux fields can be updated using bam_aux_update_array api.
Refer: mod_aux_ba.c

Shows the records updated with an array of integers, containing the count of
ACGT and N in that order. The bases are decoded before counting for the sake
of simplicity. Refer to qtask_ordered.c for a better counting, where the
decoding is made outside of the loop.

./mod_aux_ba samtools/test/mpileup/mpileup.1.bam
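
Below sketch shows the api usage in isolation (a minimal sketch, not taken
from the sample; the tag name "ba", the counts layout and the surrounding code
are assumptions):

    #include <stdio.h>
    #include <htslib/sam.h>

    //A minimal sketch, assuming alignment b is already read and the counts
    //of A, C, G, T and N have been gathered; "ba" is a hypothetical tag name
    static int store_counts(bam1_t *b, uint32_t counts[5])
    {
        //'I' marks unsigned 32-bit elements; the tag is created or replaced
        if (bam_aux_update_array(b, "ba", 'I', 5, counts) < 0) {
            fprintf(stderr, "Failed to update aux array\n");
            return -1;
        }
        return 0;
    }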

@@ -1386,22 +1388,21 @@
Refer: flags_htsopt_field.c
The HTSLib api supports thread pooling for better performance. There are a few
ways in which this can be used. The pool can be made specific for a file, or a
generic pool can be created and shared across multiple files. The thread pool
can also be used to execute user defined tasks. The tasks are added to a
queue, the threads in the pool execute them, and results can be queued back if
required.

To have a thread pool specific for a file, the hts_set_opt api can be used
with the file pointer, HTS_OPT_NTHREADS and the number of threads to be in the
pool. The thread pool is released on closure of the file. To have a thread
pool which can be shared across different files, it needs to be initialized
using the hts_tpool_init api, passing the number of threads as an argument.
This thread pool can be associated with a file using the hts_set_opt api; the
file pointer, HTS_OPT_THREAD_POOL and the thread pool address are to be passed
as arguments to the api. The thread pool has to be released with
hts_tpool_destroy.

The samples are trivial ones that showcase the usage of the api. The number of
threads to use for different tasks has to be identified based on the
complexity and parallelism of the task.
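
Below sketch shows both setups side by side (a minimal sketch, not taken from
the samples; file names and thread count are placeholders and error handling
is reduced for brevity):

    #include <stdlib.h>
    #include <htslib/sam.h>
    #include <htslib/thread_pool.h>

    int main(void)
    {
        //file specific pool: created and released by htslib with the file
        samFile *fp = sam_open("in.bam", "r");
        if (!fp) return EXIT_FAILURE;
        hts_set_opt(fp, HTS_OPT_NTHREADS, 4);   //4 threads, this file only
        /* ... read using fp ... */
        sam_close(fp);                          //pool goes away with file

        //shared pool: created once, attached to many files, freed by user
        htsThreadPool tpool = {NULL, 0};
        if (!(tpool.pool = hts_tpool_init(4)))
            return EXIT_FAILURE;
        samFile *in = sam_open("in.bam", "r");
        samFile *out = sam_open("out.bam", "wb");
        if (in && out) {
            hts_set_opt(in, HTS_OPT_THREAD_POOL, &tpool);
            hts_set_opt(out, HTS_OPT_THREAD_POOL, &tpool);
            /* ... read from in, write to out ... */
        }
        if (in) sam_close(in);
        if (out) sam_close(out);
        hts_tpool_destroy(tpool.pool);  //after all files using it close
        return EXIT_SUCCESS;
    }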

Below excerpt shows a file specific thread pool,

@@ -1444,9 +1445,9 @@
The order of execution of threads is decided based on many factors, and the
load of each task may vary, so the completion of the tasks may not be in the
order of their queueing. The queues can be used in two different ways: one
where the result is enqueued to the queue again, to be read back in the same
order as the initial queueing; the second where the results are not enqueued,
and tasks complete possibly in a different order than the initial queueing.
Explicitly created threads can also be used along with the hts thread pool.

hts_tpool_process_init initializes the queue / process, associates a queue
with the thread pool and reserves space for a given number of tasks on the
queue. It takes a
@@ -1464,124 +1465,156 @@
all the piled up tasks are processed, a possible case when the queueing and
processing happen at different speeds. The hts_tpool_process_shutdown api
stops the processing of the queue.
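
Below sketch shows the queue lifecycle end to end (a minimal sketch; the task
double_it and its integer payload are hypothetical):

    #include <stdio.h>
    #include <htslib/thread_pool.h>

    //hypothetical task: runs on a pool thread with the dispatched pointer;
    //the return value becomes the result queued back for collection
    static void *double_it(void *arg)
    {
        int *v = (int *) arg;
        *v *= 2;
        return v;
    }

    int run_tasks(void)
    {
        int values[8], i;
        hts_tpool *pool = hts_tpool_init(4);    //4 threads
        if (!pool) return -1;
        //space for 8 tasks; 0 means results are queued back for collection
        hts_tpool_process *q = hts_tpool_process_init(pool, 8, 0);
        if (!q) return -1;

        for (i = 0; i < 8; i++) {
            values[i] = i;
            if (hts_tpool_dispatch(pool, q, double_it, &values[i]) == -1)
                return -1;                      //queueing failed
        }
        hts_tpool_process_flush(q);     //wait until piled up tasks complete

        hts_tpool_result *r;
        while ((r = hts_tpool_next_result(q)) != NULL) {  //dispatch order
            printf("%d\n", *(int *) hts_tpool_result_data(r));
            hts_tpool_delete_result(r, 0);      //0: payload is not freed
        }
        hts_tpool_process_shutdown(q);  //stop the processing of the queue
        hts_tpool_process_destroy(q);
        hts_tpool_destroy(pool);
        return 0;
    }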

There are a few apis which let the user check the status of processing. The
api hts_tpool_process_empty shows whether all the tasks are completed or not.
The api hts_tpool_process_sz gives the number of tasks at different states of
processing. The api hts_tpool_process_len gives the number of results in the
output queue waiting to be collected.
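
Below sketch shows the status apis together (a minimal sketch; q is a queue
created as above):

    #include <stdio.h>
    #include <unistd.h>
    #include <htslib/thread_pool.h>

    //A sketch: poll the queue until all dispatched tasks have completed
    static void wait_and_report(hts_tpool_process *q)
    {
        while (!hts_tpool_process_empty(q)) {
            fprintf(stderr, "tasks in queue: %d, results waiting: %d\n",
                    hts_tpool_process_sz(q), hts_tpool_process_len(q));
            usleep(100000);     //re-check every 0.1s
        }
    }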

The order of execution of tasks depends on the number of threads involved and
how the threads are scheduled by the operating system. When the results are
enqueued back to the queue, they are read in the same order as the enqueueing
of tasks, and in that case the order of execution will not be noticed. When
the results are not enqueued, the results are available right away and the
order of execution may be noticeable. Based on the nature of the task and the
need of order maintenance, users can select either of the queueing approaches.
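
Below sketch shows the first way, collecting results in the order of queueing
(a minimal sketch; mydata_t and the consume step are placeholders):

    #include <htslib/thread_pool.h>

    typedef struct { int id; /* user's payload */ } mydata_t;  //hypothetical

    //A sketch: hts_tpool_next_result_wait blocks until a result is ready
    //and returns NULL once the queue is shut down; results arrive in
    //dispatch order
    static void collect_in_order(hts_tpool_process *q)
    {
        hts_tpool_result *r;
        while ((r = hts_tpool_next_result_wait(q)) != NULL) {
            mydata_t *d = (mydata_t *) hts_tpool_result_data(r);
            /* ... consume d in the same order it was dispatched ... */
            hts_tpool_delete_result(r, 0);  //0: d itself is not freed
            if (hts_tpool_process_empty(q))
                break;                      //all dispatched tasks drained
        }
    }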

Below excerpts show the usage of queues and threads in both cases. In the 1st,
alignments are updated with an aux tag indicating GC ratio. The order of data
has to be maintained even after update, hence the result queueing is used to
ensure the same order as initial. A number of alignments are bunched together,
and allocated memory is reused to make it perform better. A sentinel job is
used to identify the completion of all tasks at the result collection side.
...
void *thread_ordered_proc(void *args)
{
...
for ( i = 0; i < bamdata->count; ++i) {
...
for (pos = 0; pos < bamdata->bamarray[i]->core.l_qseq; ++pos) {
count[bam_seqi(data,pos)]++;
...
gcratio = (count[2] /*C*/ + count[4] /*G*/) / (float) (count[1] /*A*/ + count[8] /*T*/ + count[2] + count[4]);

if (bam_aux_append(bamdata->bamarray[i], "xr", 'f', sizeof(gcratio), (const uint8_t*)&gcratio) < 0) {
...
return bamdata;
...
void *threadfn_orderedwrite(void *args)
{
...
//get result and write; wait if no result is in queue - until shutdown of queue
while (tdata->result == 0 &&
(r = hts_tpool_next_result_wait(tdata->queue)) != NULL) {
bamdata = (data*) hts_tpool_result_data(r);
...
for (i = 0; i < bamdata->count; ++i) {
if (sam_write1(tdata->outfile, tdata->samhdr, bamdata->bamarray[i]) < 0) {
...
hts_tpool_delete_result(r, 0); //release the result memory

// Shut down the process queue. If we stopped early due to a write failure,
// this will signal to the other end that something has gone wrong.
hts_tpool_process_shutdown(tdata->queue);
...
int main(int argc, char *argv[])
{
...
if (!(pool = hts_tpool_init(cnt))) { //thread pool
...
tpool.pool = pool; //to share the pool for file read and write as well
//queue to use with thread pool, for task and results
if (!(queue = hts_tpool_process_init(pool, cnt * 2, 0))) {
...
if (!(infile = sam_open(inname, "r"))) {
...
if (!(outfile = sam_open(file, "w"))) {
//share the thread pool with i/o files
if (hts_set_opt(infile, HTS_OPT_THREAD_POOL, &tpool) < 0 ||
hts_set_opt(outfile, HTS_OPT_THREAD_POOL, &tpool) < 0) {
...
//start output writer thread for ordered processing
if (pthread_create(&thread, NULL, threadfn_orderedwrite, &twritedata)) {
...
while (c >= 0) {
if (!(bamdata = getbamstorage(chunk, &bamcache))) {
...
//read alignments, upto max size for this lot
for (cnt = 0; cnt < bamdata->maxsize; ++cnt) {
c = sam_read1(infile, in_samhdr, bamdata->bamarray[cnt]);
...
//queue the lot for processing
if (hts_tpool_dispatch3(pool, queue, thread_ordered_proc, bamdata,
cleanup_bamstorage, cleanup_bamstorage,
0) == -1) {
...
if (-1 == c) {
// EOF read, send a marker to tell the threadfn_orderedwrite()
// function to shut down.
if (hts_tpool_dispatch(pool, queue, thread_ordered_proc,
NULL) == -1) {
...
// Wait for threadfn_orderedwrite to finish.
if (started_thread) {
pthread_join(thread, NULL);
...
if (queue) {
// Once threadfn_orderedwrite has stopped, the queue can be
// cleaned up.
hts_tpool_process_destroy(queue);
}
...
Refer: qtask_ordered.c

In this 2nd, the bases are counted and the GC ratio of the whole file is
calculated. The order in which bases are counted is not relevant, so no result
queue is required; the queue is created as input only.
...
void *thread_unordered_proc(void *args)
{
...
for ( i = 0; i < bamdata->count; ++i) {
data = bam_get_seq(bamdata->bamarray[i]);
for (pos = 0; pos < bamdata->bamarray[i]->core.l_qseq; ++pos) {
counts[bam_seqi(data, pos)]++;
...
//update result and add the memory block for reuse
pthread_mutex_lock(&bamdata->cache->lock);
for (i = 0; i < 16; i++) {
bamdata->bases->counts[i] += counts[i];
}

bamdata->next = bamdata->cache->list;
bamdata->cache->list = bamdata;
pthread_mutex_unlock(&bamdata->cache->lock);
...
int main(int argc, char *argv[])
{
...
if (!(pool = hts_tpool_init(cnt))) { //thread pool
...
if (!(queue = hts_tpool_process_init(pool, cnt * 2, 1))) {
...
c = 0;
while (c >= 0) {
...
for (cnt = 0; cnt < bamdata->maxsize; ++cnt) {
c = sam_read1(infile, in_samhdr, bamdata->bamarray[cnt]);
...
if (c >= -1) {
...
if (hts_tpool_dispatch3(pool, queue, thread_unordered_proc, bamdata,
cleanup_bamstorage, cleanup_bamstorage,
0) == -1) {
...
if (-1 == c) {
// EOF read, ensure all are processed, waits for all to finish
if (hts_tpool_process_flush(queue) == -1) {
fprintf(stderr, "Failed to flush queue\n");
} else { //all done
//refer seq_nt16_str to find position of required bases
fprintf(stdout, "GCratio: %f\nBase counts:\n",
(gccount.counts[2] /*C*/ + gccount.counts[4] /*G*/) / (float)
(gccount.counts[1] /*A*/ + gccount.counts[8] /*T*/ +
gccount.counts[2] + gccount.counts[4]));
...
if (queue) {
hts_tpool_process_destroy(queue);
}
...
Refer: qtask_unordered.c

5 changes: 2 additions & 3 deletions samples/README.md
@@ -209,9 +209,8 @@
indexed.
[Qtask_unordered][Qtask_unordered]

This application showcases the use of queues and threads for custom
processing. The count of bases and the GC ratio of the whole file are
calculated and displayed. The order of counting is irrelevant and hence
ordered retrieval is not used.

### More Information
