
Commit

documentation update
sepastian committed Jun 14, 2022
1 parent cb8333a commit 25fe552
Showing 2 changed files with 62 additions and 48 deletions.
30 changes: 29 additions & 1 deletion lib/warc2corpus/__init__.py
@@ -1,7 +1,35 @@
import bs4
from datetime import datetime as dt
from aut import *
from pyspark.sql.functions import col, udf, from_json, explode, concat
from pyspark.sql.types import * #MapType, StringType, ArrayType, StructType, from_json
from pyspark import SparkContext
from pyspark.sql import SQLContext
from json import dumps

def apply(content,extractors):
def run(warc_file, config, url_regex='.*'):
"""
Main entry point for warc2corpus.
"""
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
df1 = WebArchive(sc, sqlContext, str(warc_file))
df2 = df1.all().filter(col("url").rlike(url_regex))
df3 = df2.withColumn("extract", extract(config)(remove_http_header(col("raw_content"))))
df4 = df3.select(df3.url, from_json('extract','ARRAY<MAP<STRING,STRING>>').alias('extract'))
return df4

# https://stackoverflow.com/a/37428126/92049
def extract(config):
"""
Generate a Spark UDF.
This encloses warc2corpus' apply function to make
the configuration available to Spark during processing.
"""
return udf(lambda html: dumps(apply(html,config)))

def apply(content, extractors):
"""
Apply extractors to content.
"""
80 changes: 33 additions & 47 deletions sample/up_pressemeldungen.py
@@ -1,33 +1,39 @@
# docker run --rm -it --name aut -v $(pwd):/w2c --workdir=/w2c -e "PYTHONPATH=/w2c/lib" sepastian/aut:latest /spark/bin/pyspark --py-files /aut/target/aut.zip --jars /aut/target/aut-0.70.1-SNAPSHOT-fatjar.jar

from aut import *
from pyspark.sql.functions import col, udf, from_json, explode, concat
from pyspark.sql.types import * #MapType, StringType, ArrayType, StructType, from_json
from pyspark import SparkContext
from pyspark.sql import SQLContext
import warc2corpus as w2c
import re
import dateparser as dp
import json
import dateparser as dp
from datetime import datetime as dt
from pathlib import Path
from warc2corpus import run

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

warcfile = "uni_passau_de_pressemeldungen_20220216T125953.warc"
df1 = WebArchive(sc, sqlContext, './data/uni_passau_de_pressemeldungen_20220216T125953.warc.gz')
#df2 = df1.webpages().filter(col("url").rlike(".+/pressemeldungen/meldung/detail/[a-z]+"))
df2 = df1.all().filter(col("url").rlike(".+/pressemeldungen/meldung/detail/[a-z]+"))
# Find the WARC archive file to process.
#
# When running the sample-spark make target,
# the data/ directory of this repository
# will be mounted at /w2c/data inside the container.
base = Path(__file__).parent.parent
data_dir = base.joinpath('data')
warc_file = data_dir.joinpath("uni_passau_de_pressemeldungen_20220216T125953.warc")

# https://stackoverflow.com/a/37428126/92049
def extract(config):
#return udf(lambda html: json.dumps(w2c.apply(html,config)),ArrayType(MapType(StringType(),StringType())))
#return udf(lambda html: json.dumps(w2c.apply(html,config)))
#schema = MapType(StringType(),StructType(),False)
#schema.add('meta',StringType(),False)
return udf(lambda html: json.dumps(w2c.apply(html,config)))
# Supply a regex selecting pages to process by URL.
url_regex = ".+/pressemeldungen/meldung/detail/[a-z]+"

# Configure warc2corpus.
#
# The following configuration will be applied to each web page selected above.
# Pages will be processed one by one.
#
# During processing, a result in JSON format will be generated.
#
# The 'meta' information will be copied to the JSON result as-is.
# It contains arbitrary information as key-value pairs.
#
# Each 'extract' defines a piece of information to extract from the page
# currently processed. An extract defines:
#
# * the 'name' under which to store the information extracted;
# * the 'css_path' at which to find the information within the web page;
# * an optional function 'f' to transform the data extracted.
#
# An illustrative entry is sketched below, after the config.
config = [
{
'meta': {
@@ -61,31 +67,11 @@ def extract(config):
}
]

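# For illustration, a single config entry might look like the sketch below.
# Only the 'meta', 'name', 'css_path' and 'f' keys are documented in the
# comments above; the enclosing 'extracts' key, the CSS selector and all
# values are assumptions.
#
# {
#     'meta': {
#         'source': 'uni-passau.de press releases'
#     },
#     'extracts': [
#         {
#             'name': 'released_at',
#             'css_path': 'p.news-date time',
#             'f': lambda s: dp.parse(s).isoformat()
#         }
#     ]
# }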
df3 = df2.withColumn("extract", extract(config)(remove_http_header(col("raw_content"))))

for r in df3.select("url","extract").collect():
#j = json.loads(r.extract)
print(r.extract)
print(type(r.extract))
#data.append(j[c] for doc in j for c in columns)

#df3.select("url","extract").write.json("data/test")
# Run warc2corpus, supplying the URL pattern and configuration defined above.
# This will return a (Py)Spark data frame.
df = run(warc_file, config, url_regex=url_regex)
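
# For illustration (standard PySpark DataFrame API): inspect the schema and a
# few of the extracted records before writing them out.
df.printSchema()
df.select('url', 'extract').show(5, truncate=False)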

df4 = df3.select(df3.url, from_json('extract','ARRAY<MAP<STRING,STRING>>').alias('extract'))

#df5 = df4.select(df4.url, explode(df4.json).alias('data'))
df5 = df4

for r in df5.collect():
print(r.extract)
print(type(r.extract))
#df4.select("url","json").write.json("data/test")

#df5 = df4.select(df4.json.meta.alias('meta'))
#df5.show(vertical=True)

base = Path(__file__).parent.parent
data_dir = base.joinpath('data')
# Write the results in JSON format to a timestamped output directory.
out_dir = data_dir.joinpath('sample_run_{}'.format(dt.now().strftime('%Y%m%dT%H%M%S')))
print(f'Writing results to {out_dir}.')
df5.write.json(str(out_dir))
df.write.json(str(out_dir))
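
# The output is a directory of part-*.json files, one JSON object per line.
# For illustration, it could be read back inside the pyspark shell (where a
# SparkSession named 'spark' is predefined) with:
#
# spark.read.json(str(out_dir)).show()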
