Skip to content

Commit

Permalink
Added chr to increase parallelism level (#39)
Browse files Browse the repository at this point in the history
* Added chr to increase parallelism level

* Change data source to PGSQL
  • Loading branch information
mwiewior authored Jul 17, 2017
1 parent e920ca9 commit 9267fe5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 6 deletions.
15 changes: 9 additions & 6 deletions R/CNVCALLER.RUNNER/inst/run_cnvcaller.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ read_parameters <- function(tabName, id, conn){
K_from <- parameters[1,'k_from']
K_to <- parameters[1,'k_to']
lmax <- parameters[1,'lmax']
chr <- parameters[1,'chr']
return(list(caller=caller,
cov_table=cov_table,
mapp_thresh=mapp_thresh,
Expand All @@ -40,7 +41,8 @@ read_parameters <- function(tabName, id, conn){
gc_thresh_to=gc_thresh_to,
K_from=K_from,
K_to=K_to,
lmax=lmax))
lmax=lmax,
chr=chr))
}

save_calls <- function(calls, conn){
Expand All @@ -53,11 +55,12 @@ save_calls <- function(calls, conn){
}
}

read_coverage_table <- function(cov_table, conn){
read_coverage_table <- function(cov_table, conn,chr){
#query <- paste("select * from ", cov_table, sep="")
query <- paste("select * from ", cov_table, sep="")
query <- paste("select sample_name,target_id,chr,pos_min,pos_max,cov_avg from ", cov_table," where chr='",chr,"'", sep="")
print(query)
ds <- dbGetQuery(conn, query)
colnames(ds) <- c("sample_name", "target_id", "chr", "pos_min", "pos_max", "cov_avg")
colnames(ds) <- c("sample_name", "target_id", "chr", "pos_min", "pos_max", "cov_avg") #hardcoded column order!!!
ds
}

Expand Down Expand Up @@ -93,8 +96,8 @@ drv_psql <- JDBC("org.postgresql.Driver", "./postgresql-42.1.1.jar",identifier.q
conn_psql <- dbConnect(drv_psql, "jdbc:postgresql://cdh00.ii.pw.edu.pl:15432/cnv-opt", "cnv-opt", "zsibio321")

parameters <- read_parameters(opt$tabName, opt$id, conn_psql)
#print(parameters)
cov_table <- read_coverage_table(parameters$cov_table, conn_hive)
print(parameters)
cov_table <- read_coverage_table(parameters$cov_table, conn_psql,parameters$chr)
#print(cov_table)
calls <- run_caller(parameters, cov_table)
#print(calls)
Expand Down
23 changes: 23 additions & 0 deletions sql/loading_hive_pgsql.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*run from zsi-bio-spark-shell*/

val createTableString="""CREATE TABLE IF NOT EXISTS pgsql.coverage_target
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql://cdh00.ii.pw.edu.pl:15432/cnv-opt",
dbtable "public.coverage_target",
user 'cnv-opt',
password 'zsibio321'
)"""

spark.sqlContext.sql(createTableString)

val insertString="""INSERT INTO pgsql.coverage_target SELECT
chr,
sample_name,
target_id,
pos_min,
pos_max,
cov_avg
FROM cnv.coverage_target
"""
spark.sqlContext.sql(insertString)
42 changes: 42 additions & 0 deletions sql/parameter_generator.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
--create backup
create table test_params_bck as select * from test_parameters;

--generate
drop table if exists test_parameters_chr;
create table test_parameters_chr as
select cast(row_number() over () as integer ) as id,
caller ,
cast('public.coverage_target' as text) as cov_table,
mapp_thresh ,
cov_thresh_from ,
cov_thresh_to ,
length_thresh_from ,
length_thresh_to ,
gc_thresh_from ,
gc_thresh_to ,
k_from ,
k_to ,
lmax,
chr
from test_params_bck par, (select cast (generate_series(1,22) as text ) as chr
union
select 'X'
union
select 'Y') chr;

--delete from test_parameters_chr where chr<>'Y'

---create coverage table in postgres
CREATE TABLE coverage_target(
chr text,
sample_name ,
target_id int,
pos_min int,
pos_max int,
cov_avg numeric);

ALTER TABLE public.coverage_target OWNER TO "cnv-opt";
CREATE INDEX coveage_targe_idx1 ON coverage_target(chr);



0 comments on commit 9267fe5

Please sign in to comment.