Skip to content
This repository has been archived by the owner on Apr 27, 2018. It is now read-only.

Commit

Permalink
Add UDF for computing MD5 checksum. Issue #211
Browse files Browse the repository at this point in the history
  • Loading branch information
lintool committed Jul 28, 2016
2 parents 16db934 + 3b4ebe2 commit ab72ae4
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.apache.spark.SerializableWritable
import org.warcbase.data.ArcRecordUtils
import org.warcbase.io.ArcRecordWritable
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}

class ArcRecord(r: SerializableWritable[ArcRecordWritable]) extends ArchiveRecord {
val getCrawlDate: String = ExtractDate(r.t.getRecord.getMetaData.getDate, DateComponent.YYYYMMDD)
Expand All @@ -21,4 +21,12 @@ class ArcRecord(r: SerializableWritable[ArcRecordWritable]) extends ArchiveRecor

val getContentString: String = new String(getContentBytes)

val getImageBytes: Array[Byte] = {
if (getContentString.startsWith("HTTP/"))
getContentBytes.slice(
getContentString.indexOf(RemoveHttpHeader.headerEnd)
+ RemoveHttpHeader.headerEnd.length, getContentBytes.length)
else
getContentBytes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ trait ArchiveRecord extends Serializable {
val getContentString: String

val getContentBytes: Array[Byte]

val getImageBytes: Array[Byte]
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.warcbase.data.{ArcRecordUtils, WarcRecordUtils}
import org.warcbase.io.GenericArchiveRecordWritable
import org.warcbase.io.GenericArchiveRecordWritable.ArchiveFormat
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}

class GenericArchiveRecord(r: SerializableWritable[GenericArchiveRecordWritable]) extends ArchiveRecord {
var arcRecord: ARCRecord = null
Expand Down Expand Up @@ -68,4 +68,13 @@ class GenericArchiveRecord(r: SerializableWritable[GenericArchiveRecordWritable]
}

val getDomain: String = ExtractDomain(getUrl)
}

val getImageBytes: Array[Byte] = {
if (getContentString.startsWith("HTTP/"))
getContentBytes.slice(
getContentString.indexOf(RemoveHttpHeader.headerEnd)
+ RemoveHttpHeader.headerEnd.length, getContentBytes.length)
else
getContentBytes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.archive.util.ArchiveUtils
import org.warcbase.data.WarcRecordUtils
import org.warcbase.io.WarcRecordWritable
import org.warcbase.spark.matchbox.ExtractDate.DateComponent
import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}

class WarcRecord(r: SerializableWritable[WarcRecordWritable]) extends ArchiveRecord {
val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")
Expand All @@ -25,4 +25,13 @@ class WarcRecord(r: SerializableWritable[WarcRecordWritable]) extends ArchiveRec
val getUrl = r.t.getRecord.getHeader.getUrl

val getDomain = ExtractDomain(getUrl)

val getImageBytes: Array[Byte] = {
if (getContentString.startsWith("HTTP/"))
getContentBytes.slice(
getContentString.indexOf(RemoveHttpHeader.headerEnd)
+ RemoveHttpHeader.headerEnd.length, getContentBytes.length)
else
getContentBytes
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.warcbase.spark.matchbox

import java.io.ByteArrayInputStream
import javax.imageio.ImageIO

/**
* Created by youngbinkim on 7/7/16.
*/
object ComputeImageSize {
def apply(bytes: Array[Byte]): (Int, Int) = {
val in = new ByteArrayInputStream(bytes)

try {
val image = ImageIO.read(in)
if (image == null)
return (0, 0)
(image.getWidth(), image.getHeight())
} catch {
case e: Throwable => {
e.printStackTrace()
return (0, 0)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.warcbase.spark.matchbox

import java.security.MessageDigest


/**
* compute MD5 checksum..
*
*/
object ComputeMD5 {
/**
*
* @param bytes
* @return
*/
def apply(bytes: Array[Byte]): String = {
new String(MessageDigest.getInstance("MD5").digest(bytes))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.warcbase.spark.matchbox

import org.warcbase.spark.rdd.RecordRDD._
import org.apache.spark.rdd.RDD
import org.warcbase.spark.archive.io.ArchiveRecord


/**
* Extract most popular images
*
* limit: number of most popular images in the output
* timeoutVal: time allowed to connect to each image
*/
object ExtractPopularImages {
def apply(records: RDD[ArchiveRecord], limit: Int, minWidth: Int = 30, minHeight: Int = 30) = {
val res = records
.keepImages()
.map(r => ((r.getUrl, r.getImageBytes), 1))
.map(img => (ComputeMD5(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
.filter(img => img._2._1._1 >= minWidth && img._2._1._2 >= minHeight)
.reduceByKey((image1, image2) => (image1._1, image1._2, image1._3 + image2._3))
.takeOrdered(limit)(Ordering[Int].on(x => -x._2._3))
res.foreach(x => println(x._2._2 + "\t" + x._2._3))
res
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.warcbase.spark.matchbox

/**
* Created by youngbinkim on 7/9/16.
*/
object RemoveHttpHeader {
val headerEnd = "\r\n\r\n"
def apply(content: String): String = {
try {
if (content.startsWith("HTTP/"))
content.substring(content.indexOf(headerEnd) + headerEnd.length)
else
content
} catch {
case e: Exception => {
println(e)
null
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ object RecordRDD extends java.io.Serializable {
&& !r.getUrl.endsWith("robots.txt"))
}

def keepImages() = {
rdd.filter(r =>
r.getCrawlDate != null
&& (
(r.getMimeType != null && r.getMimeType.contains("image/"))
|| r.getUrl.endsWith("jpg")
|| r.getUrl.endsWith("jpeg")
|| r.getUrl.endsWith("png"))
&& !r.getUrl.endsWith("robots.txt"))
}

def keepMimeTypes(mimeTypes: Set[String]) = {
rdd.filter(r => mimeTypes.contains(r.getMimeType))
}
Expand Down

0 comments on commit ab72ae4

Please sign in to comment.