diff --git a/go.mod b/go.mod
index 41a5d9aadcf7..244a7699ed43 100644
--- a/go.mod
+++ b/go.mod
@@ -10,8 +10,7 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0
 	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0
-	github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99
-	github.com/ForceCLI/force v1.0.5-0.20231222230115-685c4cb0390a
+	github.com/ForceCLI/force v1.0.5-0.20231227180521-1b251cf1a8b0
 	github.com/Masterminds/sprig/v3 v3.2.3
 	github.com/MicahParks/keyfunc v1.9.0
 	github.com/NYTimes/gziphandler v1.1.1
@@ -140,6 +139,7 @@ require (
 )
 
 require (
+	github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99 // indirect
 	github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
 	github.com/Masterminds/goutils v1.1.1 // indirect
 	github.com/Masterminds/semver/v3 v3.2.1 // indirect
diff --git a/go.sum b/go.sum
index be0007bc2a93..f9d49d20b107 100644
--- a/go.sum
+++ b/go.sum
@@ -651,8 +651,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99 h1:H2axnitaP3Dw+tocMHPQHjM2wJ/+grF8sOIQGaJeEsg=
 github.com/ForceCLI/config v0.0.0-20230217143549-9149d42a3c99/go.mod h1:WHFXv3VIHldTnYGmWAXAxsu4O754A9Zakq4DedI8PSA=
-github.com/ForceCLI/force v1.0.5-0.20231222230115-685c4cb0390a h1:WmP/xGiagp8QK/p20tv9tXV5tUqQro//Y/QrUD3p0nI=
-github.com/ForceCLI/force v1.0.5-0.20231222230115-685c4cb0390a/go.mod h1:qxApCXCTLnYtf2TRZbhnX3dcyEca29wss6YI8qZNMiI=
+github.com/ForceCLI/force v1.0.5-0.20231227180521-1b251cf1a8b0 h1:XPYvEs+GpfNekTXPfOfkUWpbRYpOVorykDs6IPzlax8=
+github.com/ForceCLI/force v1.0.5-0.20231227180521-1b251cf1a8b0/go.mod h1:qxApCXCTLnYtf2TRZbhnX3dcyEca29wss6YI8qZNMiI=
 github.com/ForceCLI/inflect v0.0.0-20130829110746-cc00b5ad7a6a h1:mMd54YgLoeupNpbph3KdwvF58O0lZ72RQaJ2cFPOFDE=
 github.com/ForceCLI/inflect v0.0.0-20130829110746-cc00b5ad7a6a/go.mod h1:DGKmCfb9oo5BivGO+szHk2ZvlqPDTlW4AYVpRBIVbms=
 github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
diff --git a/runtime/drivers/salesforce/bulk.go b/runtime/drivers/salesforce/bulk.go
index 7cc4236a1498..8faa0844956a 100644
--- a/runtime/drivers/salesforce/bulk.go
+++ b/runtime/drivers/salesforce/bulk.go
@@ -20,14 +20,14 @@ import (
 
 // Forceable represents an object which can talk to salesforce
 type Forceable interface {
-	CreateBulkJob(jobInfo force.JobInfo, requestOptions ...func(*http.Request)) (force.JobInfo, error)
-	BulkQuery(soql string, jobID string, contentType string, requestOptions ...func(*http.Request)) (force.BatchInfo, error)
+	CreateBulkJobWithContext(ctx context.Context, jobInfo force.JobInfo, requestOptions ...func(*http.Request)) (force.JobInfo, error)
+	BulkQueryWithContext(ctx context.Context, soql string, jobID string, contentType string, requestOptions ...func(*http.Request)) (force.BatchInfo, error)
 	CloseBulkJobWithContext(context context.Context, jobID string) (force.JobInfo, error)
-	GetJobInfo(jobID string) (force.JobInfo, error)
-	GetBatches(jobID string) ([]force.BatchInfo, error)
-	GetBatchInfo(jobID string, batchID string) (force.BatchInfo, error)
-	RetrieveBulkQuery(jobID string, batchID string) ([]byte, error)
-	RetrieveBulkJobQueryResultsWithCallback(job force.JobInfo, batchID string, resultID string, callback force.HttpCallback) error
+	GetJobInfoWithContext(ctx context.Context, jobID string) (force.JobInfo, error)
+	GetBatchesWithContext(ctx context.Context, jobID string) ([]force.BatchInfo, error)
+	GetBatchInfoWithContext(ctx context.Context, jobID string, batchID string) (force.BatchInfo, error)
+	RetrieveBulkQueryWithContext(ctx context.Context, jobID string, batchID string) ([]byte, error)
+	RetrieveBulkJobQueryResultsWithCallbackWithContext(ctx context.Context, job force.JobInfo, batchID string, resultID string, callback force.HttpCallback) error
 }
 
 type batchResult struct {
@@ -80,7 +80,7 @@ func makeBulkJob(session Forceable, objectName, query string, queryAll bool, log
 func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
 	session := j.session
 
-	jobInfo, err := session.CreateBulkJob(j.job, func(request *http.Request) {
+	jobInfo, err := session.CreateBulkJobWithContext(ctx, j.job, func(request *http.Request) {
 		if isPKChunkingEnabled(j) {
 			pkChunkHeader := "chunkSize=" + strconv.Itoa(j.pkChunkSize)
 			parent := parentObject(j.objectName)
@@ -98,7 +98,7 @@ func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
 		}
 		return err
 	}
-	result, err := session.BulkQuery(j.query, jobInfo.Id, j.job.ContentType)
+	result, err := session.BulkQueryWithContext(ctx, j.query, jobInfo.Id, j.job.ContentType)
 	if err != nil {
 		return errors.New("bulk query failed with " + err.Error())
 	}
@@ -106,7 +106,7 @@ func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
 	// wait for chunking to complete
 	if isPKChunkingEnabled(j) {
 		for {
-			batchInfo, err := session.GetBatchInfo(jobInfo.Id, batchID)
+			batchInfo, err := session.GetBatchInfoWithContext(ctx, jobInfo.Id, batchID)
 			if err != nil {
 				return errors.New("bulk job status failed with " + err.Error())
 			}
@@ -115,11 +115,14 @@ func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
 				break
 			}
 			c.logger.Info("Waiting for pk chunking to complete")
-			time.Sleep(2000 * time.Millisecond)
+			select {
+			case <-time.After(2 * time.Second):
+			case <-ctx.Done():
+				return fmt.Errorf("startJob cancelled: %w", ctx.Err())
+			}
 		}
 	}
 
-	c.logger.Info("Waiting for pk chunking to complete")
 	jobInfo, err = session.CloseBulkJobWithContext(ctx, jobInfo.Id)
 	if err != nil {
 		return err
@@ -127,7 +130,7 @@ func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
 
 	var status force.JobInfo
 	for {
-		status, err = session.GetJobInfo(jobInfo.Id)
+		status, err = session.GetJobInfoWithContext(ctx, jobInfo.Id)
 		if err != nil {
 			return errors.New("bulk job status failed with " + err.Error())
 		}
@@ -135,7 +138,11 @@ func (c *connection) startJob(ctx context.Context, j *bulkJob) error {
 			break
 		}
 		c.logger.Info("Waiting for bulk export to complete")
-		time.Sleep(2000 * time.Millisecond)
+		select {
+		case <-time.After(2 * time.Second):
+		case <-ctx.Done():
+			return fmt.Errorf("startJob cancelled: %w", ctx.Err())
+		}
 	}
 
 	j.job = status
@@ -156,7 +163,7 @@ func (j *bulkJob) getBatches(ctx context.Context) error {
 
 	if isPKChunkingEnabled(j) {
 		var allBatches []force.BatchInfo
-		allBatches, err = j.session.GetBatches(j.job.Id)
+		allBatches, err = j.session.GetBatchesWithContext(ctx, j.job.Id)
 		// for pk chunking enabled jobs the first batch has no results
 		if allBatches != nil {
 			if allBatches[0].State == "Failed" {
@@ -170,7 +177,7 @@ func (j *bulkJob) getBatches(ctx context.Context) error {
 			}
 		}
 	} else {
-		batch, berr := j.session.GetBatchInfo(j.jobID, j.batchID)
+		batch, berr := j.session.GetBatchInfoWithContext(ctx, j.jobID, j.batchID)
 		err = berr
 		batches = []force.BatchInfo{batch}
 	}
@@ -178,7 +185,7 @@ func (j *bulkJob) getBatches(ctx context.Context) error {
 		return fmt.Errorf("%s %w", errorMessage+"batch status failed with ", err)
 	}
 	for _, b := range batches {
-		results, err := getBatchResults(j.session, j.job, b)
+		results, err := getBatchResults(ctx, j.session, j.job, b)
 		if err != nil {
 			return fmt.Errorf("%s %w", errorMessage+"batch results failed with ", err)
 		}
@@ -187,7 +194,7 @@ func (j *bulkJob) getBatches(ctx context.Context) error {
 	return nil
 }
 
-func (j *bulkJob) retrieveJobResult(result int) (string, error) {
+func (j *bulkJob) retrieveJobResult(ctx context.Context, result int) (string, error) {
 	batchResult := j.results[result]
 	writer, err := os.CreateTemp("", "batchResult-"+batchResult.resultID+"-*.csv")
 	if err != nil {
@@ -197,8 +204,8 @@ func (j *bulkJob) retrieveJobResult(result int) (string, error) {
 		writer.Close()
 	}()
 
-	httpBody := fetchBatchResult(j, batchResult, j.logger)
-	err = readAndWriteBody(j, httpBody, writer)
+	httpBody := fetchBatchResult(ctx, j, batchResult, j.logger)
+	err = readAndWriteBody(ctx, j, httpBody, writer)
 	if closer, ok := httpBody.(io.ReadCloser); ok {
 		closer.Close()
 	}
@@ -208,7 +215,7 @@ func (j *bulkJob) retrieveJobResult(result int) (string, error) {
 	return writer.Name(), nil
 }
 
-func fetchBatchResult(j *bulkJob, resultInfo batchResult, logger *zap.Logger) io.Reader {
+func fetchBatchResult(ctx context.Context, j *bulkJob, resultInfo batchResult, logger *zap.Logger) io.Reader {
 	errorMessage := "Could not fetch job result. Reason: "
 
 	if resultInfo.batch.State == "Failed" {
@@ -220,7 +227,7 @@ func fetchBatchResult(j *bulkJob, resultInfo batchResult, logger *zap.Logger) io
 		return bytes.NewReader(nil)
 	}
 	var result io.Reader
-	err := j.session.RetrieveBulkJobQueryResultsWithCallback(j.job, resultInfo.batch.Id, resultInfo.resultID, func(r *http.Response) error {
+	err := j.session.RetrieveBulkJobQueryResultsWithCallbackWithContext(ctx, j.job, resultInfo.batch.Id, resultInfo.resultID, func(r *http.Response) error {
 		result = r.Body
 		return nil
 	})
@@ -231,7 +238,7 @@ func fetchBatchResult(j *bulkJob, resultInfo batchResult, logger *zap.Logger) io
 	return result
 }
 
-func readAndWriteBody(j *bulkJob, httpBody io.Reader, w io.Writer) error {
+func readAndWriteBody(ctx context.Context, j *bulkJob, httpBody io.Reader, w io.Writer) error {
 	recReader := j.RecordReader(httpBody)
 	for {
 		records, err := recReader.Next()
@@ -243,15 +250,20 @@ func readAndWriteBody(j *bulkJob, httpBody io.Reader, w io.Writer) error {
 		if _, err := io.Copy(w, bytes.NewReader(records.Bytes)); err != nil {
 			return fmt.Errorf("write failed: %w", err)
 		}
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("readAndWriteBody cancelled: %w", ctx.Err())
+		default:
+		}
 	}
 }
 
 // Get all of the results for a batch. Most batches have one results, but
 // large batches can be split into multiple result files.
-func getBatchResults(session Forceable, job force.JobInfo, batch force.BatchInfo) ([]batchResult, error) {
+func getBatchResults(ctx context.Context, session Forceable, job force.JobInfo, batch force.BatchInfo) ([]batchResult, error) {
 	var resultIDs []string
 	var results []batchResult
-	jobInfo, err := session.RetrieveBulkQuery(job.Id, batch.Id)
+	jobInfo, err := session.RetrieveBulkQueryWithContext(ctx, job.Id, batch.Id)
 	if err != nil {
 		return nil, err
 	}
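Note on the bulk.go changes above: the two polling loops in startJob replace bare time.Sleep calls with a select over time.After and ctx.Done(), and readAndWriteBody adds a non-blocking ctx check between record batches. A minimal standalone sketch of both patterns, with hypothetical helper names that are not part of this change:

```go
// Illustrative sketch only — not driver code. waitOrDone mirrors the blocking
// select added to the startJob polling loops; checkDone mirrors the
// non-blocking select added to readAndWriteBody.
package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrDone blocks for d, or returns early with ctx.Err() if ctx is
// cancelled first.
func waitOrDone(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// checkDone reports cancellation without ever blocking, which is why the
// select carries a default case.
func checkDone(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	fmt.Println(checkDone(ctx))                 // <nil>
	fmt.Println(waitOrDone(ctx, 2*time.Second)) // context deadline exceeded
}
```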
diff --git a/runtime/drivers/salesforce/sql_store.go b/runtime/drivers/salesforce/sql_store.go
index b5d14d77339b..fad155b7c741 100644
--- a/runtime/drivers/salesforce/sql_store.go
+++ b/runtime/drivers/salesforce/sql_store.go
@@ -76,7 +76,6 @@ func (c *connection) QueryAsFiles(ctx context.Context, props map[string]any, opt
 
 	job := makeBulkJob(session, srcProps.SObject, srcProps.SOQL, srcProps.QueryAll, c.logger)
 
-	// TODO: Handle context in startJob
 	err = c.startJob(ctx, job)
 	if err != nil {
 		return nil, err
@@ -124,7 +123,7 @@ func (j *bulkJob) Next() ([]string, error) {
 	if j.nextResult == len(j.results) {
 		return nil, io.EOF
 	}
-	tempFile, err := j.retrieveJobResult(j.nextResult)
+	tempFile, err := j.retrieveJobResult(context.Background(), j.nextResult)
 	if err != nil {
 		return nil, fmt.Errorf("Failed to retrieve batch: %w", err)
 	}
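Because Next() has no context parameter, the hunk above passes context.Background() to retrieveJobResult, so result downloads are not tied to the query's context. A hedged, self-contained sketch of one alternative — capturing the caller's context when the iterator is built — using hypothetical names rather than the driver's actual types:

```go
// Illustrative sketch only — not driver code. resultIterator stands in for
// bulkJob; the ctx field and constructor wiring are assumptions for the example.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// resultIterator captures the query's context at construction so later
// Next() calls still observe cancellation.
type resultIterator struct {
	ctx     context.Context
	results []string
	next    int
}

func newResultIterator(ctx context.Context, results []string) *resultIterator {
	return &resultIterator{ctx: ctx, results: results}
}

func (it *resultIterator) Next() (string, error) {
	if err := it.ctx.Err(); err != nil {
		return "", fmt.Errorf("iteration cancelled: %w", err)
	}
	if it.next == len(it.results) {
		return "", io.EOF
	}
	r := it.results[it.next]
	it.next++
	return r, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	it := newResultIterator(ctx, []string{"batch-1.csv", "batch-2.csv"})

	first, _ := it.Next()
	fmt.Println(first) // batch-1.csv

	cancel() // e.g. the ingestion was aborted
	if _, err := it.Next(); err != nil && !errors.Is(err, io.EOF) {
		fmt.Println(err) // iteration cancelled: context canceled
	}
}
```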