Feature/pyspark support and VCF (#153)
* VCF datasource

* adding hadoop.io.compression.codecs

* Package rename

* Documentation
mwiewior authored Oct 17, 2019
1 parent 6b84426 commit d83c3f0
Showing 10 changed files with 243 additions and 6 deletions.
9 changes: 6 additions & 3 deletions build.sbt
@@ -4,13 +4,13 @@ import scala.util.Properties

name := """bdg-sequila"""

version := "0.5.5-spark-2.4.2"
version := "0.5.6-spark-2.4.3"

organization := "org.biodatageeks"

scalaVersion := "2.11.8"

val DEFAULT_SPARK_2_VERSION = "2.4.2"
val DEFAULT_SPARK_2_VERSION = "2.4.3"
val DEFAULT_HADOOP_VERSION = "2.6.5"


@@ -63,13 +63,16 @@ libraryDependencies += "org.apache.derby" % "derbyclient" % "10.14.2.0"

libraryDependencies += "org.biodatageeks" % "bdg-performance_2.11" % "0.2-SNAPSHOT" excludeAll (ExclusionRule("org.apache.hadoop"))

libraryDependencies += "org.disq-bio" % "disq" % "0.3.0"
libraryDependencies += "org.disq-bio" % "disq" % "0.3.3"

libraryDependencies += "com.lifeomic" % "spark-vcf" % "0.3.2"




fork := true
fork in Test := true

//parallelExecution in Test := false
//javaOptions in Test += "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9999"
//javaOptions in run += "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9999"
29 changes: 28 additions & 1 deletion docs/source/fileformats/fileformats.rst
@@ -374,11 +374,38 @@ Coverage information can be exported to standard BED format. Actually, calculate
s"""
|CREATE TABLE ${tableNameADAM}
|USING org.biodatageeks.datasources.ADAM.ADAMDataSource
|OPTIONS(path "/data/input/multisample/*.bame")
|OPTIONS(path "/data/input/multisample/*.bam")
|
""".stripMargin)
val cov = ss.sql("SELECT * FROM bdg_coverage('${tableNameBAM}','NA12878', 'blocks')")
cov.coalesce(1).write.mode("overwrite").option("delimiter", "\t").csv("/data/output/cov.bed")
VCF
****

.. code-block:: scala

    val tableNameVCF = "test_vcf"
    ss.sql("CREATE DATABASE BDGEEK")
    ss.sql("USE BDGEEK")
    ss.sql(
      s"""
        |CREATE TABLE ${tableNameVCF}
        |USING org.biodatageeks.datasources.VCF.VCFDataSource
        |OPTIONS(path "/data/input/vcf/*.vcf")
        |
      """.stripMargin)
    ss.sql(s"SELECT * FROM ${tableNameVCF} LIMIT 5").show(false)
+------+-----+-----+-----+---------+---+---+----+------+--------------------+---+---+---+-----+---------+
|contig| pos|start| stop| id|ref|alt|qual|filter| info| gt| gq| dp| hq|sample_id|
+------+-----+-----+-----+---------+---+---+----+------+--------------------+---+---+---+-----+---------+
| 20|14370|14369|14370|rs6054257| G| A| 29| PASS|[ns -> 3, db -> D...|0|0| 48| 1|51,51| NA00001|
| 20|14370|14369|14370|rs6054257| G| A| 29| PASS|[ns -> 3, db -> D...|1|0| 48| 8|51,51| NA00002|
| 20|14370|14369|14370|rs6054257| G| A| 29| PASS|[ns -> 3, db -> D...|1/1| 43| 5| .,.| NA00003|
| 20|17330|17329|17330| .| T| A| 3| q10|[ns -> 3, dp -> 1...|0|0| 49| 3|58,50| NA00001|
| 20|17330|17329|17330| .| T| A| 3| q10|[ns -> 3, dp -> 1...|0|1| 3| 5| 65,3| NA00002|
+------+-----+-----+-----+---------+---+---+----+------+--------------------+---+---+---+-----+---------+
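
As a usage note (not in the committed docs): the ``info`` column is surfaced as a map, visible in the output above as ``[ns -> 3, db -> D...]``, so individual INFO fields can be pulled out with map access. A small sketch against the table created above — the lower-cased key ``dp`` follows the keys visible in the sample output:

.. code-block:: scala

    // a sketch: INFO fields are map entries keyed by lower-cased INFO IDs
    ss.sql(s"SELECT contig, pos, info['dp'] AS depth FROM ${tableNameVCF}").show(5)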
23 changes: 23 additions & 0 deletions src/main/resources/core-site.xml
@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<configuration>
<property>
<name>hadoop.io.compression.codecs</name>
<value>org.seqdoop.hadoop_bam.util.BGZFEnhancedGzipCodec</value>
</property>
</configuration>
@@ -68,7 +68,7 @@ object UTVFRegister
//
// spark
// .sql("select * from range(1,2)")
//    .explain(true)

//val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkathon")
//val context: SparkContext = new SparkContext(conf)
src/main/scala/org/biodatageeks/datasources/VCF/VCFDataSource.scala (new file)
@@ -0,0 +1,15 @@
package org.biodatageeks.datasources.VCF

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

// Registers the "VCF" short name and wires the mandatory `path`
// option into a VCFRelation.
class VCFDataSource extends DataSourceRegister
  with RelationProvider {

  override def shortName(): String = "VCF"

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    new VCFRelation(parameters("path"))(sqlContext)
  }

}
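
Because the relation only needs a path parameter, the datasource can also be exercised through the DataFrame reader API. A minimal sketch, not part of this commit — the fully qualified class name always resolves, whereas the "VCF" short name would only resolve with a DataSourceRegister service entry on the classpath:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // load VCF records through the new datasource
    val variants = spark.read
      .format("org.biodatageeks.datasources.VCF.VCFDataSource")
      .load("/data/input/vcf/*.vcf")   // example path, as in the docs above

    variants.select("contig", "pos", "sample_id").show(5)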
42 changes: 42 additions & 0 deletions src/main/scala/org/biodatageeks/datasources/VCF/VCFRelation.scala
@@ -0,0 +1,42 @@
package org.biodatageeks.datasources.VCF

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources._


class VCFRelation(path: String)(@transient val sqlContext: SQLContext) extends BaseRelation
  with PrunedScan
  with Serializable
  with Logging {

  val spark = sqlContext.sparkSession

  // Delegate VCF parsing to the spark-vcf reader and align column names
  // with the rest of the SeQuiLa schema (sample_id, contig).
  lazy val df = spark.read
    .format("com.lifeomic.variants")
    .option("use.format.type", "false")
    .load(path)
    .withColumnRenamed("sampleid", "sample_id")
    .withColumnRenamed("chrom", "contig")

  override def schema: org.apache.spark.sql.types.StructType = df.schema

  // PrunedScan: Spark passes in only the columns the query actually needs
  override def buildScan(requiredColumns: Array[String]) = {
    if (requiredColumns.length > 0)
      df.select(requiredColumns.head, requiredColumns.tail: _*)
    else
      df
  }.rdd

}
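
With PrunedScan, a projection pushes its column list into buildScan, so only the selected columns are materialized from the underlying DataFrame. A quick sketch, assuming the SequilaSession ss and the test_vcf table from the docs example above:

    // buildScan receives Array("contig", "pos", "gt") for this query
    ss.sql("SELECT contig, pos, gt FROM test_vcf").show(5)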
src/main/scala/org/biodatageeks/preprocessing/coverage/CovRun.scala (new file)
@@ -0,0 +1,54 @@
package org.biodatageeks.preprocessing.coverage

import org.apache.spark.sql.{SequilaSession, SparkSession}
import org.biodatageeks.utils.SequilaRegister

object CovRun {

  def main(args: Array[String]): Unit = {
    //val bamPath = "/Users/marek/Downloads/data/NA12878.ga2.exome.maq.recal.bam"
    val bamPath = "/Users/marek/data/NA12878.chrom20.ILLUMINA.bwa.CEU.low_coverage.20121211.bam"
    val tableNameBAM = "reads"

    val spark = SparkSession
      .builder()
      .config("spark.driver.memory", "8g")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[4]")
      .getOrCreate()

    val ss = new SequilaSession(spark)
    SequilaRegister.register(ss)
    ss.sqlContext.setConf("spark.biodatageeks.bam.useGKLInflate", "false")
    ss.sqlContext.setConf("spark.biodatageeks.bam.useSparkBAM", "true")

    ss.sql(s"DROP TABLE IF EXISTS ${tableNameBAM}")
    ss.sql(
      s"""
         |CREATE TABLE ${tableNameBAM}
         |USING org.biodatageeks.datasources.BAM.BAMDataSource
         |OPTIONS(path "${bamPath}")
         |
      """.stripMargin)

    spark.time {
      ss.sql(
        s"""SELECT * FROM bdg_coverage('${tableNameBAM}','NA12878.chr','bdg')
        """.stripMargin).count
    }

    // spark.time {
    //   ss.sql(
    //     s"""SELECT * FROM bdg_coverage('${tableNameBAM}','NA12878.chr','bdg')
    //        | WHERE start >= 25109993
    //     """.stripMargin).show(20)
    // }

    // spark.time {
    //   ss.sql(
    //     s"""SELECT * FROM bdg_coverage('${tableNameBAM}','NA12878.chr','mosdepth')
    //        | WHERE start >= 25109993
    //     """.stripMargin).show(20)
    // }
  }

}
//25109928
@@ -96,7 +96,7 @@ case class BDGCoveragePlan [T<:BDGAlignInputFormat](plan: LogicalPlan, spark: Sp
logger.info(s"Processing ${samplePath} with reference: ${refPath}")
lazy val alignments = readBAMFile(spark.sqlContext, samplePath, if( refPath == null || refPath.length == 0) None else Some(refPath))

-    val filterFlag = spark.sqlContext.getConf(BDGInternalParams.filterReadsByFlag, "1796").toInt
+    val filterFlag = spark.conf.get(BDGInternalParams.filterReadsByFlag, "1796").toInt

lazy val events = CoverageMethodsMos.readsToEventsArray(alignments,filterFlag)

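
Both accessors read the same SQL configuration; the change simply moves from the legacy SQLContext API to the session-scoped RuntimeConfig introduced with SparkSession. A minimal sketch, assuming an in-scope SparkSession named spark:

    import org.biodatageeks.utils.BDGInternalParams

    // equivalent reads of the same SQL conf entry in Spark 2.x
    val legacy  = spark.sqlContext.getConf(BDGInternalParams.filterReadsByFlag, "1796").toInt
    val current = spark.conf.get(BDGInternalParams.filterReadsByFlag, "1796").toInt
    assert(legacy == current)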
24 changes: 24 additions & 0 deletions src/test/resources/vcf/test.vcf
@@ -0,0 +1,24 @@
##fileformat=VCFv4.1
##fileDate=20090805
##source=myImputationProgramV3.1
##reference=file:///seq/references/1000GenomesPilot-NCBI36.fasta
##contig=<ID=20,length=62435964,assembly=B36,md5=f126cdf8a6e0c7f379d618ff66beb2da,species="Homo sapiens",taxonomy=x>
##phasing=partial
##INFO=<ID=NS,Number=1,Type=Integer,Description="Number of Samples With Data">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##INFO=<ID=AF,Number=A,Type=Float,Description="Allele Frequency">
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FILTER=<ID=q10,Description="Quality below 10">
##FILTER=<ID=s50,Description="Less than 50% of samples have data">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003
20 14370 rs6054257 G A 29 PASS NS=3;DP=14;AF=0.5;DB;H2 GT:GQ:DP:HQ 0|0:48:1:51,51 1|0:48:8:51,51 1/1:43:5:.,.
20 17330 . T A 3 q10 NS=3;DP=11;AF=0.017 GT:GQ:DP:HQ 0|0:49:3:58,50 0|1:3:5:65,3 0/0:41:3
20 1110696 rs6040355 A G,T 67 PASS NS=2;DP=10;AF=0.333,0.667;AA=T;DB GT:GQ:DP:HQ 1|2:21:6:23,27 2|1:2:0:18,2 2/2:35:4
20 1230237 . T . 47 PASS NS=3;DP=13;AA=T GT:GQ:DP:HQ 0|0:54:7:56,60 0|0:48:4:51,51 0/0:61:2
20 1234567 microsat1 GTC G,GTCT 50 PASS NS=3;DP=9;AA=G GT:GQ:DP 0/1:35:4 0/2:17:2 1/1:40:3
src/test/scala/pl/edu/pw/ii/biodatageeks/tests/VCFDataSourceTestSuite.scala (new file)
@@ -0,0 +1,49 @@
package pl.edu.pw.ii.biodatageeks.tests

import java.io.{OutputStreamWriter, PrintWriter}

import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.bdgenomics.utils.instrumentation.{Metrics, MetricsListener, RecordedMetrics}
import org.biodatageeks.utils.BDGInternalParams
import org.scalatest.{BeforeAndAfter, FunSuite}

class VCFDataSourceTestSuite extends FunSuite with DataFrameSuiteBase with BeforeAndAfter with SharedSparkContext {

  val vcfPath = getClass.getResource("/vcf/test.vcf").getPath
  val tableNameVCF = "variants"

  before {
    spark.sql(s"DROP TABLE IF EXISTS ${tableNameVCF}")
    spark.sql(
      s"""
         |CREATE TABLE ${tableNameVCF}
         |USING org.biodatageeks.datasources.VCF.VCFDataSource
         |OPTIONS(path "${vcfPath}")
         |
      """.stripMargin)
  }

  test("VCF - Row count VCFDataSource") {
    val query = s"SELECT * FROM ${tableNameVCF}"
    spark
      .sql(query)
      .printSchema()

    println(spark.sparkContext.hadoopConfiguration.get("hadoop.io.compression.codecs"))

    // column 8 is `filter` (see the schema in the docs above)
    assert(spark
      .sql(query)
      .first()
      .getString(8) === "PASS")

    // 5 VCF records yield 21 rows: 7 variant/ALT combinations x 3 samples,
    // i.e. one row per sample with multi-allelic sites split per ALT allele
    assert(spark.sql(query).count() === 21L)
  }

  after {
    spark.sql(s"DROP TABLE IF EXISTS ${tableNameVCF}")
  }

}
