Skip to content

Commit

Permalink
PECO-1054 Expose Arrow batches to users, part two (databricks#163)
Browse files Browse the repository at this point in the history
Updated FetchableItems interface to return an instance of OutputType
instead of a slice of output type.
Created interfaces SparkArrowBatch and SparkArrowRecord and
implementations of each. This also changed the 1:1 ratio of batch
instances to arrow records. A SparkArrowBatch can contain multiple arrow
records now.
Created BatchIterator interface and implementation and switched
arrowRowScanner to use BatchIterator instead of BatchLoader
Created RowValues interface and implementation as a container for the
currently loaded values for a set of rows.
Updated the behaviour of fetchable items cloudURL and localBatch to
de-serialize the arrow records as part of fetching, rather than carry
around the raw bytes for later de-serialization. Also eliminated the
cloud fetch code that was de-serializing the arrow batch then
serializing each record individually to create one batch instance per
record.
Removed chunkedByteReader and replaced with io.MultiReader 
Normalized use of row number so that there is no need to track the index
of the row in the current batch.
  • Loading branch information
rcypher-databricks authored Sep 21, 2023
2 parents f7c0286 + cfceb51 commit f5b366e
Show file tree
Hide file tree
Showing 15 changed files with 719 additions and 585 deletions.
7 changes: 2 additions & 5 deletions internal/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type FetchableItems[OutputType any] interface {
Fetch(ctx context.Context) ([]OutputType, error)
Fetch(ctx context.Context) (OutputType, error)
}

type Fetcher[OutputType any] interface {
Expand Down Expand Up @@ -151,10 +151,7 @@ func work[I FetchableItems[O], O any](f *concurrentFetcher[I, O], workerIndex in
return
} else {
f.logger().Debug().Msgf("concurrent fetcher worker %d item loaded", workerIndex)
for i := range result {
r := result[i]
f.outChan <- r
}
f.outChan <- result
}
} else {
f.logger().Debug().Msgf("concurrent fetcher ending %d", workerIndex)
Expand Down
8 changes: 4 additions & 4 deletions internal/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ func (m *mockFetchableItem) Fetch(ctx context.Context) ([]*mockOutput, error) {
return outputs, nil
}

var _ FetchableItems[*mockOutput] = (*mockFetchableItem)(nil)
var _ FetchableItems[[]*mockOutput] = (*mockFetchableItem)(nil)

func TestConcurrentFetcher(t *testing.T) {
t.Run("Comprehensively tests the concurrent fetcher", func(t *testing.T) {
ctx := context.Background()

inputChan := make(chan FetchableItems[*mockOutput], 10)
inputChan := make(chan FetchableItems[[]*mockOutput], 10)
for i := 0; i < 10; i++ {
item := mockFetchableItem{item: i, wait: 1 * time.Second}
inputChan <- &item
Expand All @@ -57,7 +57,7 @@ func TestConcurrentFetcher(t *testing.T) {

var results []*mockOutput
for result := range outChan {
results = append(results, result)
results = append(results, result...)
}

// Check if the fetcher returned the expected results
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestConcurrentFetcher(t *testing.T) {
defer cancel()

// Create an input channel
inputChan := make(chan FetchableItems[*mockOutput], 3)
inputChan := make(chan FetchableItems[[]*mockOutput], 3)
for i := 0; i < 3; i++ {
item := mockFetchableItem{item: i, wait: 1 * time.Second}
inputChan <- &item
Expand Down
Loading

0 comments on commit f5b366e

Please sign in to comment.