From 9267fe552954ac7e8ce5124b012c7edc62ef04c4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marek=20Wiewi=C3=B3rka?=
Date: Mon, 17 Jul 2017 14:35:12 +0200
Subject: [PATCH] Added chr to increase parallelism level (#39)

* Added chr to increase parallelism level

* Change data source to PGSQL
---
Review notes (placed after the "---" separator, where format-patch notes go):
- sql/parameter_generator.sql: the `sample_name` column of `coverage_target`
  had no data type, so the CREATE TABLE (and the ALTER TABLE / CREATE INDEX
  that follow it) could not run. It is now declared `text`, like `chr`
  (assumed type - confirm against cnv.coverage_target in Hive). The blob id
  on that file's `index` line predates this one-line fix.
- Line breaks, blank context lines and indentation were reconstructed from a
  whitespace-collapsed copy of this patch; run `git apply --check` (possibly
  with --ignore-whitespace) before applying.
- Flagged, not changed: the JDBC user/password are hard-coded in both the R
  runner and the Scala loader; read_coverage_table builds its query by
  pasting `chr` into the SQL text; the index name `coveage_targe_idx1` looks
  misspelled; `create table test_params_bck` fails if the script is re-run.

 R/CNVCALLER.RUNNER/inst/run_cnvcaller.R | 15 +++++----
 sql/loading_hive_pgsql.scala            | 23 ++++++++++++++
 sql/parameter_generator.sql             | 42 +++++++++++++++++++++++++
 3 files changed, 74 insertions(+), 6 deletions(-)
 create mode 100644 sql/loading_hive_pgsql.scala
 create mode 100644 sql/parameter_generator.sql

diff --git a/R/CNVCALLER.RUNNER/inst/run_cnvcaller.R b/R/CNVCALLER.RUNNER/inst/run_cnvcaller.R
index d2cf8a4..76ddafa 100755
--- a/R/CNVCALLER.RUNNER/inst/run_cnvcaller.R
+++ b/R/CNVCALLER.RUNNER/inst/run_cnvcaller.R
@@ -29,6 +29,7 @@ read_parameters <- function(tabName, id, conn){
   K_from <- parameters[1,'k_from']
   K_to <- parameters[1,'k_to']
   lmax <- parameters[1,'lmax']
+  chr <- parameters[1,'chr']
   return(list(caller=caller,
               cov_table=cov_table,
               mapp_thresh=mapp_thresh,
@@ -40,7 +41,8 @@ read_parameters <- function(tabName, id, conn){
               gc_thresh_to=gc_thresh_to,
               K_from=K_from,
               K_to=K_to,
-              lmax=lmax))
+              lmax=lmax,
+              chr=chr))
 }
 
 save_calls <- function(calls, conn){
@@ -53,11 +55,12 @@ save_calls <- function(calls, conn){
     }
   }
 
-read_coverage_table <- function(cov_table, conn){
+read_coverage_table <- function(cov_table, conn,chr){
   #query <- paste("select * from ", cov_table, sep="")
-  query <- paste("select * from ", cov_table, sep="")
+  query <- paste("select sample_name,target_id,chr,pos_min,pos_max,cov_avg from ", cov_table," where chr='",chr,"'", sep="")
+  print(query)
   ds <- dbGetQuery(conn, query)
-  colnames(ds) <- c("sample_name", "target_id", "chr", "pos_min", "pos_max", "cov_avg")
+  colnames(ds) <- c("sample_name", "target_id", "chr", "pos_min", "pos_max", "cov_avg") #hardcoded column order!!!
   ds
 }
 
@@ -93,8 +96,8 @@ drv_psql <- JDBC("org.postgresql.Driver", "./postgresql-42.1.1.jar",identifier.q
 conn_psql <- dbConnect(drv_psql, "jdbc:postgresql://cdh00.ii.pw.edu.pl:15432/cnv-opt", "cnv-opt", "zsibio321")
 
 parameters <- read_parameters(opt$tabName, opt$id, conn_psql)
-#print(parameters)
-cov_table <- read_coverage_table(parameters$cov_table, conn_hive)
+print(parameters)
+cov_table <- read_coverage_table(parameters$cov_table, conn_psql,parameters$chr)
 #print(cov_table)
 calls <- run_caller(parameters, cov_table)
 #print(calls)
diff --git a/sql/loading_hive_pgsql.scala b/sql/loading_hive_pgsql.scala
new file mode 100644
index 0000000..1331062
--- /dev/null
+++ b/sql/loading_hive_pgsql.scala
@@ -0,0 +1,23 @@
+/*run from zsi-bio-spark-shell*/
+
+val createTableString="""CREATE TABLE IF NOT EXISTS pgsql.coverage_target
+USING org.apache.spark.sql.jdbc
+OPTIONS (
+  url "jdbc:postgresql://cdh00.ii.pw.edu.pl:15432/cnv-opt",
+  dbtable "public.coverage_target",
+  user 'cnv-opt',
+  password 'zsibio321'
+)"""
+
+spark.sqlContext.sql(createTableString)
+
+val insertString="""INSERT INTO pgsql.coverage_target SELECT
+  chr,
+  sample_name,
+  target_id,
+  pos_min,
+  pos_max,
+  cov_avg
+FROM cnv.coverage_target
+"""
+spark.sqlContext.sql(insertString)
\ No newline at end of file
diff --git a/sql/parameter_generator.sql b/sql/parameter_generator.sql
new file mode 100644
index 0000000..7815785
--- /dev/null
+++ b/sql/parameter_generator.sql
@@ -0,0 +1,42 @@
+--create backup
+create table test_params_bck as select * from test_parameters;
+
+--generate
+drop table if exists test_parameters_chr;
+create table test_parameters_chr as
+select cast(row_number() over () as integer ) as id,
+  caller ,
+  cast('public.coverage_target' as text) as cov_table,
+  mapp_thresh ,
+  cov_thresh_from ,
+  cov_thresh_to ,
+  length_thresh_from ,
+  length_thresh_to ,
+  gc_thresh_from ,
+  gc_thresh_to ,
+  k_from ,
+  k_to ,
+  lmax,
+  chr
+from test_params_bck par, (select cast (generate_series(1,22) as text ) as chr
+union
+select 'X'
+union
+select 'Y') chr;
+
+--delete from test_parameters_chr where chr<>'Y'
+
+---create coverage table in postgres
+CREATE TABLE coverage_target(
+  chr text,
+  sample_name text,
+  target_id int,
+  pos_min int,
+  pos_max int,
+  cov_avg numeric);
+
+ ALTER TABLE public.coverage_target OWNER TO "cnv-opt";
+CREATE INDEX coveage_targe_idx1 ON coverage_target(chr);
+
+
+