Skip to content

Commit

Permalink
Merge pull request #74 from planetlabs/overhaul
Browse files Browse the repository at this point in the history
Implementation overhaul
  • Loading branch information
tschaub authored Sep 24, 2023
2 parents abc5667 + 60139cb commit 6ab960f
Show file tree
Hide file tree
Showing 57 changed files with 3,741 additions and 2,567 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
go-version: '1.20'
- uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
version: v1.54.1
args: "--out-${NO_FUTURE}format colored-line-number"

test:
Expand Down
18 changes: 4 additions & 14 deletions cmd/gpq/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/planetlabs/gpq/internal/geojson"
"github.com/planetlabs/gpq/internal/geoparquet"
"github.com/segmentio/parquet-go"
)

type ConvertCmd struct {
Expand All @@ -33,7 +32,7 @@ type ConvertCmd struct {
Min int `help:"Minimum number of features to consider when building a schema." default:"10"`
Max int `help:"Maximum number of features to consider when building a schema." default:"100"`
InputPrimaryColumn string `help:"Primary geometry column name when reading Parquet withtout metadata." default:"geometry"`
Compression string `help:"Parquet compression to use. Possible values: ${enum}." enum:"uncompressed, snappy, gzip, brotli, zstd, lz4raw" default:"gzip"`
Compression string `help:"Parquet compression to use. Possible values: ${enum}." enum:"uncompressed, snappy, gzip, brotli, zstd" default:"zstd"`
}

type FormatType string
Expand Down Expand Up @@ -111,18 +110,8 @@ func (c *ConvertCmd) Run() error {
return geojson.ToParquet(input, output, convertOptions)
}

stat, statErr := os.Stat(c.Input)
if statErr != nil {
return fmt.Errorf("failed to get size of %q: %w", c.Input, statErr)
}

file, fileErr := parquet.OpenFile(input, stat.Size())
if fileErr != nil {
return fileErr
}

if outputFormat == GeoJSONType {
return geojson.FromParquet(file, output)
return geojson.FromParquet(input, output)
}

var convertOptions *geoparquet.ConvertOptions
Expand All @@ -131,5 +120,6 @@ func (c *ConvertCmd) Run() error {
InputPrimaryColumn: c.InputPrimaryColumn,
}
}
return geoparquet.FromParquet(file, output, convertOptions)

return geoparquet.FromParquet(input, output, convertOptions)
}
77 changes: 43 additions & 34 deletions cmd/gpq/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"strconv"
"strings"

"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/apache/arrow/go/v14/parquet/schema"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
"github.com/planetlabs/gpq/internal/geoparquet"
"github.com/segmentio/parquet-go"
"golang.org/x/term"
)

Expand Down Expand Up @@ -53,29 +55,23 @@ func (c *DescribeCmd) Run() error {
}
defer input.Close()

stat, statErr := os.Stat(c.Input)
if statErr != nil {
return fmt.Errorf("failed to get size of %q: %w", c.Input, statErr)
}

file, fileErr := parquet.OpenFile(input, stat.Size())
fileReader, fileErr := file.NewParquetReader(input)
if fileErr != nil {
return fileErr
return fmt.Errorf("failed to read %q as parquet: %w", c.Input, fileErr)
}

metadata, geoErr := geoparquet.GetMetadata(file)
fileMetadata := fileReader.MetaData()
metadata, geoErr := geoparquet.GetMetadata(fileMetadata.KeyValueMetadata())
if geoErr != nil {
if !errors.Is(geoErr, geoparquet.ErrNoMetadata) {
return geoErr
}
}

schema := buildSchema("", file.Schema())

info := &Info{
Schema: schema,
Schema: buildSchema("", fileMetadata.Schema.Root()),
Metadata: metadata,
NumRows: file.NumRows(),
NumRows: fileMetadata.NumRows,
}

if c.Format == "json" {
Expand Down Expand Up @@ -211,47 +207,60 @@ type Schema struct {
Fields []*Schema `json:"fields,omitempty"`
}

func buildSchema(name string, node parquet.Node) *Schema {
nodeType := node.Type()
func buildSchema(name string, node schema.Node) *Schema {
annotation := ""
if logicalType := nodeType.LogicalType(); logicalType != nil {
annotation = logicalType.String()
logicalType := node.LogicalType()
if !logicalType.IsNone() {
annotation = strings.ToLower(logicalType.String())
}

repetition := node.RepetitionType()
optional := false
repeated := false
if repetition == parquet.Repetitions.Optional {
optional = true
} else if repetition == parquet.Repetitions.Repeated {
repeated = true
}

field := &Schema{
Name: name,
Optional: node.Optional(),
Repeated: node.Repeated(),
Optional: optional,
Repeated: repeated,
Annotation: annotation,
}

if node.Leaf() {
switch nodeType.Kind() {
case parquet.Boolean:
if leaf, ok := node.(*schema.PrimitiveNode); ok {
switch leaf.PhysicalType() {
case parquet.Types.Boolean:
field.Type = "boolean"
case parquet.Int32:
case parquet.Types.Int32:
field.Type = "int32"
case parquet.Int64:
case parquet.Types.Int64:
field.Type = "int64"
case parquet.Int96:
case parquet.Types.Int96:
field.Type = "int96"
case parquet.Float:
case parquet.Types.Float:
field.Type = "float"
case parquet.Double:
case parquet.Types.Double:
field.Type = "double"
case parquet.ByteArray:
case parquet.Types.ByteArray:
field.Type = "binary"
case parquet.FixedLenByteArray:
field.Type = fmt.Sprintf("fixed_len_byte_array(%d)", nodeType.Length())
case parquet.Types.FixedLenByteArray:
field.Type = fmt.Sprintf("fixed_len_byte_array(%d)", leaf.TypeLength())
default:
field.Type = "unknown"
field.Type = leaf.PhysicalType().String()
}
return field
}

field.Fields = make([]*Schema, len(node.Fields()))
for i, groupField := range node.Fields() {
field.Fields[i] = buildSchema(groupField.Name(), groupField)
if group, ok := node.(*schema.GroupNode); ok {
count := group.NumFields()
field.Fields = make([]*Schema, count)
for i := 0; i < count; i += 1 {
groupField := group.Field(i)
field.Fields[i] = buildSchema(groupField.Name(), groupField)
}
}
return field
}
41 changes: 25 additions & 16 deletions cmd/wasm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"strings"
"syscall/js"

"github.com/apache/arrow/go/v14/parquet/file"
"github.com/planetlabs/gpq/internal/geojson"
"github.com/segmentio/parquet-go"
"github.com/planetlabs/gpq/internal/geoparquet"
"github.com/planetlabs/gpq/internal/pqutil"
)

var uint8ArrayConstructor = js.Global().Get("Uint8Array")
Expand Down Expand Up @@ -56,24 +58,28 @@ var fromParquet = js.FuncOf(func(this js.Value, args []js.Value) any {
data := make([]byte, numBytes)
js.CopyBytesToGo(data, args[0])

input, fileErr := parquet.OpenFile(bytes.NewReader(data), int64(numBytes))
if fileErr != nil {
return returnFromError(fileErr)
}

output := &bytes.Buffer{}
convertErr := geojson.FromParquet(input, output)
convertErr := geojson.FromParquet(bytes.NewReader(data), output)
if convertErr != nil {
return returnFromError(convertErr)
}

metadata, _ := input.Lookup("geo")
reader, readerErr := file.NewParquetReader(bytes.NewReader(data))
if readerErr != nil {
return returnFromError(readerErr)
}
defer reader.Close()

metadata, metadataErr := geoparquet.GetMetadataValue(reader.MetaData().KeyValueMetadata())
if metadataErr != nil {
return returnFromError(metadataErr)
}

return returnFromValue(map[string]any{
"data": output.String(),
"geo": metadata,
"schema": input.Schema().String(),
"records": input.NumRows(),
"schema": pqutil.ParquetSchemaString(reader.MetaData().Schema),
"records": reader.NumRows(),
})
})

Expand All @@ -93,21 +99,24 @@ var toParquet = js.FuncOf(func(this js.Value, args []js.Value) any {
return returnFromError(convertErr)
}

file, err := parquet.OpenFile(bytes.NewReader(output.Bytes()), int64(output.Len()))
if err != nil {
return returnFromError(err)
reader, readerErr := file.NewParquetReader(bytes.NewReader(output.Bytes()))
if readerErr != nil {
return returnFromError(readerErr)
}

metadata, _ := file.Lookup("geo")
metadata, metadataErr := geoparquet.GetMetadataValue(reader.MetaData().KeyValueMetadata())
if metadataErr != nil {
return returnFromError(metadataErr)
}

array := uint8ArrayConstructor.New(output.Len())
js.CopyBytesToJS(array, output.Bytes())

return returnFromValue(map[string]any{
"data": array,
"geo": metadata,
"schema": file.Schema().String(),
"records": file.NumRows(),
"schema": pqutil.ParquetSchemaString(reader.MetaData().Schema),
"records": reader.NumRows(),
})
})

Expand Down
38 changes: 27 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,34 +1,50 @@
module github.com/planetlabs/gpq

go 1.20
go 1.21

require (
github.com/alecthomas/kong v0.8.0
github.com/apache/arrow/go/v14 v14.0.0-20230922164031-772a01c080ad
github.com/fatih/color v1.15.0
github.com/jedib0t/go-pretty/v6 v6.4.7
github.com/paulmach/orb v0.10.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/segmentio/encoding v0.3.6
github.com/segmentio/parquet-go v0.0.0-20230605165518-1fd7f3303070
github.com/stretchr/testify v1.8.4
golang.org/x/term v0.12.0
)

require (
github.com/andybalholm/brotli v1.0.3 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/thrift v0.17.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.9 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/segmentio/asm v1.1.3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.mongodb.org/mongo-driver v1.11.4 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 6ab960f

Please sign in to comment.