Skip to content

Commit

Permalink
Configurable row group length for writing
Browse files Browse the repository at this point in the history
  • Loading branch information
tschaub committed Oct 1, 2023
1 parent b1b08b5 commit 0718ca8
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 14 deletions.
9 changes: 8 additions & 1 deletion cmd/gpq/command/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ConvertCmd struct {
Max int `help:"Maximum number of features to consider when building a schema." default:"100"`
InputPrimaryColumn string `help:"Primary geometry column name when reading Parquet without metadata." default:"geometry"`
Compression string `help:"Parquet compression to use. Possible values: ${enum}." enum:"uncompressed, snappy, gzip, brotli, zstd" default:"zstd"`
RowGroupLength int `help:"Maximum number of rows per group when writing Parquet."`
}

type FormatType string
Expand Down Expand Up @@ -149,7 +150,12 @@ func (c *ConvertCmd) Run() error {
if outputFormat != ParquetType && outputFormat != GeoParquetType {
return errors.New("GeoJSON input can only be converted to GeoParquet")
}
convertOptions := &geojson.ConvertOptions{MinFeatures: c.Min, MaxFeatures: c.Max, Compression: c.Compression}
convertOptions := &geojson.ConvertOptions{
MinFeatures: c.Min,
MaxFeatures: c.Max,
Compression: c.Compression,
RowGroupLength: c.RowGroupLength,
}
return geojson.ToParquet(input, output, convertOptions)
}

Expand All @@ -160,6 +166,7 @@ func (c *ConvertCmd) Run() error {
convertOptions := &geoparquet.ConvertOptions{
InputPrimaryColumn: c.InputPrimaryColumn,
Compression: c.Compression,
RowGroupLength: c.RowGroupLength,
}

return geoparquet.FromParquet(input, output, convertOptions)
Expand Down
18 changes: 13 additions & 5 deletions internal/geojson/geojson.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ func FromParquet(reader parquet.ReaderAtSeeker, writer io.Writer) error {
}

// ConvertOptions controls how GeoJSON input is converted to GeoParquet.
//
// NOTE(review): the diff render duplicated the pre-change field list inside
// the struct body, which is invalid Go (duplicate field names); this is the
// reconstructed post-commit declaration with a single field list.
type ConvertOptions struct {
	// MinFeatures is the minimum number of features read before writing begins.
	MinFeatures int
	// MaxFeatures is the maximum number of features considered when building a schema.
	MaxFeatures int
	// Compression names the Parquet compression codec (e.g. "zstd"); empty means default.
	Compression string
	// RowGroupLength is the maximum number of rows per row group; 0 keeps the writer default.
	RowGroupLength int
	// Metadata carries GeoParquet file metadata — presumably a JSON string; TODO confirm against callers.
	Metadata string
}

var defaultOptions = &ConvertOptions{
Expand All @@ -80,12 +81,19 @@ func ToParquet(input io.Reader, output io.Writer, convertOptions *ConvertOptions
featuresRead := 0

var pqWriterProps *parquet.WriterProperties
var writerOptions []parquet.WriterProperty
if convertOptions.Compression != "" {
compression, err := pqutil.GetCompression(convertOptions.Compression)
if err != nil {
return err
}
pqWriterProps = parquet.NewWriterProperties(parquet.WithCompression(compression))
writerOptions = append(writerOptions, parquet.WithCompression(compression))
}
if convertOptions.RowGroupLength > 0 {
writerOptions = append(writerOptions, parquet.WithMaxRowGroupLength(int64(convertOptions.RowGroupLength)))
}
if len(writerOptions) > 0 {
pqWriterProps = parquet.NewWriterProperties(writerOptions...)
}

var featureWriter *geoparquet.FeatureWriter
Expand Down
36 changes: 36 additions & 0 deletions internal/geojson/geojson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,42 @@ func TestToParquet(t *testing.T) {
assert.JSONEq(t, string(expected), geojsonBuffer.String())
}

// TestToParquetRowGroupLength3 converts ten point features with a maximum row
// group length of 3 and verifies the output contains ceil(10/3) = 4 row groups.
func TestToParquetRowGroupLength3(t *testing.T) {
	geojsonFile, openErr := os.Open("testdata/ten-points.geojson")
	require.NoError(t, openErr)
	// Close the input file so the test does not leak a file descriptor.
	defer geojsonFile.Close()

	parquetBuffer := &bytes.Buffer{}
	toParquetErr := geojson.ToParquet(geojsonFile, parquetBuffer, &geojson.ConvertOptions{
		RowGroupLength: 3,
	})
	assert.NoError(t, toParquetErr)

	parquetInput := bytes.NewReader(parquetBuffer.Bytes())
	fileReader, fileErr := file.NewParquetReader(parquetInput)
	require.NoError(t, fileErr)
	defer fileReader.Close()

	// 10 features at 3 rows per group: 3 full groups plus 1 partial group.
	assert.Equal(t, 4, fileReader.NumRowGroups())
}

// TestToParquetRowGroupLength5 converts ten point features with a maximum row
// group length of 5 and verifies the output contains 10/5 = 2 row groups.
func TestToParquetRowGroupLength5(t *testing.T) {
	geojsonFile, openErr := os.Open("testdata/ten-points.geojson")
	require.NoError(t, openErr)
	// Close the input file so the test does not leak a file descriptor.
	defer geojsonFile.Close()

	parquetBuffer := &bytes.Buffer{}
	toParquetErr := geojson.ToParquet(geojsonFile, parquetBuffer, &geojson.ConvertOptions{
		RowGroupLength: 5,
	})
	assert.NoError(t, toParquetErr)

	parquetInput := bytes.NewReader(parquetBuffer.Bytes())
	fileReader, fileErr := file.NewParquetReader(parquetInput)
	require.NoError(t, fileErr)
	defer fileReader.Close()

	// 10 features at 5 rows per group: exactly 2 full groups.
	assert.Equal(t, 2, fileReader.NumRowGroups())
}

func TestToParquetMismatchedTypes(t *testing.T) {
geojsonFile, openErr := os.Open("testdata/mismatched-types.geojson")
require.NoError(t, openErr)
Expand Down
105 changes: 105 additions & 0 deletions internal/geojson/testdata/ten-points.geojson
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
{
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"properties": {
"num": 0
},
"geometry": {
"type": "Point",
"coordinates": [0, 0]
}
},
{
"type": "Feature",
"properties": {
"num": 1
},
"geometry": {
"type": "Point",
"coordinates": [1, 1]
}
},
{
"type": "Feature",
"properties": {
"num": 2
},
"geometry": {
"type": "Point",
"coordinates": [2, 2]
}
},
{
"type": "Feature",
"properties": {
"num": 3
},
"geometry": {
"type": "Point",
"coordinates": [3, 3]
}
},
{
"type": "Feature",
"properties": {
"num": 4
},
"geometry": {
"type": "Point",
"coordinates": [4, 4]
}
},
{
"type": "Feature",
"properties": {
"num": 5
},
"geometry": {
"type": "Point",
"coordinates": [5, 5]
}
},
{
"type": "Feature",
"properties": {
"num": 6
},
"geometry": {
"type": "Point",
"coordinates": [6, 6]
}
},
{
"type": "Feature",
"properties": {
"num": 7
},
"geometry": {
"type": "Point",
"coordinates": [7, 7]
}
},
{
"type": "Feature",
"properties": {
"num": 8
},
"geometry": {
"type": "Point",
"coordinates": [8, 8]
}
},
{
"type": "Feature",
"properties": {
"num": 9
},
"geometry": {
"type": "Point",
"coordinates": [9, 9]
}
}
]
}
2 changes: 2 additions & 0 deletions internal/geoparquet/geoparquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// ConvertOptions controls how a Parquet file is rewritten as GeoParquet.
type ConvertOptions struct {
	// InputPrimaryColumn is the primary geometry column name used when the
	// input Parquet file has no GeoParquet metadata.
	InputPrimaryColumn string
	// Compression names the Parquet compression codec for the output;
	// empty leaves the writer default in place.
	Compression string
	// RowGroupLength is the maximum number of rows per row group when
	// writing; 0 keeps the writer default.
	RowGroupLength int
}

func getMetadata(fileReader *file.Reader, convertOptions *ConvertOptions) *Metadata {
Expand Down Expand Up @@ -171,6 +172,7 @@ func FromParquet(input parquet.ReaderAtSeeker, output io.Writer, convertOptions
TransformColumn: transformColumn,
BeforeClose: beforeClose,
Compression: compression,
RowGroupLength: convertOptions.RowGroupLength,
}

return pqutil.TransformByColumn(config)
Expand Down
40 changes: 36 additions & 4 deletions internal/pqutil/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type TransformConfig struct {
Reader parquet.ReaderAtSeeker
Writer io.Writer
Compression *compress.Compression
RowGroupLength int
TransformSchema SchemaTransformer
TransformColumn ColumnTransformer
BeforeClose func(*file.Reader, *file.Writer) error
Expand Down Expand Up @@ -50,6 +51,10 @@ func getWriterProperties(config *TransformConfig, fileReader *file.Reader) (*par
}
}

if config.RowGroupLength > 0 {
writerProperties = append(writerProperties, parquet.WithMaxRowGroupLength(int64(config.RowGroupLength)))
}

return parquet.NewWriterProperties(writerProperties...), nil
}

Expand Down Expand Up @@ -102,14 +107,33 @@ func TransformByColumn(config *TransformConfig) error {
fileWriter := file.NewParquetWriter(config.Writer, outputSchema.Root(), file.WithWriterProps(writerProperties))
defer fileWriter.Close()

rowGroupLength := int64(config.RowGroupLength)
if rowGroupLength == 0 {
if fileReader.NumRowGroups() > 0 {
rowGroupLength = fileReader.MetaData().RowGroups[0].NumRows
} else {
rowGroupLength = parquet.DefaultMaxRowGroupLen
}
}

ctx := pqarrow.NewArrowWriteContext(context.Background(), nil)

numRowGroups := fileReader.NumRowGroups()
for rowGroupIndex := 0; rowGroupIndex < numRowGroups; rowGroupIndex += 1 {
rowGroupReader := arrowReader.RowGroup(rowGroupIndex)
columnReaders := make([]*pqarrow.ColumnReader, numFields)
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
colReader, err := arrowReader.GetColumn(ctx, fieldNum)
if err != nil {
return err
}
columnReaders[fieldNum] = colReader
}

numRows := fileReader.NumRows()
numRowsWritten := int64(0)
for {
rowGroupWriter := fileWriter.AppendRowGroup()
for fieldNum := 0; fieldNum < numFields; fieldNum += 1 {
arr, readErr := rowGroupReader.Column(fieldNum).Read(ctx)
colReader := columnReaders[fieldNum]
arr, readErr := colReader.NextBatch(rowGroupLength)
if readErr != nil {
return readErr
}
Expand All @@ -133,6 +157,14 @@ func TransformByColumn(config *TransformConfig) error {
return err
}
}
numRowsInGroup, err := rowGroupWriter.NumRows()
if err != nil {
return err
}
numRowsWritten += int64(numRowsInGroup)
if numRowsWritten >= numRows {
break
}
}

if config.BeforeClose != nil {
Expand Down
Loading

0 comments on commit 0718ca8

Please sign in to comment.