Handle Context
Abort when context is cancelled.
cwarden committed Dec 27, 2023
1 parent 918e235 commit 2bb4100
Showing 4 changed files with 42 additions and 31 deletions.
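
The pattern applied throughout bulk.go below is the standard Go cancellation idiom: polling loops select on the context's Done channel instead of sleeping unconditionally, and the Salesforce bulk API calls move to WithContext variants. A minimal standalone sketch of that wait loop, with a hypothetical pollDone callback standing in for the driver's real GetJobInfoWithContext status check:

// Illustrative sketch only (not code from this commit): the cancellable
// polling pattern the diff introduces in startJob. pollDone is a hypothetical
// stand-in for the driver's GetJobInfoWithContext status check.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func waitForCompletion(ctx context.Context, pollDone func(context.Context) (bool, error)) error {
	for {
		done, err := pollDone(ctx)
		if err != nil {
			return fmt.Errorf("status check failed: %w", err)
		}
		if done {
			return nil
		}
		select {
		case <-time.After(2 * time.Second):
			// poll again after the same 2-second interval the driver uses
		case <-ctx.Done():
			// abort as soon as the caller cancels or the deadline passes
			return fmt.Errorf("wait cancelled: %w", ctx.Err())
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// A status check that never completes, so only cancellation ends the wait.
	neverDone := func(context.Context) (bool, error) { return false, nil }

	err := waitForCompletion(ctx, neverDone)
	fmt.Println(err)                                       // wait cancelled: context deadline exceeded
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}

Wrapping ctx.Err() with %w, as the new startJob and readAndWriteBody code does, keeps the cancellation cause inspectable with errors.Is.
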
4 changes: 2 additions & 2 deletions go.mod
@@ -10,8 +10,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0
- github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99
- github.com/ForceCLI/force v1.0.5-0.20231222230115-685c4cb0390a
+ github.com/ForceCLI/force v1.0.5-0.20231227180521-1b251cf1a8b0
github.com/Masterminds/sprig/v3 v3.2.3
github.com/MicahParks/keyfunc v1.9.0
github.com/NYTimes/gziphandler v1.1.1
@@ -140,6 +139,7 @@ require (
)

require (
+ github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver/v3 v3.2.1 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -651,8 +651,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99 h1:H2axnitaP3Dw+tocMHPQHjM2wJ/+grF8sOIQGaJeEsg=
github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99/go.mod h1:WHFXv3VIHldTnYGmWAXAxsu4O754A9Zakq4DedI8PSA=
- github.com/ForceCLI/force v1.0.5-0.20231222230115-685c4cb0390a h1:WmP/xGiagp8QK/p20tv9tXV5tUqQro//Y/QrUD3p0nI=
- github.com/ForceCLI/force v1.0.5-0.20231222230115-685c4cb0390a/go.mod h1:qxApCXCTLnYtf2TRZbhnX3dcyEca29wss6YI8qZNMiI=
+ github.com/ForceCLI/force v1.0.5-0.20231227180521-1b251cf1a8b0 h1:XPYvEs+GpfNekTXPfOfkUWpbRYpOVorykDs6IPzlax8=
+ github.com/ForceCLI/force v1.0.5-0.20231227180521-1b251cf1a8b0/go.mod h1:qxApCXCTLnYtf2TRZbhnX3dcyEca29wss6YI8qZNMiI=
github.com/ForceCLI/inflect v0.0.0-20130829110746-cc00b5ad7a6a h1:mMd54YgLoeupNpbph3KdwvF58O0lZ72RQaJ2cFPOFDE=
github.com/ForceCLI/inflect v0.0.0-20130829110746-cc00b5ad7a6a/go.mod h1:DGKmCfb9oo5BivGO+szHk2ZvlqPDTlW4AYVpRBIVbms=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
62 changes: 37 additions & 25 deletions runtime/drivers/salesforce/bulk.go
@@ -20,14 +20,14 @@ import (

// Forceable represents an object which can talk to salesforce
type Forceable interface {
- CreateBulkJob(jobInfo force.JobInfo, requestOptions ...func(*http.Request)) (force.JobInfo, error)
- BulkQuery(soql string, jobID string, contentType string, requestOptions ...func(*http.Request)) (force.BatchInfo, error)
+ CreateBulkJobWithContext(ctx context.Context, jobInfo force.JobInfo, requestOptions ...func(*http.Request)) (force.JobInfo, error)
+ BulkQueryWithContext(ctx context.Context, soql string, jobID string, contentType string, requestOptions ...func(*http.Request)) (force.BatchInfo, error)
CloseBulkJobWithContext(context context.Context, jobID string) (force.JobInfo, error)
- GetJobInfo(jobID string) (force.JobInfo, error)
- GetBatches(jobID string) ([]force.BatchInfo, error)
- GetBatchInfo(jobID string, batchID string) (force.BatchInfo, error)
- RetrieveBulkQuery(jobID string, batchID string) ([]byte, error)
- RetrieveBulkJobQueryResultsWithCallback(job force.JobInfo, batchID string, resultID string, callback force.HttpCallback) error
+ GetJobInfoWithContext(ctx context.Context, jobID string) (force.JobInfo, error)
+ GetBatchesWithContext(ctx context.Context, jobID string) ([]force.BatchInfo, error)
+ GetBatchInfoWithContext(ctx context.Context, jobID string, batchID string) (force.BatchInfo, error)
+ RetrieveBulkQueryWithContext(ctx context.Context, jobID string, batchID string) ([]byte, error)
+ RetrieveBulkJobQueryResultsWithCallbackWithContext(ctx context.Context, job force.JobInfo, batchID string, resultID string, callback force.HttpCallback) error
}

type batchResult struct {
@@ -80,7 +80,7 @@ func makeBulkJob(session Forceable, objectName, query string, queryAll bool, log
func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
session := j.session

- jobInfo, err := session.CreateBulkJob(j.job, func(request *http.Request) {
+ jobInfo, err := session.CreateBulkJobWithContext(ctx, j.job, func(request *http.Request) {
if isPKChunkingEnabled(j) {
pkChunkHeader := "chunkSize=" + strconv.Itoa(j.pkChunkSize)
parent := parentObject(j.objectName)
@@ -98,15 +98,15 @@ func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
}
return err
}
- result, err := session.BulkQuery(j.query, jobInfo.Id, j.job.ContentType)
+ result, err := session.BulkQueryWithContext(ctx, j.query, jobInfo.Id, j.job.ContentType)
if err != nil {
return errors.New("bulk query failed with " + err.Error())
}
batchID := result.Id
// wait for chunking to complete
if isPKChunkingEnabled(j) {
for {
- batchInfo, err := session.GetBatchInfo(jobInfo.Id, batchID)
+ batchInfo, err := session.GetBatchInfoWithContext(ctx, jobInfo.Id, batchID)
if err != nil {
return errors.New("bulk job status failed with " + err.Error())
}
@@ -115,27 +115,34 @@ func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
break
}
c.logger.Info("Waiting for pk chunking to complete")
- time.Sleep(2000 * time.Millisecond)
+ select {
+ case <-time.After(2 * time.Second):
+ case <-ctx.Done():
+ return fmt.Errorf("startJob cancelled: %w", ctx.Err())
+ }
}
}

c.logger.Info("Waiting for pk chunking to complete")
jobInfo, err = session.CloseBulkJobWithContext(ctx, jobInfo.Id)
if err != nil {
return err
}
var status force.JobInfo

for {
- status, err = session.GetJobInfo(jobInfo.Id)
+ status, err = session.GetJobInfoWithContext(ctx, jobInfo.Id)
if err != nil {
return errors.New("bulk job status failed with " + err.Error())
}
if status.NumberBatchesCompleted+status.NumberBatchesFailed == status.NumberBatchesTotal {
break
}
c.logger.Info("Waiting for bulk export to complete")
- time.Sleep(2000 * time.Millisecond)
+ select {
+ case <-time.After(2 * time.Second):
+ case <-ctx.Done():
+ return fmt.Errorf("startJob cancelled: %w", ctx.Err())
+ }
}

j.job = status
@@ -156,7 +163,7 @@ func (j *bulkJob) getBatches(ctx context.Context) error {

if isPKChunkingEnabled(j) {
var allBatches []force.BatchInfo
- allBatches, err = j.session.GetBatches(j.job.Id)
+ allBatches, err = j.session.GetBatchesWithContext(ctx, j.job.Id)
// for pk chunking enabled jobs the first batch has no results
if allBatches != nil {
if allBatches[0].State == "Failed" {
@@ -170,15 +177,15 @@
}
}
} else {
- batch, berr := j.session.GetBatchInfo(j.jobID, j.batchID)
+ batch, berr := j.session.GetBatchInfoWithContext(ctx, j.jobID, j.batchID)
err = berr
batches = []force.BatchInfo{batch}
}
if err != nil {
return fmt.Errorf("%s %w", errorMessage+"batch status failed with ", err)
}
for _, b := range batches {
- results, err := getBatchResults(j.session, j.job, b)
+ results, err := getBatchResults(ctx, j.session, j.job, b)
if err != nil {
return fmt.Errorf("%s %w", errorMessage+"batch results failed with ", err)
}
@@ -187,7 +194,7 @@
return nil
}

- func (j *bulkJob) retrieveJobResult(result int) (string, error) {
+ func (j *bulkJob) retrieveJobResult(ctx context.Context, result int) (string, error) {
batchResult := j.results[result]
writer, err := os.CreateTemp("", "batchResult-"+batchResult.resultID+"-*.csv")
if err != nil {
@@ -197,8 +204,8 @@ func (j *bulkJob) retrieveJobResult(result int) (string, error) {
writer.Close()
}()

- httpBody := fetchBatchResult(j, batchResult, j.logger)
- err = readAndWriteBody(j, httpBody, writer)
+ httpBody := fetchBatchResult(ctx, j, batchResult, j.logger)
+ err = readAndWriteBody(ctx, j, httpBody, writer)
if closer, ok := httpBody.(io.ReadCloser); ok {
closer.Close()
}
@@ -208,7 +215,7 @@ func (j *bulkJob) retrieveJobResult(result int) (string, error) {
return writer.Name(), nil
}

- func fetchBatchResult(j *bulkJob, resultInfo batchResult, logger *zap.Logger) io.Reader {
+ func fetchBatchResult(ctx context.Context, j *bulkJob, resultInfo batchResult, logger *zap.Logger) io.Reader {
errorMessage := "Could not fetch job result. Reason: "

if resultInfo.batch.State == "Failed" {
@@ -220,7 +227,7 @@ func fetchBatchResult(j *bulkJob, resultInfo batchResult, logger *zap.Logger) io
return bytes.NewReader(nil)
}
var result io.Reader
- err := j.session.RetrieveBulkJobQueryResultsWithCallback(j.job, resultInfo.batch.Id, resultInfo.resultID, func(r *http.Response) error {
+ err := j.session.RetrieveBulkJobQueryResultsWithCallbackWithContext(ctx, j.job, resultInfo.batch.Id, resultInfo.resultID, func(r *http.Response) error {
result = r.Body
return nil
})
@@ -231,7 +238,7 @@ func fetchBatchResult(j *bulkJob, resultInfo batchResult, logger *zap.Logger) io
return result
}

- func readAndWriteBody(j *bulkJob, httpBody io.Reader, w io.Writer) error {
+ func readAndWriteBody(ctx context.Context, j *bulkJob, httpBody io.Reader, w io.Writer) error {
recReader := j.RecordReader(httpBody)
for {
records, err := recReader.Next()
@@ -243,15 +250,20 @@ func readAndWriteBody(j *bulkJob, httpBody io.Reader, w io.Writer) error {
if _, err := io.Copy(w, bytes.NewReader(records.Bytes)); err != nil {
return fmt.Errorf("write failed: %w", err)
}
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("readAndWriteBody cancelled: %w", ctx.Err())
+ default:
+ }
}
}

// Get all of the results for a batch. Most batches have one results, but
// large batches can be split into multiple result files.
- func getBatchResults(session Forceable, job force.JobInfo, batch force.BatchInfo) ([]batchResult, error) {
+ func getBatchResults(ctx context.Context, session Forceable, job force.JobInfo, batch force.BatchInfo) ([]batchResult, error) {
var resultIDs []string
var results []batchResult
- jobInfo, err := session.RetrieveBulkQuery(job.Id, batch.Id)
+ jobInfo, err := session.RetrieveBulkQueryWithContext(ctx, job.Id, batch.Id)
if err != nil {
return nil, err
}
3 changes: 1 addition & 2 deletions runtime/drivers/salesforce/sql_store.go
@@ -76,7 +76,6 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any, opt

job := makeBulkJob(session, srcProps.SObject, srcProps.SOQL, srcProps.QueryAll, c.logger)

- // TODO: Handle context in startJob
err = c.startJob(ctx, job)
if err != nil {
return nil, err
@@ -124,7 +123,7 @@ func (j *bulkJob) Next() ([]string, error) {
if j.nextResult == len(j.results) {
return nil, io.EOF
}
- tempFile, err := j.retrieveJobResult(j.nextResult)
+ tempFile, err := j.retrieveJobResult(context.Background(), j.nextResult)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve batch: %w", err)
}
