From 3b4ebe297a217ada143f4084d3116a2785bc79ca Mon Sep 17 00:00:00 2001
From: Youngbin Kim
Date: Tue, 26 Jul 2016 21:04:21 -0400
Subject: [PATCH] checksum

---
 .../warcbase/spark/archive/io/ArcRecord.scala | 10 ++++++-
 .../spark/archive/io/ArchiveRecord.scala      |  2 ++
 .../archive/io/GenericArchiveRecord.scala     | 13 ++++++++--
 .../spark/archive/io/WarcRecord.scala         | 11 +++++++-
 .../spark/matchbox/ComputeImageSize.scala     | 25 ++++++++++++++++++
 .../warcbase/spark/matchbox/ComputeMD5.scala  | 19 ++++++++++++++
 .../spark/matchbox/ExtractPopularImages.scala | 26 +++++++++++++++++++
 .../spark/matchbox/RemoveHttpHeader.scala     | 21 +++++++++++++++
 .../org/warcbase/spark/rdd/RecordRDD.scala    | 11 ++++++++
 9 files changed, 134 insertions(+), 4 deletions(-)
 create mode 100644 warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeImageSize.scala
 create mode 100644 warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeMD5.scala
 create mode 100644 warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ExtractPopularImages.scala
 create mode 100644 warcbase-core/src/main/scala/org/warcbase/spark/matchbox/RemoveHttpHeader.scala

diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
index c4ea11b..7aff49b 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArcRecord.scala
@@ -4,7 +4,7 @@ import org.apache.spark.SerializableWritable
 import org.warcbase.data.ArcRecordUtils
 import org.warcbase.io.ArcRecordWritable
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
+import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}
 
 class ArcRecord(r: SerializableWritable[ArcRecordWritable]) extends ArchiveRecord {
   val getCrawlDate: String = ExtractDate(r.t.getRecord.getMetaData.getDate, DateComponent.YYYYMMDD)
@@ -21,4 +21,12 @@ class ArcRecord(r: SerializableWritable[ArcRecordWritable]) extends ArchiveRecor
 
   val getContentString: String = new String(getContentBytes)
 
+  val getImageBytes: Array[Byte] = {
+    if (getContentString.startsWith("HTTP/"))
+      getContentBytes.slice(
+        getContentString.indexOf(RemoveHttpHeader.headerEnd)
+          + RemoveHttpHeader.headerEnd.length, getContentBytes.length)
+    else
+      getContentBytes
+  }
 }
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala
index 610ebfc..1f4c992 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/ArchiveRecord.scala
@@ -14,4 +14,6 @@ trait ArchiveRecord extends Serializable {
   val getContentString: String
 
   val getContentBytes: Array[Byte]
+
+  val getImageBytes: Array[Byte]
 }
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
index 7e8b0ad..7a7e2ba 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/GenericArchiveRecord.scala
@@ -11,7 +11,7 @@ import org.warcbase.data.{ArcRecordUtils, WarcRecordUtils}
 import org.warcbase.io.GenericArchiveRecordWritable
 import org.warcbase.io.GenericArchiveRecordWritable.ArchiveFormat
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
+import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}
 
 class GenericArchiveRecord(r: SerializableWritable[GenericArchiveRecordWritable]) extends ArchiveRecord {
   var arcRecord: ARCRecord = null
@@ -68,4 +68,13 @@ class GenericArchiveRecord(r: SerializableWritable[GenericArchiveRecordWritable]
   }
 
   val getDomain: String = ExtractDomain(getUrl)
-}
+
+  val getImageBytes: Array[Byte] = {
+    if (getContentString.startsWith("HTTP/"))
+      getContentBytes.slice(
+        getContentString.indexOf(RemoveHttpHeader.headerEnd)
+          + RemoveHttpHeader.headerEnd.length, getContentBytes.length)
+    else
+      getContentBytes
+  }
+}
\ No newline at end of file
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
index 6b18bbc..bfae6c5 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/archive/io/WarcRecord.scala
@@ -7,7 +7,7 @@ import org.archive.util.ArchiveUtils
 import org.warcbase.data.WarcRecordUtils
 import org.warcbase.io.WarcRecordWritable
 import org.warcbase.spark.matchbox.ExtractDate.DateComponent
-import org.warcbase.spark.matchbox.{ExtractDate, ExtractDomain}
+import org.warcbase.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}
 
 class WarcRecord(r: SerializableWritable[WarcRecordWritable]) extends ArchiveRecord {
   val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")
@@ -25,4 +25,13 @@ class WarcRecord(r: SerializableWritable[WarcRecordWritable]) extends ArchiveRec
   val getUrl = r.t.getRecord.getHeader.getUrl
 
   val getDomain = ExtractDomain(getUrl)
+
+  val getImageBytes: Array[Byte] = {
+    if (getContentString.startsWith("HTTP/"))
+      getContentBytes.slice(
+        getContentString.indexOf(RemoveHttpHeader.headerEnd)
+          + RemoveHttpHeader.headerEnd.length, getContentBytes.length)
+    else
+      getContentBytes
+  }
 }
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeImageSize.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeImageSize.scala
new file mode 100644
index 0000000..91eb0c6
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeImageSize.scala
@@ -0,0 +1,25 @@
+package org.warcbase.spark.matchbox
+
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+/**
+ * Created by youngbinkim on 7/7/16.
+ */
+object ComputeImageSize {
+  def apply(bytes: Array[Byte]): (Int, Int) = {
+    val in = new ByteArrayInputStream(bytes)
+
+    try {
+      val image = ImageIO.read(in)
+      if (image == null)
+        return (0, 0)
+      (image.getWidth(), image.getHeight())
+    } catch {
+      case e: Throwable => {
+        e.printStackTrace()
+        return (0, 0)
+      }
+    }
+  }
+}
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeMD5.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeMD5.scala
new file mode 100644
index 0000000..d2000fe
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ComputeMD5.scala
@@ -0,0 +1,19 @@
+package org.warcbase.spark.matchbox
+
+import java.security.MessageDigest
+
+
+/**
+ * compute MD5 checksum..
+ *
+ */
+object ComputeMD5 {
+  /**
+   *
+   * @param bytes
+   * @return
+   */
+  def apply(bytes: Array[Byte]): String = {
+    new String(MessageDigest.getInstance("MD5").digest(bytes))
+  }
+}
\ No newline at end of file
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ExtractPopularImages.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ExtractPopularImages.scala
new file mode 100644
index 0000000..7aaeaba
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/ExtractPopularImages.scala
@@ -0,0 +1,26 @@
+package org.warcbase.spark.matchbox
+
+import org.warcbase.spark.rdd.RecordRDD._
+import org.apache.spark.rdd.RDD
+import org.warcbase.spark.archive.io.ArchiveRecord
+
+
+/**
+ * Extract most popular images
+ *
+ * limit: number of most popular images in the output
+ * timeoutVal: time allowed to connect to each image
+ */
+object ExtractPopularImages {
+  def apply(records: RDD[ArchiveRecord], limit: Int, minWidth: Int = 30, minHeight: Int = 30) = {
+    val res = records
+      .keepImages()
+      .map(r => ((r.getUrl, r.getImageBytes), 1))
+      .map(img => (ComputeMD5(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
+      .filter(img => img._2._1._1 >= minWidth && img._2._1._2 >= minHeight)
+      .reduceByKey((image1, image2) => (image1._1, image1._2, image1._3 + image2._3))
+      .takeOrdered(limit)(Ordering[Int].on(x => -x._2._3))
+    res.foreach(x => println(x._2._2 + "\t" + x._2._3))
+    res
+  }
+}
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/RemoveHttpHeader.scala b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/RemoveHttpHeader.scala
new file mode 100644
index 0000000..090ff77
--- /dev/null
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/matchbox/RemoveHttpHeader.scala
@@ -0,0 +1,21 @@
+package org.warcbase.spark.matchbox
+
+/**
+ * Created by youngbinkim on 7/9/16.
+ */
+object RemoveHttpHeader {
+  val headerEnd = "\r\n\r\n"
+  def apply(content: String): String = {
+    try {
+      if (content.startsWith("HTTP/"))
+        content.substring(content.indexOf(headerEnd) + headerEnd.length)
+      else
+        content
+    } catch {
+      case e: Exception => {
+        println(e)
+        null
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala b/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
index dda3c16..2a52a80 100644
--- a/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
+++ b/warcbase-core/src/main/scala/org/warcbase/spark/rdd/RecordRDD.scala
@@ -57,6 +57,17 @@ object RecordRDD extends java.io.Serializable {
         && !r.getUrl.endsWith("robots.txt"))
     }
 
+    def keepImages() = {
+      rdd.filter(r =>
+        r.getCrawlDate != null
+          && (
+          (r.getMimeType != null && r.getMimeType.contains("image/"))
+            || r.getUrl.endsWith("jpg")
+            || r.getUrl.endsWith("jpeg")
+            || r.getUrl.endsWith("png"))
+          && !r.getUrl.endsWith("robots.txt"))
+    }
+
     def keepMimeTypes(mimeTypes: Set[String]) = {
       rdd.filter(r => mimeTypes.contains(r.getMimeType))
     }
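
Usage sketch: a minimal spark-shell example of how the pieces added by this patch fit together, assuming the warcbase jar is on the classpath and the existing RecordLoader.loadArchives helper is used to build the RDD[ArchiveRecord]; the archive path and limit below are illustrative placeholders, not values from the patch.

// Minimal sketch (placeholder path and limit; RecordLoader.loadArchives is assumed
// from the existing warcbase matchbox, not part of this patch).
import org.warcbase.spark.matchbox.{ExtractPopularImages, RecordLoader}

// `sc` is the SparkContext provided by spark-shell.
val records = RecordLoader.loadArchives("/path/to/example.arc.gz", sc)

// keepImages() (added to RecordRDD above) keeps image responses; ExtractPopularImages
// keys each image by ComputeMD5 of its bytes, drops anything smaller than 30x30 via
// ComputeImageSize, sums per-checksum counts, prints "url<TAB>count" as a side effect,
// and returns the `limit` most frequent images as (md5, ((width, height), url, count)).
val popular = ExtractPopularImages(records, 10)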