Merge pull request #75 from planetlabs/compression

Report on compression in describe output
tschaub authored Sep 24, 2023
2 parents 6ab960f + c7305f2 commit b1560ba

Showing 7 changed files with 157 additions and 141 deletions.
8 changes: 3 additions & 5 deletions cmd/gpq/convert.go

@@ -114,11 +114,9 @@ func (c *ConvertCmd) Run() error {
 		return geojson.FromParquet(input, output)
 	}
 
-	var convertOptions *geoparquet.ConvertOptions
-	if c.InputPrimaryColumn != geoparquet.DefaultGeometryColumn {
-		convertOptions = &geoparquet.ConvertOptions{
-			InputPrimaryColumn: c.InputPrimaryColumn,
-		}
+	convertOptions := &geoparquet.ConvertOptions{
+		InputPrimaryColumn: c.InputPrimaryColumn,
+		Compression:        c.Compression,
 	}
 
 	return geoparquet.FromParquet(input, output, convertOptions)
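
The command now always builds ConvertOptions, forwarding both the primary column and the Compression flag. A minimal sketch of the equivalent programmatic call (the file paths and the "zstd" codec name are illustrative assumptions, and the geoparquet package lives under internal/, so this only compiles inside the gpq module):

package main

import (
	"log"
	"os"

	"github.com/planetlabs/gpq/internal/geoparquet"
)

func main() {
	// os.File satisfies parquet.ReaderAtSeeker, so it can feed FromParquet directly.
	input, err := os.Open("input.parquet") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer input.Close()

	output, err := os.Create("output.parquet") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer output.Close()

	// Mirrors the options the CLI now constructs unconditionally.
	options := &geoparquet.ConvertOptions{
		InputPrimaryColumn: "geometry",
		Compression:        "zstd", // assumed codec name
	}
	if err := geoparquet.FromParquet(input, output, options); err != nil {
		log.Fatal(err)
	}
}
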
54 changes: 39 additions & 15 deletions cmd/gpq/describe.go

@@ -42,6 +42,7 @@ const (
 	ColType          = "Type"
 	ColAnnotation    = "Annotation"
 	ColRepetition    = "Repetition"
+	ColCompression   = "Compression"
 	ColEncoding      = "Encoding"
 	ColGeometryTypes = "Geometry Types"
 	ColBounds        = "Bounds"
@@ -69,7 +70,7 @@ func (c *DescribeCmd) Run() error {
 	}
 
 	info := &Info{
-		Schema:   buildSchema("", fileMetadata.Schema.Root()),
+		Schema:   buildSchema(fileReader, "", fileMetadata.Schema.Root()),
 		Metadata: metadata,
 		NumRows:  fileMetadata.NumRows,
 	}
@@ -83,7 +84,7 @@ func (c *DescribeCmd) Run() error {
 func (c *DescribeCmd) formatText(info *Info) error {
 	metadata := info.Metadata
 
-	header := table.Row{ColName, ColType, ColAnnotation, ColRepetition}
+	header := table.Row{ColName, ColType, ColAnnotation, ColRepetition, ColCompression}
 	columnConfigs := []table.ColumnConfig{}
 	if metadata != nil {
 		header = append(header, ColEncoding, ColGeometryTypes, ColBounds, ColDetail)
@@ -121,7 +122,7 @@ func (c *DescribeCmd) formatText(info *Info) error {
 		} else if field.Optional {
 			repetition = "0..1"
 		}
-		row := table.Row{name, field.Type, field.Annotation, repetition}
+		row := table.Row{name, field.Type, field.Annotation, repetition, field.Compression}
 		if metadata != nil {
 			geoColumn, ok := metadata.Columns[field.Name]
 			if !ok {
@@ -199,19 +200,41 @@ type Info struct {
 }
 
 type Schema struct {
-	Name       string    `json:"name,omitempty"`
-	Optional   bool      `json:"optional,omitempty"`
-	Repeated   bool      `json:"repeated,omitempty"`
-	Type       string    `json:"type,omitempty"`
-	Annotation string    `json:"annotation,omitempty"`
-	Fields     []*Schema `json:"fields,omitempty"`
+	Name        string    `json:"name,omitempty"`
+	Optional    bool      `json:"optional,omitempty"`
+	Repeated    bool      `json:"repeated,omitempty"`
+	Type        string    `json:"type,omitempty"`
+	Annotation  string    `json:"annotation,omitempty"`
+	Compression string    `json:"compression,omitempty"`
+	Fields      []*Schema `json:"fields,omitempty"`
 }
 
-func buildSchema(name string, node schema.Node) *Schema {
+func getCompression(fileReader *file.Reader, node schema.Node) string {
+	if _, ok := node.(*schema.GroupNode); ok {
+		return ""
+	}
+	if fileReader.NumRowGroups() == 0 {
+		return "unknown"
+	}
+	rowGroupReader := fileReader.RowGroup(0)
+	colIndex := fileReader.MetaData().Schema.ColumnIndexByName(node.Path())
+	if colIndex < 0 {
+		return "unknown"
+	}
+	col, err := rowGroupReader.MetaData().ColumnChunk(colIndex)
+	if err != nil {
+		return "unknown"
+	}
+	return strings.ToLower(col.Compression().String())
+}
+
+func buildSchema(fileReader *file.Reader, name string, node schema.Node) *Schema {
 	annotation := ""
 	logicalType := node.LogicalType()
 	if !logicalType.IsNone() {
 		annotation = strings.ToLower(logicalType.String())
+	} else if _, isGroup := node.(*schema.GroupNode); isGroup {
+		annotation = "group"
 	}
 
 	repetition := node.RepetitionType()
@@ -224,10 +247,11 @@ func buildSchema(name string, node schema.Node) *Schema {
 	}
 
 	field := &Schema{
-		Name:       name,
-		Optional:   optional,
-		Repeated:   repeated,
-		Annotation: annotation,
+		Name:        name,
+		Optional:    optional,
+		Repeated:    repeated,
+		Annotation:  annotation,
+		Compression: getCompression(fileReader, node),
 	}
 
 	if leaf, ok := node.(*schema.PrimitiveNode); ok {
@@ -259,7 +283,7 @@ func buildSchema(name string, node schema.Node) *Schema {
 		field.Fields = make([]*Schema, count)
 		for i := 0; i < count; i += 1 {
 			groupField := group.Field(i)
-			field.Fields[i] = buildSchema(groupField.Name(), groupField)
+			field.Fields[i] = buildSchema(fileReader, groupField.Name(), groupField)
 		}
 	}
 	return field
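
A note on getCompression above: Parquet records the codec per column chunk, that is, per column per row group, so codecs can in principle vary between row groups. The helper reports only the first row group's codec and returns "unknown" when the file has no row groups. A sketch of a variant that surveys every row group, using the same arrow-go calls as the diff (the package and function names are hypothetical):

package pqinspect // hypothetical package for this sketch

import (
	"strings"

	"github.com/apache/arrow/go/v14/parquet/file"
)

// allCompressions collects the distinct codecs used by one leaf column
// across all row groups of a file. colIndex is the column's index in the
// file schema, as returned by ColumnIndexByName.
func allCompressions(fileReader *file.Reader, colIndex int) (map[string]bool, error) {
	codecs := map[string]bool{}
	for g := 0; g < fileReader.NumRowGroups(); g++ {
		col, err := fileReader.RowGroup(g).MetaData().ColumnChunk(colIndex)
		if err != nil {
			return nil, err
		}
		codecs[strings.ToLower(col.Compression().String())] = true
	}
	return codecs, nil
}
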
174 changes: 90 additions & 84 deletions internal/geoparquet/geoparquet.go

@@ -23,21 +23,29 @@ type ConvertOptions struct {
 	Compression string
 }
 
-func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions *ConvertOptions) error {
-	if convertOptions == nil {
-		convertOptions = &ConvertOptions{}
-	}
-
-	primaryColumn := DefaultGeometryColumn
-	if convertOptions.InputPrimaryColumn != "" {
-		primaryColumn = convertOptions.InputPrimaryColumn
-	}
-
-	metadata := &Metadata{
-		PrimaryColumn: primaryColumn,
-		Columns: map[string]*GeometryColumn{
-			primaryColumn: getDefaultGeometryColumn(),
-		},
-	}
+func getMetadata(fileReader *file.Reader, convertOptions *ConvertOptions) *Metadata {
+	metadata, err := GetMetadata(fileReader.MetaData().KeyValueMetadata())
+	if err != nil {
+		primaryColumn := DefaultGeometryColumn
+		if convertOptions.InputPrimaryColumn != "" {
+			primaryColumn = convertOptions.InputPrimaryColumn
+		}
+		metadata = &Metadata{
+			PrimaryColumn: primaryColumn,
+			Columns: map[string]*GeometryColumn{
+				primaryColumn: getDefaultGeometryColumn(),
+			},
+		}
+	}
+	if convertOptions.InputPrimaryColumn != "" && metadata.PrimaryColumn != convertOptions.InputPrimaryColumn {
+		metadata.PrimaryColumn = convertOptions.InputPrimaryColumn
+	}
+	return metadata
+}
+
+func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions *ConvertOptions) error {
+	if convertOptions == nil {
+		convertOptions = &ConvertOptions{}
+	}
 
 	var compression *compress.Compression
@@ -49,95 +57,93 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions *ConvertOptions) error {
 		compression = &c
 	}
 
-	parquetSchema, schemaErr := pqutil.GetParquetSchema(input)
-	if schemaErr != nil {
-		return fmt.Errorf("trouble getting parquet schema: %w", schemaErr)
-	}
-
 	datasetInfo := geo.NewDatasetInfo(true)
-	for fieldNum := 0; fieldNum < parquetSchema.Root().NumFields(); fieldNum += 1 {
-		field := parquetSchema.Root().Field(fieldNum)
-		name := field.Name()
-		if _, ok := metadata.Columns[name]; !ok {
-			continue
-		}
-		if field.LogicalType() == pqutil.ParquetStringType {
-			datasetInfo.AddCollection(name)
-		}
-	}
-
-	var transformSchema pqutil.SchemaTransformer
-	var transformColumn pqutil.ColumnTransformer
-	if datasetInfo.NumCollections() > 0 {
-		transformSchema = func(inputSchema *schema.Schema) (*schema.Schema, error) {
-			inputRoot := inputSchema.Root()
-			numFields := inputRoot.NumFields()
-
-			fields := make([]schema.Node, numFields)
-			for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
-				inputField := inputRoot.Field(fieldNum)
-				if !datasetInfo.HasCollection(inputField.Name()) {
-					fields[fieldNum] = inputField
-					continue
-				}
-				outputField, err := schema.NewPrimitiveNode(inputField.Name(), inputField.RepetitionType(), parquet.Types.ByteArray, -1, -1)
-				if err != nil {
-					return nil, err
-				}
-				fields[fieldNum] = outputField
-			}
-
-			outputRoot, err := schema.NewGroupNode(inputRoot.Name(), inputRoot.RepetitionType(), fields, -1)
-			if err != nil {
-				return nil, err
-			}
-			return schema.NewSchema(outputRoot), nil
-		}
-
-		transformColumn = func(inputField *arrow.Field, outputField *arrow.Field, chunked *arrow.Chunked) (*arrow.Chunked, error) {
-			if !datasetInfo.HasCollection(inputField.Name) {
-				return chunked, nil
-			}
-			chunks := chunked.Chunks()
-			transformed := make([]arrow.Array, len(chunks))
-			builder := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
-			defer builder.Release()
-
-			collectionInfo := geo.NewCollectionInfo(false)
-			for i, arr := range chunks {
-				stringArray, ok := arr.(*array.String)
-				if !ok {
-					return nil, fmt.Errorf("expected a string array for %q, got %v", inputField.Name, arr)
-				}
-				for rowNum := 0; rowNum < stringArray.Len(); rowNum += 1 {
-					if outputField.Nullable && stringArray.IsNull(rowNum) {
-						builder.AppendNull()
-						continue
-					}
-					str := stringArray.Value(rowNum)
-					geometry, wktErr := wkt.Unmarshal(str)
-					if wktErr != nil {
-						return nil, wktErr
-					}
-					value, wkbErr := wkb.Marshal(geometry)
-					if wkbErr != nil {
-						return nil, wkbErr
-					}
-					collectionInfo.AddType(geometry.GeoJSONType())
-					bounds := geometry.Bound()
-					collectionInfo.AddBounds(&bounds)
-					builder.Append(value)
-				}
-				transformed[i] = builder.NewArray()
-			}
-			datasetInfo.AddBounds(inputField.Name, collectionInfo.Bounds())
-			datasetInfo.AddTypes(inputField.Name, collectionInfo.Types())
-			chunked.Release()
-			return arrow.NewChunked(builder.Type(), transformed), nil
-		}
-	}
+	transformSchema := func(fileReader *file.Reader) (*schema.Schema, error) {
+		inputSchema := fileReader.MetaData().Schema
+		metadata := getMetadata(fileReader, convertOptions)
+		for fieldNum := 0; fieldNum < inputSchema.Root().NumFields(); fieldNum += 1 {
+			field := inputSchema.Root().Field(fieldNum)
+			name := field.Name()
+			if _, ok := metadata.Columns[name]; !ok {
+				continue
+			}
+			if field.LogicalType() == pqutil.ParquetStringType {
+				datasetInfo.AddCollection(name)
+			}
+		}
+
+		if datasetInfo.NumCollections() == 0 {
+			return inputSchema, nil
+		}
+
+		inputRoot := inputSchema.Root()
+		numFields := inputRoot.NumFields()
+
+		fields := make([]schema.Node, numFields)
+		for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
+			inputField := inputRoot.Field(fieldNum)
+			if !datasetInfo.HasCollection(inputField.Name()) {
+				fields[fieldNum] = inputField
+				continue
+			}
+			outputField, err := schema.NewPrimitiveNode(inputField.Name(), inputField.RepetitionType(), parquet.Types.ByteArray, -1, -1)
+			if err != nil {
+				return nil, err
+			}
+			fields[fieldNum] = outputField
+		}
+
+		outputRoot, err := schema.NewGroupNode(inputRoot.Name(), inputRoot.RepetitionType(), fields, -1)
+		if err != nil {
+			return nil, err
+		}
+		return schema.NewSchema(outputRoot), nil
+	}
+
+	transformColumn := func(inputField *arrow.Field, outputField *arrow.Field, chunked *arrow.Chunked) (*arrow.Chunked, error) {
+		if !datasetInfo.HasCollection(inputField.Name) {
+			return chunked, nil
+		}
+		chunks := chunked.Chunks()
+		transformed := make([]arrow.Array, len(chunks))
+		builder := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
+		defer builder.Release()
+
+		collectionInfo := geo.NewCollectionInfo(false)
+		for i, arr := range chunks {
+			stringArray, ok := arr.(*array.String)
+			if !ok {
+				return nil, fmt.Errorf("expected a string array for %q, got %v", inputField.Name, arr)
+			}
+			for rowNum := 0; rowNum < stringArray.Len(); rowNum += 1 {
+				if outputField.Nullable && stringArray.IsNull(rowNum) {
+					builder.AppendNull()
+					continue
+				}
+				str := stringArray.Value(rowNum)
+				geometry, wktErr := wkt.Unmarshal(str)
+				if wktErr != nil {
+					return nil, wktErr
+				}
+				value, wkbErr := wkb.Marshal(geometry)
+				if wkbErr != nil {
+					return nil, wkbErr
+				}
+				collectionInfo.AddType(geometry.GeoJSONType())
+				bounds := geometry.Bound()
+				collectionInfo.AddBounds(&bounds)
+				builder.Append(value)
+			}
+			transformed[i] = builder.NewArray()
+		}
+		datasetInfo.AddBounds(inputField.Name, collectionInfo.Bounds())
+		datasetInfo.AddTypes(inputField.Name, collectionInfo.Types())
+		chunked.Release()
+		return arrow.NewChunked(builder.Type(), transformed), nil
+	}
 
-	beforeClose := func(fileWriter *file.Writer) error {
+	beforeClose := func(fileReader *file.Reader, fileWriter *file.Writer) error {
+		metadata := getMetadata(fileReader, convertOptions)
 		for name, geometryCol := range metadata.Columns {
 			if !datasetInfo.HasCollection(name) {
 				continue
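
Worth noting about the refactor above: metadata is no longer built once up front from the options alone. Each callback derives it from the open file via getMetadata, which prefers the file's own "geo" key-value metadata (through the exported GetMetadata) and falls back to defaults only when that lookup fails, applying any InputPrimaryColumn override in either case. A sketch of inspecting that metadata the same way (the path is illustrative, and the package is module-internal):

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/apache/arrow/go/v14/parquet/file"
	"github.com/planetlabs/gpq/internal/geoparquet"
)

func main() {
	input, err := os.Open("example.parquet") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer input.Close()

	fileReader, err := file.NewParquetReader(input)
	if err != nil {
		log.Fatal(err)
	}
	defer fileReader.Close()

	// GetMetadata parses the "geo" key-value entry; when it errors,
	// FromParquet now assumes default metadata for the primary column.
	metadata, err := geoparquet.GetMetadata(fileReader.MetaData().KeyValueMetadata())
	if err != nil {
		log.Fatalf("no usable GeoParquet metadata: %v", err)
	}
	fmt.Println("primary geometry column:", metadata.PrimaryColumn)
}
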
19 changes: 3 additions & 16 deletions internal/pqutil/parquet.go

@@ -5,24 +5,11 @@ import (
 	"strings"
 
 	"github.com/apache/arrow/go/v14/parquet"
-	"github.com/apache/arrow/go/v14/parquet/file"
 	pqschema "github.com/apache/arrow/go/v14/parquet/schema"
 )
 
 var ParquetStringType = pqschema.StringLogicalType{}
 
-func GetParquetSchema(input parquet.ReaderAtSeeker) (*pqschema.Schema, error) {
-	fileReader, err := file.NewParquetReader(input)
-	if err != nil {
-		return nil, err
-	}
-	schema := fileReader.MetaData().Schema
-	if err := fileReader.Close(); err != nil {
-		return nil, err
-	}
-	return schema, nil
-}
-
 func LookupNode(schema *pqschema.Schema, name string) (pqschema.Node, bool) {
 	root := schema.Root()
 	index := root.FieldIndexByName(name)
@@ -131,7 +118,7 @@ func (w *parquetWriter) writeNode(node pqschema.Node, level int) {
 func (w *parquetWriter) writeGroupNode(node *pqschema.GroupNode, level int) {
 	repetition := node.RepetitionType().String()
 	name := node.Name()
-	annotation := logicalOrConvertedAnnotation(node)
+	annotation := LogicalOrConvertedAnnotation(node)
 
 	w.writeLine(fmt.Sprintf("%s group %s%s {", repetition, name, annotation), level)
 	for i := 0; i < node.NumFields(); i += 1 {
@@ -144,12 +131,12 @@ func (w *parquetWriter) writePrimitiveNode(node *pqschema.PrimitiveNode, level int) {
 	repetition := node.RepetitionType().String()
 	name := node.Name()
 	nodeType := physicalTypeString(node.PhysicalType())
-	annotation := logicalOrConvertedAnnotation(node)
+	annotation := LogicalOrConvertedAnnotation(node)
 
 	w.writeLine(fmt.Sprintf("%s %s %s%s;", repetition, nodeType, name, annotation), level)
 }
 
-func logicalOrConvertedAnnotation(node pqschema.Node) string {
+func LogicalOrConvertedAnnotation(node pqschema.Node) string {
 	logicalType := node.LogicalType()
 	convertedType := node.ConvertedType()
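
Two things happen in this file: GetParquetSchema is removed (FromParquet's callbacks now receive the open *file.Reader, so a separate open/read/close pass is unnecessary), and logicalOrConvertedAnnotation is exported. A usage sketch for the newly exported helper (illustrative path; module-internal import):

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/apache/arrow/go/v14/parquet/file"
	"github.com/planetlabs/gpq/internal/pqutil"
)

func main() {
	input, err := os.Open("example.parquet") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer input.Close()

	fileReader, err := file.NewParquetReader(input)
	if err != nil {
		log.Fatal(err)
	}
	defer fileReader.Close()

	// Print each top-level field with its logical/converted annotation,
	// e.g. something like "name (string)".
	root := fileReader.MetaData().Schema.Root()
	for i := 0; i < root.NumFields(); i++ {
		node := root.Field(i)
		fmt.Printf("%s%s\n", node.Name(), pqutil.LogicalOrConvertedAnnotation(node))
	}
}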