diff --git a/src/main/scala/io/archivesunleashed/DataFrameLoader.scala b/src/main/scala/io/archivesunleashed/DataFrameLoader.scala
index 95f2cc2e..22a45dcf 100644
--- a/src/main/scala/io/archivesunleashed/DataFrameLoader.scala
+++ b/src/main/scala/io/archivesunleashed/DataFrameLoader.scala
@@ -13,4 +13,10 @@ class DataFrameLoader(sc: SparkContext) {
     RecordLoader.loadArchives(path, sc)
       .extractHyperlinksDF()
   }
+
+  /** Creates a DataFrame of (source page, image url) pairs from the archives loaded from `path`. */
+  def extractImageLinks(path: String): DataFrame = {
+    RecordLoader.loadArchives(path, sc)
+      .extractImageLinksDF()
+  }
 }
diff --git a/src/main/scala/io/archivesunleashed/package.scala b/src/main/scala/io/archivesunleashed/package.scala
index 074bda2b..64a70dc5 100644
--- a/src/main/scala/io/archivesunleashed/package.scala
+++ b/src/main/scala/io/archivesunleashed/package.scala
@@ -19,7 +19,7 @@ package io
 
 import io.archivesunleashed.data.{ArchiveRecordWritable, ArchiveRecordInputFormat}
 import ArchiveRecordWritable.ArchiveFormat
-import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractLinks, ExtractDomain, RemoveHTML}
+import io.archivesunleashed.matchbox.{DetectLanguage, ExtractDate, ExtractLinks, ExtractImageLinks, ExtractDomain, RemoveHTML}
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent
 import io.archivesunleashed.matchbox.ExtractDate.DateComponent._
 
@@ -120,6 +120,27 @@ package object archivesunleashed {
       sqlContext.getOrCreate().createDataFrame(records, schema)
     }
 
+    /** Extracts (source page, image url) pairs from the pages kept by `keepValidPages()`.
+      *
+      * @return a DataFrame with two nullable string columns, `Src` and `ImageUrl`
+      */
+    def extractImageLinksDF(): DataFrame = {
+      val records = rdd
+        .keepValidPages()
+        .flatMap { r =>
+          val src = r.getUrl
+          // Pair every image URL returned for this page with the page's own URL.
+          ExtractImageLinks(src, r.getContentString).map(imageUrl => Row(src, imageUrl))
+        }
+
+      val schema = StructType(Seq(
+        StructField("Src", StringType, nullable = true),
+        StructField("ImageUrl", StringType, nullable = true)
+      ))
+
+      SparkSession.builder().getOrCreate().createDataFrame(records, schema)
+    }
+
     /** Removes all data except images. */
     def keepImages() = {
       rdd.filter(r =>
diff --git a/src/test/scala/io/archivesunleashed/df/ExtractImageLinksTest.scala b/src/test/scala/io/archivesunleashed/df/ExtractImageLinksTest.scala
new file mode 100644
index 00000000..7be171f3
--- /dev/null
+++ b/src/test/scala/io/archivesunleashed/df/ExtractImageLinksTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Archives Unleashed Toolkit (AUT):
+ * An open-source platform for analyzing web archives.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.archivesunleashed
+
+import com.google.common.io.Resources
+import io.archivesunleashed.df._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+@RunWith(classOf[JUnitRunner])
+class ExtractImageLinksTest extends FunSuite with BeforeAndAfter {
+  private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
+  private val master = "local[4]"
+  private val appName = "example-df"
+  private var sc: SparkContext = _
+
+  // Build the SparkContext used by the test from the master/app name above.
+  before {
+    sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+  }
+
+  test("Fetch image links") {
+    val imageLinks = RecordLoader.loadArchives(arcPath, sc).extractImageLinksDF()
+
+    // The session's implicits provide the $-notation for columns used below.
+    val spark = SparkSession.builder().master("local").getOrCreate()
+    import spark.implicits._
+
+    val firstTwo = imageLinks.select($"Src".as("Domain"), $"ImageUrl".as("Image"))
+      .orderBy(desc("Image")).head(2).toList
+    val expected = List(
+      ("http://www.archive.org/index.php", "http://www.archive.org/services/get-item-image.php?identifier=zh27814&collection=zh27&mediatype=audio"),
+      ("http://www.archive.org/index.php", "http://www.archive.org/services/get-item-image.php?identifier=secretarmiesb00spivrich&collection=americana&mediatype=texts")
+    )
+    assert(firstTwo.size == 2)
+    expected.zip(firstTwo).foreach { case ((src, image), row) =>
+      assert(src == row(0))
+      assert(image == row(1))
+    }
+  }
+
+  // Stop the SparkContext if one was created.
+  after {
+    Option(sc).foreach(_.stop())
+  }
+}