diff --git a/06_data_preparation.md b/06_data_preparation.md
index fdf795f..b4ae38a 100644
--- a/06_data_preparation.md
+++ b/06_data_preparation.md
@@ -190,7 +190,7 @@ model.transform(df).schema[-1].metadata
 ## 'num_attrs': 2}}
 ```
 
-As for now PySpark doesn't support attaching metadata to a single column. It is possible though to use method similar to this one:
+For now PySpark doesn't support attaching metadata to a single column. It is possible, though, to use a method similar to this one:
 
 ```python
 import json
@@ -220,4 +220,83 @@ df_with_meta.schema[-1].metadata == meta
 ```
 
 ### Setting custom column metadata
 
+Arguably, the true power of metadata shows itself when it is used outside the restricted ML environment. It is possible to attach an arbitrary JSON document to each column and use it for provenance tracking, storing diagnostic information, or performing other data enrichment tasks.
+A [`Metadata`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.Metadata) object can be created from a JSON string:
+
+
+```scala
+import org.apache.spark.sql.types.Metadata
+
+
+Metadata.fromJson("""{"foo": "bar"}""")
+
+// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
+```
+or constructed using a `MetadataBuilder`:
+
+```scala
+import org.apache.spark.sql.types.MetadataBuilder
+
+new MetadataBuilder().putString("foo", "bar").build
+
+// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
+```
+
+Moreover, it can be attached to Parquet files and loaded back later:
+
+```scala
+Seq((1L, "foo"), (2L, "bar"))
+  .toDF("id", "txt")
+  .withColumn("id", $"id".as("", Metadata.fromJson("""{"foo": "bar"}""")))
+  .write.parquet("/tmp/foo")
+
+
+spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata)
+
+// Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})
+```
+
+#### Accessing Metadata Directly
+
+Metadata can also be accessed directly using Parquet tools:
+
+```scala
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
+
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+
+
+val conf = spark.sparkContext.hadoopConfiguration
+
+def getFooters(conf: Configuration, path: String) = {
+  val fs = FileSystem.get(conf)
+  val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
+  footers
+}
+
+def getFileMetadata(conf: Configuration, path: String) = {
+  getFooters(conf, path)
+    .asScala.map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
+}
+
+getFileMetadata(conf, "/tmp/foo").headOption
+
+// Option[scala.collection.mutable.Map[String,String]] =
+//   Some(Map(org.apache.spark.sql.parquet.row.metadata ->
+//     {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}},
+//     {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))
+```
+
+We can also use the extracted footers to write a standalone metadata file when needed:
+
+```scala
+import org.apache.parquet.hadoop.ParquetFileWriter
+
+def createMetadata(conf: Configuration, path: String) = {
+  val footers = getFooters(conf, path)
+  ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers)
+}
+```
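+
+A small usage sketch of the `createMetadata` helper above (this example is not from the original text): it writes the summary file for the `/tmp/foo` dataset created earlier and then lists the `_`-prefixed files that appear next to the data; which summary files are produced depends on the Parquet version in use.
+
+```scala
+// Assumes the getFooters / createMetadata helpers and the `conf` value defined above.
+createMetadata(conf, "/tmp/foo")
+
+// List the non-data files written alongside the Parquet part files.
+FileSystem.get(conf)
+  .listStatus(new Path("/tmp/foo"))
+  .map(_.getPath.getName)
+  .filter(_.startsWith("_"))
+
+// e.g. Array(_SUCCESS, _common_metadata, _metadata)
+```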
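+
+#### Updating Existing Metadata
+
+Calling `as(alias, metadata)` replaces whatever `Metadata` object is already attached to a column, so straightforward re-annotation would drop the `{"foo": "bar"}` document written above. A minimal sketch of merging instead of overwriting, using `MetadataBuilder.withMetadata`; the `source` and `ingested_at` keys are made-up examples, not part of the original text:
+
+```scala
+import org.apache.spark.sql.types.MetadataBuilder
+
+val df = spark.read.parquet("/tmp/foo")
+
+// Start from the metadata already attached to the column, then add new keys.
+val enriched = new MetadataBuilder()
+  .withMetadata(df.schema("id").metadata)  // keeps {"foo": "bar"}
+  .putString("source", "events-v2")        // hypothetical provenance tag
+  .putLong("ingested_at", 1719830400L)     // hypothetical diagnostic value
+  .build
+
+val dfEnriched = df.withColumn("id", $"id".as("id", enriched))
+
+dfEnriched.schema("id").metadata
+
+// e.g. {"foo":"bar","ingested_at":1719830400,"source":"events-v2"} (key order may differ)
+```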
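+
+Individual values can be read back with the typed getters on `Metadata`, which is what makes the provenance and diagnostic use cases mentioned at the beginning of this section practical. A short sketch, assuming the `foo` key attached earlier:
+
+```scala
+val meta = spark.read.parquet("/tmp/foo").schema("id").metadata
+
+// Guard with contains: getString throws NoSuchElementException for missing keys.
+if (meta.contains("foo")) {
+  println(s"provenance tag for id: ${meta.getString("foo")}")
+}
+```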