diff --git a/lib/warc2corpus/__init__.py b/lib/warc2corpus/__init__.py
index ab9a920..49fbf9c 100644
--- a/lib/warc2corpus/__init__.py
+++ b/lib/warc2corpus/__init__.py
@@ -1,7 +1,35 @@
 import bs4
 from datetime import datetime as dt
+from aut import *
+from pyspark.sql.functions import col, udf, from_json, explode, concat
+from pyspark.sql.types import * #MapType, StringType, ArrayType, StructType, from_json
+from pyspark import SparkContext
+from pyspark.sql import SQLContext
+from json import dumps
 
-def apply(content,extractors):
+def run(warc_file, config, url_regex='.*'):
+    """
+    Main entry point for warc2corpus.
+    """
+    sc = SparkContext.getOrCreate()
+    sqlContext = SQLContext(sc)
+    df1 = WebArchive(sc, sqlContext, str(warc_file))
+    df2 = df1.all().filter(col("url").rlike(url_regex))
+    df3 = df2.withColumn("extract", extract(config)(remove_http_header(col("raw_content"))))
+    df4 = df3.select(df3.url, from_json('extract','ARRAY<MAP<STRING,STRING>>').alias('extract'))
+    return df4
+
+# https://stackoverflow.com/a/37428126/92049
+def extract(config):
+    """
+    Generate a Spark UDF.
+
+    This encloses warc2corpus' apply function to make
+    the configuration available to Spark during processing.
+    """
+    return udf(lambda html: dumps(apply(html,config)))
+
+def apply(content, extractors):
     """
     Apply extractors to content.
     """
diff --git a/sample/up_pressemeldungen.py b/sample/up_pressemeldungen.py
index 4159b74..149a7b4 100644
--- a/sample/up_pressemeldungen.py
+++ b/sample/up_pressemeldungen.py
@@ -1,33 +1,39 @@
 # docker run --rm -it --name aut -v $(pwd):/w2c --workdir=/w2c -e "PYTHONPATH=/w2c/lib" sepastian/aut:latest /spark/bin/pyspark --py-files /aut/target/aut.zip --jars /aut/target/aut-0.70.1-SNAPSHOT-fatjar.jar
 
-from aut import *
-from pyspark.sql.functions import col, udf, from_json, explode, concat
-from pyspark.sql.types import * #MapType, StringType, ArrayType, StructType, from_json
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
-import warc2corpus as w2c
-import re
-import dateparser as dp
 import json
+import dateparser as dp
 from datetime import datetime as dt
 from pathlib import Path
+from warc2corpus import run
 
-sc = SparkContext.getOrCreate()
-sqlContext = SQLContext(sc)
-
-warcfile = "uni_passau_de_pressemeldungen_20220216T125953.warc"
-df1 = WebArchive(sc, sqlContext, './data/uni_passau_de_pressemeldungen_20220216T125953.warc.gz')
-#df2 = df1.webpages().filter(col("url").rlike(".+/pressemeldungen/meldung/detail/[a-z]+"))
-df2 = df1.all().filter(col("url").rlike(".+/pressemeldungen/meldung/detail/[a-z]+"))
+# Find the WARC archive file to process.
+#
+# When running the sample-spark make target,
+# the data/ directory of this repository
+# will be mounted at /w2c/data inside the container.
+base = Path(__file__).parent.parent
+data_dir = base.joinpath('data')
+warc_file = data_dir.joinpath("uni_passau_de_pressemeldungen_20220216T125953.warc")
 
-# https://stackoverflow.com/a/37428126/92049
-def extract(config):
-    #return udf(lambda html: json.dumps(w2c.apply(html,config)),ArrayType(MapType(StringType(),StringType())))
-    #return udf(lambda html: json.dumps(w2c.apply(html,config)))
-    #schema = MapType(StringType(),StructType(),False)
-    #schema.add('meta',StringType(),False)
-    return udf(lambda html: json.dumps(w2c.apply(html,config)))
+# Supply a regex selecting pages to process by URL.
+url_regex = ".+/pressemeldungen/meldung/detail/[a-z]+"
+# Configure warc2corpus.
+#
+# The following configuration will be applied to each web page selected above.
+# Pages will be processed one by one.
+#
+# During processing, a result in JSON format will be generated.
+#
+# The 'meta' information will be copied to the JSON result as-is.
+# It contains arbitrary information as key-value pairs.
+#
+# Each 'extract' defines a piece of information to extract from the page
+# currently processed. An extract defines:
+#
+# * the 'name' under which to store the information extracted;
+# * the 'css_path' at which to find the information within the web page;
+# * an optional function 'f' to transform the data extracted.
 
 config= [
     {
         'meta': {
@@ -61,31 +67,11 @@ def extract(config):
     }
 ]
 
-df3 = df2.withColumn("extract", extract(config)(remove_http_header(col("raw_content"))))
-
-for r in df3.select("url","extract").collect():
-    #j = json.loads(r.extract)
-    print(r.extract)
-    print(type(r.extract))
-    #data.append(j[c] for doc in j for c in columns)
-
-#df3.select("url","extract").write.json("data/test")
+# Run warc2corpus, supplying the URL pattern and configuration defined above.
+# This will return a (Py)Spark data frame.
+df = run(warc_file, config, url_regex=url_regex)
 
-df4 = df3.select(df3.url, from_json('extract','ARRAY<MAP<STRING,STRING>>').alias('extract'))
-
-#df5 = df4.select(df4.url, explode(df4.json).alias('data'))
-df5 = df4
-
-for r in df5.collect():
-    print(r.extract)
-    print(type(r.extract))
-#df4.select("url","json").write.json("data/test")
-
-#df5 = df4.select(df4.json.meta.alias('meta'))
-#df5.show(vertical=True)
-
-base = Path(__file__).parent.parent
-data_dir = base.joinpath('data')
+# Write the results to a file in JSON format.
 out_dir = data_dir.joinpath('sample_run_{}'.format(dt.now().strftime('%Y%m%dT%H%M%S')))
-print(f'Writting results to #{out_dir}.')
-df5.write.json(str(out_dir))
+print(f'Writing results to {out_dir}.')
+df.write.json(str(out_dir))
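
Usage sketch (not part of the patch): how the new run() entry point could be called once this change lands. The CSS selectors, the 'extracts' key name (only 'meta' is visible in the hunks above), and the dateparser-based transform are assumptions for illustration only.

import dateparser as dp
from warc2corpus import run

# One configuration entry: 'meta' is copied into every result as-is;
# each extract names a value, locates it with a CSS path, and may
# post-process it with an optional function 'f'.
config = [
    {
        'meta': {
            'source': 'uni-passau.de/pressemeldungen',  # arbitrary key-value pairs
        },
        'extracts': [  # assumed key name, not shown in the hunks above
            {'name': 'title', 'css_path': 'h1'},  # hypothetical selector
            {'name': 'released_at', 'css_path': 'time',
             'f': lambda s: dp.parse(s).isoformat()},  # optional transform
        ],
    },
]

# Process all pages matching the URL pattern and write the corpus as JSON.
df = run('data/uni_passau_de_pressemeldungen_20220216T125953.warc',
         config,
         url_regex='.+/pressemeldungen/meldung/detail/[a-z]+')
df.write.json('data/sample_run')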