Merge pull request #32 from awesome-spark/06-dataframe-metadata-part3
eliasah authored Aug 4, 2016
2 parents 20ed441 + b304bf9 commit d87a7d1
81 changes: 80 additions & 1 deletion 06_data_preparation.md
```python
model.transform(df).schema[-1].metadata

## ...
## 'num_attrs': 2}}
```

As for now, PySpark doesn't support attaching metadata to a single column. It is possible, though, to use a method similar to this one:

```python
import json

from pyspark import SparkContext
from pyspark.sql.column import Column
from pyspark.sql.functions import col

def with_meta(self, alias, meta):
    # Build a JVM Metadata object from a plain Python dict and attach it
    # using Column.as(alias, metadata); `as` is accessed with getattr
    # because it is a Python keyword.
    sc = SparkContext._active_spark_context
    jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
    return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))

Column.with_meta = with_meta

# Assumes df has a column named "foo"; the names here are illustrative.
meta = {"foo": "bar"}
df_with_meta = df.withColumn("foo_meta", col("foo").with_meta("", meta))

df_with_meta.schema[-1].metadata == meta
## True
```

### Setting custom column metadata

Arguably, the true power of metadata shows itself when it is used outside the restricted ML environment. It is possible to attach an arbitrary JSON document to each column and use it for provenance tracking, storing diagnostic information, or performing different data enrichment tasks.

A [`Metadata`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.Metadata) object can be created from a JSON string:

```scala
import org.apache.spark.sql.types.Metadata

Metadata.fromJson("""{"foo": "bar"}""")

// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
```
or constructed using `MetadataBuilder`:

```scala
import org.apache.spark.sql.types.MetadataBuilder

new MetadataBuilder().putString("foo", "bar").build()

// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
```
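
Values stored in either form can be read back with the typed getters defined on `Metadata`, for example:

```scala
import org.apache.spark.sql.types.Metadata

val meta = Metadata.fromJson("""{"foo": "bar", "n": 1}""")

meta.contains("foo")   // Boolean = true
meta.getString("foo")  // String = bar
meta.getLong("n")      // Long = 1
```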

Moreover, it can be attached to Parquet files and loaded back later:

```scala
Seq((1L, "foo"), (2L, "bar"))
.toDF("id", "txt")
.withColumn("id", $"id".as("", Metadata.fromJson("""{"foo": "bar"}""")))
.write.parquet("/tmp/foo")


spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata)

// Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})
```
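
Since the attached document is arbitrary JSON, nothing restricts it to ML attributes. As a sketch, here is a hypothetical provenance record (the key names below are made up for illustration, not a Spark convention) attached the same way:

```scala
import org.apache.spark.sql.types.MetadataBuilder

// Illustrative provenance document; the keys are assumptions, not a standard.
val provenance = new MetadataBuilder()
  .putString("source", "s3://bucket/raw/events")
  .putString("ingested_at", "2016-08-04T00:00:00Z")
  .putLong("pipeline_version", 3L)
  .build()

Seq((1L, "foo"), (2L, "bar"))
  .toDF("id", "txt")
  .withColumn("id", $"id".as("", provenance))
  .schema("id").metadata

// the resulting StructField metadata carries the full document above
```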

#### Accessing metadata directly

Metadata can also be accessed directly using Parquet tools:

```scala
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}

import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

val conf = spark.sparkContext.hadoopConfiguration

def getFooters(conf: Configuration, path: String) = {
  val fs = FileSystem.get(conf)
  val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
  footers
}

def getFileMetadata(conf: Configuration, path: String) = {
  getFooters(conf, path)
    .asScala.map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
}

getFileMetadata(conf, "/tmp/foo").headOption

// Option[scala.collection.mutable.Map[String,String]] =
// Some(Map(org.apache.spark.sql.parquet.row.metadata ->
// {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}}
// {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))
```
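
The value stored under the `org.apache.spark.sql.parquet.row.metadata` key is simply the schema serialized to JSON, so it can, for instance, be parsed back into a Spark schema with `DataType.fromJson`:

```scala
import org.apache.spark.sql.types.DataType

getFileMetadata(conf, "/tmp/foo").headOption
  .flatMap(_.get("org.apache.spark.sql.parquet.row.metadata"))
  .map(DataType.fromJson)

// e.g. Some(StructType(StructField(id,LongType,false), StructField(txt,StringType,true)))
```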

We can also use the extracted footers to write a standalone metadata file when needed:

```scala
import org.apache.parquet.hadoop.ParquetFileWriter

def createMetadata(conf: Configuration, path: String) = {
  val footers = getFooters(conf, path)
  ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers)
}
```
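
For example, invoking it on the directory written above should place the Parquet summary files (`_metadata`, `_common_metadata`) next to the data; exact part file names will vary:

```scala
createMetadata(conf, "/tmp/foo")

val fs = FileSystem.get(conf)
fs.listStatus(new Path("/tmp/foo")).map(_.getPath.getName).sorted

// e.g. Array(_SUCCESS, _common_metadata, _metadata, part-...)
```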
