Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report on compression in describe output #75

Merged
merged 1 commit into from
Sep 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cmd/gpq/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,9 @@ func (c *ConvertCmd) Run() error {
return geojson.FromParquet(input, output)
}

var convertOptions *geoparquet.ConvertOptions
if c.InputPrimaryColumn != geoparquet.DefaultGeometryColumn {
convertOptions = &geoparquet.ConvertOptions{
InputPrimaryColumn: c.InputPrimaryColumn,
}
convertOptions := &geoparquet.ConvertOptions{
InputPrimaryColumn: c.InputPrimaryColumn,
Compression: c.Compression,
}

return geoparquet.FromParquet(input, output, convertOptions)
Expand Down
54 changes: 39 additions & 15 deletions cmd/gpq/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
ColType = "Type"
ColAnnotation = "Annotation"
ColRepetition = "Repetition"
ColCompression = "Compression"
ColEncoding = "Encoding"
ColGeometryTypes = "Geometry Types"
ColBounds = "Bounds"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (c *DescribeCmd) Run() error {
}

info := &Info{
Schema: buildSchema("", fileMetadata.Schema.Root()),
Schema: buildSchema(fileReader, "", fileMetadata.Schema.Root()),
Metadata: metadata,
NumRows: fileMetadata.NumRows,
}
Expand All @@ -83,7 +84,7 @@ func (c *DescribeCmd) Run() error {
func (c *DescribeCmd) formatText(info *Info) error {
metadata := info.Metadata

header := table.Row{ColName, ColType, ColAnnotation, ColRepetition}
header := table.Row{ColName, ColType, ColAnnotation, ColRepetition, ColCompression}
columnConfigs := []table.ColumnConfig{}
if metadata != nil {
header = append(header, ColEncoding, ColGeometryTypes, ColBounds, ColDetail)
Expand Down Expand Up @@ -121,7 +122,7 @@ func (c *DescribeCmd) formatText(info *Info) error {
} else if field.Optional {
repetition = "0..1"
}
row := table.Row{name, field.Type, field.Annotation, repetition}
row := table.Row{name, field.Type, field.Annotation, repetition, field.Compression}
if metadata != nil {
geoColumn, ok := metadata.Columns[field.Name]
if !ok {
Expand Down Expand Up @@ -199,19 +200,41 @@ type Info struct {
}

type Schema struct {
Name string `json:"name,omitempty"`
Optional bool `json:"optional,omitempty"`
Repeated bool `json:"repeated,omitempty"`
Type string `json:"type,omitempty"`
Annotation string `json:"annotation,omitempty"`
Fields []*Schema `json:"fields,omitempty"`
Name string `json:"name,omitempty"`
Optional bool `json:"optional,omitempty"`
Repeated bool `json:"repeated,omitempty"`
Type string `json:"type,omitempty"`
Annotation string `json:"annotation,omitempty"`
Compression string `json:"compression,omitempty"`
Fields []*Schema `json:"fields,omitempty"`
}

func buildSchema(name string, node schema.Node) *Schema {
// getCompression reports the compression codec (lowercased) used by the
// column chunk backing the given schema node. Group nodes have no column
// chunk, so they yield "". When the file has no row groups, the column
// cannot be located by path, or the chunk metadata cannot be read, the
// codec is reported as "unknown". Only the first row group is consulted;
// it is assumed representative of the whole file.
func getCompression(fileReader *file.Reader, node schema.Node) string {
	if _, isGroup := node.(*schema.GroupNode); isGroup {
		// Groups are containers, not data columns — nothing to report.
		return ""
	}
	if fileReader.NumRowGroups() == 0 {
		return "unknown"
	}
	index := fileReader.MetaData().Schema.ColumnIndexByName(node.Path())
	if index < 0 {
		return "unknown"
	}
	chunk, err := fileReader.RowGroup(0).MetaData().ColumnChunk(index)
	if err != nil {
		return "unknown"
	}
	return strings.ToLower(chunk.Compression().String())
}

func buildSchema(fileReader *file.Reader, name string, node schema.Node) *Schema {
annotation := ""
logicalType := node.LogicalType()
if !logicalType.IsNone() {
annotation = strings.ToLower(logicalType.String())
} else if _, isGroup := node.(*schema.GroupNode); isGroup {
annotation = "group"
}

repetition := node.RepetitionType()
Expand All @@ -224,10 +247,11 @@ func buildSchema(name string, node schema.Node) *Schema {
}

field := &Schema{
Name: name,
Optional: optional,
Repeated: repeated,
Annotation: annotation,
Name: name,
Optional: optional,
Repeated: repeated,
Annotation: annotation,
Compression: getCompression(fileReader, node),
}

if leaf, ok := node.(*schema.PrimitiveNode); ok {
Expand Down Expand Up @@ -259,7 +283,7 @@ func buildSchema(name string, node schema.Node) *Schema {
field.Fields = make([]*Schema, count)
for i := 0; i < count; i += 1 {
groupField := group.Field(i)
field.Fields[i] = buildSchema(groupField.Name(), groupField)
field.Fields[i] = buildSchema(fileReader, groupField.Name(), groupField)
}
}
return field
Expand Down
174 changes: 90 additions & 84 deletions internal/geoparquet/geoparquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,29 @@ type ConvertOptions struct {
Compression string
}

func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions *ConvertOptions) error {
if convertOptions == nil {
convertOptions = &ConvertOptions{}
func getMetadata(fileReader *file.Reader, convertOptions *ConvertOptions) *Metadata {
metadata, err := GetMetadata(fileReader.MetaData().KeyValueMetadata())
if err != nil {
primaryColumn := DefaultGeometryColumn
if convertOptions.InputPrimaryColumn != "" {
primaryColumn = convertOptions.InputPrimaryColumn
}
metadata = &Metadata{
PrimaryColumn: primaryColumn,
Columns: map[string]*GeometryColumn{
primaryColumn: getDefaultGeometryColumn(),
},
}
}

primaryColumn := DefaultGeometryColumn
if convertOptions.InputPrimaryColumn != "" {
primaryColumn = convertOptions.InputPrimaryColumn
if convertOptions.InputPrimaryColumn != "" && metadata.PrimaryColumn != convertOptions.InputPrimaryColumn {
metadata.PrimaryColumn = convertOptions.InputPrimaryColumn
}
return metadata
}

metadata := &Metadata{
PrimaryColumn: primaryColumn,
Columns: map[string]*GeometryColumn{
primaryColumn: getDefaultGeometryColumn(),
},
func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions *ConvertOptions) error {
if convertOptions == nil {
convertOptions = &ConvertOptions{}
}

var compression *compress.Compression
Expand All @@ -49,95 +57,93 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions
compression = &c
}

parquetSchema, schemaErr := pqutil.GetParquetSchema(input)
if schemaErr != nil {
return fmt.Errorf("trouble getting parquet schema: %w", schemaErr)
}

datasetInfo := geo.NewDatasetInfo(true)
for fieldNum := 0; fieldNum < parquetSchema.Root().NumFields(); fieldNum += 1 {
field := parquetSchema.Root().Field(fieldNum)
name := field.Name()
if _, ok := metadata.Columns[name]; !ok {
continue
transformSchema := func(fileReader *file.Reader) (*schema.Schema, error) {
inputSchema := fileReader.MetaData().Schema
metadata := getMetadata(fileReader, convertOptions)
for fieldNum := 0; fieldNum < inputSchema.Root().NumFields(); fieldNum += 1 {
field := inputSchema.Root().Field(fieldNum)
name := field.Name()
if _, ok := metadata.Columns[name]; !ok {
continue
}
if field.LogicalType() == pqutil.ParquetStringType {
datasetInfo.AddCollection(name)
}
}
if field.LogicalType() == pqutil.ParquetStringType {
datasetInfo.AddCollection(name)

if datasetInfo.NumCollections() == 0 {
return inputSchema, nil
}
}

var transformSchema pqutil.SchemaTransformer
var transformColumn pqutil.ColumnTransformer
if datasetInfo.NumCollections() > 0 {
transformSchema = func(inputSchema *schema.Schema) (*schema.Schema, error) {
inputRoot := inputSchema.Root()
numFields := inputRoot.NumFields()

fields := make([]schema.Node, numFields)
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
inputField := inputRoot.Field(fieldNum)
if !datasetInfo.HasCollection(inputField.Name()) {
fields[fieldNum] = inputField
continue
}
outputField, err := schema.NewPrimitiveNode(inputField.Name(), inputField.RepetitionType(), parquet.Types.ByteArray, -1, -1)
if err != nil {
return nil, err
}
fields[fieldNum] = outputField
}
inputRoot := inputSchema.Root()
numFields := inputRoot.NumFields()

outputRoot, err := schema.NewGroupNode(inputRoot.Name(), inputRoot.RepetitionType(), fields, -1)
fields := make([]schema.Node, numFields)
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
inputField := inputRoot.Field(fieldNum)
if !datasetInfo.HasCollection(inputField.Name()) {
fields[fieldNum] = inputField
continue
}
outputField, err := schema.NewPrimitiveNode(inputField.Name(), inputField.RepetitionType(), parquet.Types.ByteArray, -1, -1)
if err != nil {
return nil, err
}
return schema.NewSchema(outputRoot), nil
fields[fieldNum] = outputField
}

transformColumn = func(inputField *arrow.Field, outputField *arrow.Field, chunked *arrow.Chunked) (*arrow.Chunked, error) {
if !datasetInfo.HasCollection(inputField.Name) {
return chunked, nil
outputRoot, err := schema.NewGroupNode(inputRoot.Name(), inputRoot.RepetitionType(), fields, -1)
if err != nil {
return nil, err
}
return schema.NewSchema(outputRoot), nil
}

transformColumn := func(inputField *arrow.Field, outputField *arrow.Field, chunked *arrow.Chunked) (*arrow.Chunked, error) {
if !datasetInfo.HasCollection(inputField.Name) {
return chunked, nil
}
chunks := chunked.Chunks()
transformed := make([]arrow.Array, len(chunks))
builder := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
defer builder.Release()

collectionInfo := geo.NewCollectionInfo(false)
for i, arr := range chunks {
stringArray, ok := arr.(*array.String)
if !ok {
return nil, fmt.Errorf("expected a string array for %q, got %v", inputField.Name, arr)
}
chunks := chunked.Chunks()
transformed := make([]arrow.Array, len(chunks))
builder := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
defer builder.Release()

collectionInfo := geo.NewCollectionInfo(false)
for i, arr := range chunks {
stringArray, ok := arr.(*array.String)
if !ok {
return nil, fmt.Errorf("expected a string array for %q, got %v", inputField.Name, arr)
for rowNum := 0; rowNum < stringArray.Len(); rowNum += 1 {
if outputField.Nullable && stringArray.IsNull(rowNum) {
builder.AppendNull()
continue
}
str := stringArray.Value(rowNum)
geometry, wktErr := wkt.Unmarshal(str)
if wktErr != nil {
return nil, wktErr
}
for rowNum := 0; rowNum < stringArray.Len(); rowNum += 1 {
if outputField.Nullable && stringArray.IsNull(rowNum) {
builder.AppendNull()
continue
}
str := stringArray.Value(rowNum)
geometry, wktErr := wkt.Unmarshal(str)
if wktErr != nil {
return nil, wktErr
}
value, wkbErr := wkb.Marshal(geometry)
if wkbErr != nil {
return nil, wkbErr
}
collectionInfo.AddType(geometry.GeoJSONType())
bounds := geometry.Bound()
collectionInfo.AddBounds(&bounds)
builder.Append(value)
value, wkbErr := wkb.Marshal(geometry)
if wkbErr != nil {
return nil, wkbErr
}
transformed[i] = builder.NewArray()
collectionInfo.AddType(geometry.GeoJSONType())
bounds := geometry.Bound()
collectionInfo.AddBounds(&bounds)
builder.Append(value)
}
datasetInfo.AddBounds(inputField.Name, collectionInfo.Bounds())
datasetInfo.AddTypes(inputField.Name, collectionInfo.Types())
chunked.Release()
return arrow.NewChunked(builder.Type(), transformed), nil
transformed[i] = builder.NewArray()
}
datasetInfo.AddBounds(inputField.Name, collectionInfo.Bounds())
datasetInfo.AddTypes(inputField.Name, collectionInfo.Types())
chunked.Release()
return arrow.NewChunked(builder.Type(), transformed), nil
}

beforeClose := func(fileWriter *file.Writer) error {
beforeClose := func(fileReader *file.Reader, fileWriter *file.Writer) error {
metadata := getMetadata(fileReader, convertOptions)
for name, geometryCol := range metadata.Columns {
if !datasetInfo.HasCollection(name) {
continue
Expand Down
19 changes: 3 additions & 16 deletions internal/pqutil/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,11 @@ import (
"strings"

"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/file"
pqschema "github.com/apache/arrow/go/v14/parquet/schema"
)

var ParquetStringType = pqschema.StringLogicalType{}

// GetParquetSchema opens a parquet reader over input, extracts the file's
// schema, and closes the reader before returning. A non-nil error comes
// either from opening the reader or from closing it.
func GetParquetSchema(input parquet.ReaderAtSeeker) (*pqschema.Schema, error) {
	reader, openErr := file.NewParquetReader(input)
	if openErr != nil {
		return nil, openErr
	}
	parquetSchema := reader.MetaData().Schema
	// Close before returning; a close failure is surfaced to the caller.
	if closeErr := reader.Close(); closeErr != nil {
		return nil, closeErr
	}
	return parquetSchema, nil
}

func LookupNode(schema *pqschema.Schema, name string) (pqschema.Node, bool) {
root := schema.Root()
index := root.FieldIndexByName(name)
Expand Down Expand Up @@ -131,7 +118,7 @@ func (w *parquetWriter) writeNode(node pqschema.Node, level int) {
func (w *parquetWriter) writeGroupNode(node *pqschema.GroupNode, level int) {
repetition := node.RepetitionType().String()
name := node.Name()
annotation := logicalOrConvertedAnnotation(node)
annotation := LogicalOrConvertedAnnotation(node)

w.writeLine(fmt.Sprintf("%s group %s%s {", repetition, name, annotation), level)
for i := 0; i < node.NumFields(); i += 1 {
Expand All @@ -144,12 +131,12 @@ func (w *parquetWriter) writePrimitiveNode(node *pqschema.PrimitiveNode, level i
repetition := node.RepetitionType().String()
name := node.Name()
nodeType := physicalTypeString(node.PhysicalType())
annotation := logicalOrConvertedAnnotation(node)
annotation := LogicalOrConvertedAnnotation(node)

w.writeLine(fmt.Sprintf("%s %s %s%s;", repetition, nodeType, name, annotation), level)
}

func logicalOrConvertedAnnotation(node pqschema.Node) string {
func LogicalOrConvertedAnnotation(node pqschema.Node) string {
logicalType := node.LogicalType()
convertedType := node.ConvertedType()

Expand Down
Loading
Loading