Skip to content

Commit

Permalink
Bug fix: The HasNext() function for Go driver arrow record iterator r…
Browse files Browse the repository at this point in the history
…eturning EOF for empty resultset.

Essentially, the HasNext() logic was assuming there were records in the result set in the case that the original return from executing the statement didn't specify, or if there were direct results.

Updated arrowRecordIterator.HasNext() to check state of underlying iterators. This handles the case of direct results that contain no records.
Updated resultPageIterator.HasNext() to try fetching a result page if necessary to determine if there are more records.

Added tests for empty result sets with direct results enabled and disabled.

Signed-off-by: Raymond Cypher <[email protected]>
  • Loading branch information
rcypher-databricks committed Jan 15, 2024
1 parent 91dced9 commit 5f9163e
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 7 deletions.
6 changes: 5 additions & 1 deletion internal/rows/arrowbased/arrowRecordIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (ri *arrowRecordIterator) Next() (arrow.Record, error) {

// Indicate whether there are any more records available
func (ri *arrowRecordIterator) HasNext() bool {
ri.checkFinished()
return !ri.isFinished
}

Expand All @@ -83,7 +84,10 @@ func (ri *arrowRecordIterator) Close() {
}

func (ri *arrowRecordIterator) checkFinished() {
finished := !ri.currentBatch.HasNext() && !ri.batchIterator.HasNext() && !ri.resultPageIterator.HasNext()
finished := ri.isFinished ||
((ri.currentBatch == nil || !ri.currentBatch.HasNext()) &&
(ri.batchIterator == nil || !ri.batchIterator.HasNext()) &&
(ri.resultPageIterator == nil || !ri.resultPageIterator.HasNext()))

if finished {
// Reached end of result set so Close
Expand Down
46 changes: 46 additions & 0 deletions internal/rows/rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,52 @@ func TestGetArrowBatches(t *testing.T) {
assert.Equal(t, fetchResp3.Results.ArrowBatches[1].RowCount, r6.NumRows())
r6.Release()
})

t.Run("with empty result set, no direct results", func(t *testing.T) {
fetchResp1 := cli_service.TFetchResultsResp{}
loadTestData(t, "zeroRows/zeroRowsFetchResult.json", &fetchResp1)

client := getSimpleClient([]cli_service.TFetchResultsResp{fetchResp1})
cfg := config.WithDefaults()
rows, err := NewRows("connId", "corrId", nil, client, cfg, nil)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
assert.True(t, ok)

rs, err2 := rows2.GetArrowBatches(context.Background())
assert.Nil(t, err2)

hasNext := rs.HasNext()
assert.False(t, hasNext)
r7, err2 := rs.Next()
assert.Nil(t, r7)
assert.ErrorContains(t, err2, io.EOF.Error())

})

t.Run("with empty result set, direct results", func(t *testing.T) {
executeStatementResp := cli_service.TExecuteStatementResp{}
loadTestData(t, "zeroRows/zeroRowsDirectResults.json", &executeStatementResp)
executeStatementResp.DirectResults.ResultSet.Results.ArrowBatches = []*cli_service.TSparkArrowBatch{}

client := getSimpleClient([]cli_service.TFetchResultsResp{})
cfg := config.WithDefaults()
rows, err := NewRows("connId", "corrId", nil, client, cfg, executeStatementResp.DirectResults)
assert.Nil(t, err)

rows2, ok := rows.(dbsqlrows.Rows)
assert.True(t, ok)

rs, err2 := rows2.GetArrowBatches(context.Background())
assert.Nil(t, err2)

hasNext := rs.HasNext()
assert.False(t, hasNext)
r7, err2 := rs.Next()
assert.Nil(t, r7)
assert.ErrorContains(t, err2, io.EOF.Error())
})
}

type rowTestPagingResult struct {
Expand Down
59 changes: 53 additions & 6 deletions internal/rows/rowscanner/resultPageIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,46 @@ type resultPageIterator struct {
correlationId string

logger *dbsqllog.DBSQLLogger

// In some cases we don't know whether there are any records until we fetch
// the first result page. So our behaviour is to fetch a result page as necessary
// before Next() is called.
nextResultPage *cli_service.TFetchResultsResp

// Hold on to errors so they can be returned by Next()
err error
}

var _ ResultPageIterator = (*resultPageIterator)(nil)

// Returns true if there are more pages in the result set.
func (rpf *resultPageIterator) HasNext() bool { return !rpf.isFinished }
func (rpf *resultPageIterator) HasNext() bool {
if rpf.isFinished && rpf.nextResultPage == nil {
// There are no more pages to load and there isn't an already fetched
// page waiting to retrieved by Next()
rpf.err = io.EOF
return false
}

// If there isn't an already fetched result page try to fetch one now
if rpf.nextResultPage == nil {
nrp, err := rpf.getNextPage()
if err != nil {
rpf.Close()
rpf.isFinished = true
rpf.err = err
return false
}

rpf.err = nil
rpf.nextResultPage = nrp
if !nrp.GetHasMoreRows() {
rpf.Close()
}
}

return rpf.nextResultPage != nil
}

// Returns the next page of the result set. io.EOF will be returned if there are
// no more pages.
Expand All @@ -113,7 +147,18 @@ func (rpf *resultPageIterator) Next() (*cli_service.TFetchResultsResp, error) {
return nil, dbsqlerrint.NewDriverError(context.Background(), errRowsNilResultPageFetcher, nil)
}

if !rpf.HasNext() && rpf.nextResultPage == nil {
return nil, rpf.err
}

nrp := rpf.nextResultPage
rpf.nextResultPage = nil
return nrp, rpf.err
}

func (rpf *resultPageIterator) getNextPage() (*cli_service.TFetchResultsResp, error) {
if rpf.isFinished {
// no more result pages to fetch
return nil, io.EOF
}

Expand Down Expand Up @@ -168,14 +213,16 @@ func (rpf *resultPageIterator) Close() (err error) {
// need to do that now
if !rpf.closedOnServer {
rpf.closedOnServer = true
if rpf.client != nil {
req := cli_service.TCloseOperationReq{
OperationHandle: rpf.opHandle,
}

req := cli_service.TCloseOperationReq{
OperationHandle: rpf.opHandle,
_, err = rpf.client.CloseOperation(context.Background(), &req)
return err
}

_, err = rpf.client.CloseOperation(context.Background(), &req)
return err
}

return
}

Expand Down
111 changes: 111 additions & 0 deletions internal/rows/testdata/zeroRows/zeroRowsDirectResults.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
{
"status": {
"statusCode": "SUCCESS_STATUS"
},
"operationHandle": {
"operationId": {
"guid": "Ae6w6uAWEI+vWuB+fjGvOA==",
"secret": "M41SnYJyRuuEgstBlGaDnQ==",
"executionVersion": 0
},
"operationType": "EXECUTE_STATEMENT",
"hasResultSet": true
},
"directResults": {
"operationStatus": {
"status": {
"statusCode": "SUCCESS_STATUS"
},
"operationState": "FINISHED_STATE",
"operationStarted": 1705023332453,
"operationCompleted": 1705023332599,
"idempotencyType": "IDEMPOTENT",
"statementTimeout": 172800
},
"resultSetMetadata": {
"status": {
"statusCode": "SUCCESS_STATUS"
},
"schema": {
"columns": [
{
"columnName": "id",
"typeDesc": {
"types": [
{
"primitiveEntry": {
"type": "INT_TYPE"
}
}
]
},
"position": 1,
"comment": ""
},
{
"columnName": "deleted",
"typeDesc": {
"types": [
{
"primitiveEntry": {
"type": "BOOLEAN_TYPE"
}
}
]
},
"position": 2,
"comment": ""
},
{
"columnName": "name",
"typeDesc": {
"types": [
{
"primitiveEntry": {
"type": "STRING_TYPE"
}
}
]
},
"position": 3,
"comment": ""
}
]
},
"resultFormat": "ARROW_BASED_SET",
"lz4Compressed": false,
"arrowSchema": "/////2ACAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAMAAABsAQAArAAAAAQAAACy/v//FAAAAIgAAACIAAAAAAAFAYQAAAACAAAAQAAAAAQAAABo/v//CAAAABQAAAAIAAAAInN0cmluZyIAAAAAFwAAAFNwYXJrOkRhdGFUeXBlOkpzb25UeXBlAKD+//8IAAAAEAAAAAYAAABTVFJJTkcAABYAAABTcGFyazpEYXRhVHlwZTpTcWxOYW1lAAAAAAAAXP///wQAAABuYW1lAAAAAFb///8UAAAAiAAAAIwAAAAAAAYBiAAAAAIAAABAAAAABAAAAAz///8IAAAAFAAAAAkAAAAiYm9vbGVhbiIAAAAXAAAAU3Bhcms6RGF0YVR5cGU6SnNvblR5cGUARP///wgAAAAQAAAABwAAAEJPT0xFQU4AFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAEAAQABAAAAAcAAABkZWxldGVkAAAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAACMAAAAlAAAAAAAAgGYAAAAAgAAAEgAAAAEAAAAyP///wgAAAAUAAAACQAAACJpbnRlZ2VyIgAAABcAAABTcGFyazpEYXRhVHlwZTpKc29uVHlwZQAIAAwACAAEAAgAAAAIAAAADAAAAAMAAABJTlQAFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAIAAwACAAHAAgAAAAAAAABIAAAAAIAAABpZAAAAAAAAA==",
"cacheLookupResult": "LOCAL_CACHE_HIT",
"uncompressedBytes": 0,
"compressedBytes": 0,
"isStagingOperation": false,
"reasonForNoCloudFetch": "CLOUD_FETCH_SUPPORT",
"cacheLookupLatency": 4,
"remoteResultCacheEnabled": true,
"isServerless": true,
"truncatedByThriftLimit": false
},
"resultSet": {
"status": {
"statusCode": "SUCCESS_STATUS"
},
"hasMoreRows": false,
"results": {
"startRowOffset": 0,
"rows": []
}
},
"closeOperation": {
"status": {
"statusCode": "SUCCESS_STATUS"
}
}
},
"executionRejected": false,
"maxClusterCapacity": 10,
"queryCost": 0.5,
"currentClusterLoad": 1,
"idempotencyType": "IDEMPOTENT",
"remoteResultCacheEnabled": true,
"isServerless": true
}
73 changes: 73 additions & 0 deletions internal/rows/testdata/zeroRows/zeroRowsFetchResult.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"status": {
"statusCode": "SUCCESS_STATUS"
},
"hasMoreRows": false,
"results": {
"startRowOffset": 0,
"rows": []
},
"resultSetMetadata": {
"status": {
"statusCode": "SUCCESS_STATUS"
},
"schema": {
"columns": [
{
"columnName": "id",
"typeDesc": {
"types": [
{
"primitiveEntry": {
"type": "INT_TYPE"
}
}
]
},
"position": 1,
"comment": ""
},
{
"columnName": "deleted",
"typeDesc": {
"types": [
{
"primitiveEntry": {
"type": "BOOLEAN_TYPE"
}
}
]
},
"position": 2,
"comment": ""
},
{
"columnName": "name",
"typeDesc": {
"types": [
{
"primitiveEntry": {
"type": "STRING_TYPE"
}
}
]
},
"position": 3,
"comment": ""
}
]
},
"resultFormat": "ARROW_BASED_SET",
"lz4Compressed": false,
"arrowSchema": "/////2ACAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAMAAABsAQAArAAAAAQAAACy/v//FAAAAIgAAACIAAAAAAAFAYQAAAACAAAAQAAAAAQAAABo/v//CAAAABQAAAAIAAAAInN0cmluZyIAAAAAFwAAAFNwYXJrOkRhdGFUeXBlOkpzb25UeXBlAKD+//8IAAAAEAAAAAYAAABTVFJJTkcAABYAAABTcGFyazpEYXRhVHlwZTpTcWxOYW1lAAAAAAAAXP///wQAAABuYW1lAAAAAFb///8UAAAAiAAAAIwAAAAAAAYBiAAAAAIAAABAAAAABAAAAAz///8IAAAAFAAAAAkAAAAiYm9vbGVhbiIAAAAXAAAAU3Bhcms6RGF0YVR5cGU6SnNvblR5cGUARP///wgAAAAQAAAABwAAAEJPT0xFQU4AFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAEAAQABAAAAAcAAABkZWxldGVkAAAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAACMAAAAlAAAAAAAAgGYAAAAAgAAAEgAAAAEAAAAyP///wgAAAAUAAAACQAAACJpbnRlZ2VyIgAAABcAAABTcGFyazpEYXRhVHlwZTpKc29uVHlwZQAIAAwACAAEAAgAAAAIAAAADAAAAAMAAABJTlQAFgAAAFNwYXJrOkRhdGFUeXBlOlNxbE5hbWUAAAAAAAAIAAwACAAHAAgAAAAAAAABIAAAAAIAAABpZAAAAAAAAA==",
"cacheLookupResult": "LOCAL_CACHE_HIT",
"uncompressedBytes": 0,
"compressedBytes": 0,
"isStagingOperation": false,
"reasonForNoCloudFetch": "CLOUD_FETCH_SUPPORT",
"cacheLookupLatency": 2,
"remoteResultCacheEnabled": true,
"isServerless": true,
"truncatedByThriftLimit": false
}
}

0 comments on commit 5f9163e

Please sign in to comment.