From e1bb80bf4988c406d449ce7699f837bc346424e1 Mon Sep 17 00:00:00 2001
From: zero323
Date: Sun, 31 Jul 2016 21:55:04 +0200
Subject: [PATCH 1/3] Add metadata write-read example

---
 06_data_preparation.md | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/06_data_preparation.md b/06_data_preparation.md
index fdf795f..b7e1850 100644
--- a/06_data_preparation.md
+++ b/06_data_preparation.md
@@ -220,4 +220,40 @@ df_with_meta.schema[-1].metadata == meta
 ```
 
 ### Setting custom column metadata
 
+Arguably, the true power of metadata shows itself when it is used outside the restricted ML environment. It is possible to attach an arbitrary JSON document to each column and use it for provenance tracking, storing diagnostic information, or performing other data enrichment tasks.
+A [`Metadata`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.Metadata) object can be created from a JSON string:
+
+
+```scala
+import org.apache.spark.sql.types.Metadata
+
+
+Metadata.fromJson("""{"foo": "bar"}""")
+
+// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
+```
+or constructed using `MetadataBuilder`:
+
+```scala
+import org.apache.spark.sql.types.MetadataBuilder
+
+new MetadataBuilder().putString("foo", "bar").build
+
+// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
+```
+
+Moreover, it can be attached to Parquet files and loaded back later:
+
+
+```scala
+Seq((1L, "foo"), (2L, "bar"))
+  .toDF("id", "txt")
+  .withColumn("id", $"id".as("", Metadata.fromJson("""{"foo": "bar"}""")))
+  .write.parquet("/tmp/foo")
+
+
+spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata)
+
+// Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})
+```

From b803e6be195225dc9f94215ff84d7e18989abbe8 Mon Sep 17 00:00:00 2001
From: zero323
Date: Wed, 3 Aug 2016 21:11:13 +0200
Subject: [PATCH 2/3] Add examples of reading and writing metadata

---
 06_data_preparation.md | 43 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 42 insertions(+), 1 deletion(-)

diff --git a/06_data_preparation.md b/06_data_preparation.md
index b7e1850..296104f 100644
--- a/06_data_preparation.md
+++ b/06_data_preparation.md
@@ -245,7 +245,6 @@ new MetadataBuilder().putString("foo", "bar").build
 
 Moreover, it can be attached to Parquet files and loaded back later:
 
-
 ```scala
 Seq((1L, "foo"), (2L, "bar"))
   .toDF("id", "txt")
@@ -257,3 +256,45 @@ spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata)
 
 // Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})
 ```
+
+Metadata can also be accessed directly using Parquet tools:
+
+```scala
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
+
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+
+val conf = spark.sparkContext.hadoopConfiguration
+
+def getFooters(conf: Configuration, path: String) = {
+  val fs = FileSystem.get(conf)
+  val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
+  footers
+}
+
+def getFileMetadata(conf: Configuration, path: String) = {
+  getFooters(conf, path)
+    .asScala.map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
+}
+
+getFileMetadata(conf, "/tmp/foo").headOption
+
+// Option[scala.collection.mutable.Map[String,String]] =
+//   Some(Map(org.apache.spark.sql.parquet.row.metadata ->
+//     {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}},
+//     {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))
+```
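+
+Since Spark keeps the full schema under the `org.apache.spark.sql.parquet.row.metadata` key, the stored JSON document can be parsed back into a `StructType`, column metadata included. A minimal sketch building on the helpers above (the expected output assumes the `/tmp/foo` file written earlier):
+
+```scala
+import org.apache.spark.sql.types.{DataType, StructType}
+
+// Parse the schema JSON stored by Spark back into a StructType and
+// inspect the metadata attached to the first column.
+getFileMetadata(conf, "/tmp/foo").headOption
+  .flatMap(_.get("org.apache.spark.sql.parquet.row.metadata"))
+  .map(json => DataType.fromJson(json).asInstanceOf[StructType])
+  .map(_.head.metadata)
+
+// Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})
+```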
{"name":"txt","type":"string","nullable":true,"metadata":{}}]})) +``` + +We can also use extracted footers to write standalone metadata file when needed: + +```scala +import org.apache.parquet.hadoop.ParquetFileWriter + +def createMetadata(conf: Configuration, path: String) = { + val footers = getFooters(conf, path) + ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers) +} +``` From b304bf9e5e63f65b1e10a8ea8996fb2ac4a3b851 Mon Sep 17 00:00:00 2001 From: zero323 Date: Wed, 3 Aug 2016 22:06:19 +0200 Subject: [PATCH 3/3] Add separate section for Parquet tools --- 06_data_preparation.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/06_data_preparation.md b/06_data_preparation.md index 296104f..b4ae38a 100644 --- a/06_data_preparation.md +++ b/06_data_preparation.md @@ -190,7 +190,7 @@ model.transform(df).schema[-1].metadata ## 'num_attrs': 2}} ``` -As for now PySpark doesn't support attaching metadata to a single column. It is possible though to use method similar to this one: +As for now PySpark doesn't support attaching metadata to a single column. It is possible though, to use method similar to this one: ```python import json @@ -257,6 +257,8 @@ spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata) // Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"}) ``` +#### Accessing Metadata Directly + Metadata can be also accessed directly using Parquet tools: ```scala