Skip to content

Commit

Permalink
Added BackgroundJobCheckpoint to database repository interface.
Browse files Browse the repository at this point in the history
This function allows the calling function to get periodic updates regarding the status of the sync (and eventually to continue where it left off).

Make sure that the resource names are unique and sorted before beginning sync.
  • Loading branch information
AnalogJ committed Oct 7, 2023
1 parent 515bbf7 commit 414ce1e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
35 changes: 33 additions & 2 deletions clients/internal/base/fhir401_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"net/http"
"sort"
"strings"
)

Expand Down Expand Up @@ -79,6 +80,13 @@ func (c *SourceClientFHIR401) SyncAllByPatientEverythingBundle(db models.Databas
c.Logger.Errorf("%d error(s) occurred during sync. \n %v", len(syncErrors), syncErrors)
}

db.BackgroundJobCheckpoint(c.Context,
map[string]interface{}{
"stage": "PendingResources",
"summary": summary,
},
map[string]interface{}{"errors": syncErrors},
)
return summary, nil
}

Expand Down Expand Up @@ -125,6 +133,8 @@ func (c *SourceClientFHIR401) SyncAllByResourceName(db models.DatabaseRepository
lookupResourceReferences := map[string]bool{}

//query for resources by resource name
resourceNames = lo.Uniq(resourceNames)
sort.Strings(resourceNames)
for _, resourceType := range resourceNames {
bundle, err := c.GetResourceBundle(fmt.Sprintf("%s?patient=%s", resourceType, c.SourceCredential.GetPatientId()))
if err != nil {
Expand All @@ -145,17 +155,38 @@ func (c *SourceClientFHIR401) SyncAllByResourceName(db models.DatabaseRepository
syncErrors[resourceType] = err
continue
}
}

checkpointErrorData := map[string]interface{}{}
if len(syncErrors) > 0 {
checkpointErrorData["errors"] = syncErrors
}

db.BackgroundJobCheckpoint(c.Context,
map[string]interface{}{
"stage": resourceType,
"summary": summary,
},
checkpointErrorData,
)
}

//process any pending resources
lookupResourceReferences, syncErrors = c.ProcessPendingResources(db, &summary, lookupResourceReferences, syncErrors)

checkpointErrorData := map[string]interface{}{}
if len(syncErrors) > 0 {
//TODO: ignore errors.
//ignore errors.
c.Logger.Errorf("%d error(s) occurred during sync. \n %v", len(syncErrors), syncErrors)
checkpointErrorData["errors"] = syncErrors
}
db.BackgroundJobCheckpoint(c.Context,
map[string]interface{}{
"stage": "PendingResources",
"summary": summary,
},
checkpointErrorData,
)
return summary, nil
}

Expand Down Expand Up @@ -290,7 +321,7 @@ func (c *SourceClientFHIR401) GetResourceBundle(relativeResourcePath string) (in
}
}

c.Logger.Infof("BUNDLE - %v", bundle)
//c.Logger.Debugf("BUNDLE - %v", bundle)
return bundle, err

}
Expand Down
1 change: 1 addition & 0 deletions clients/models/database_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ import (
//go:generate mockgen -source=database_repository.go -destination=mock/mock_database_repository.go
type DatabaseRepository interface {
UpsertRawResource(ctx context.Context, sourceCredentials SourceCredential, rawResource RawResourceFhir) (bool, error)
BackgroundJobCheckpoint(ctx context.Context, checkpointData map[string]interface{}, errorData map[string]interface{})
}

0 comments on commit 414ce1e

Please sign in to comment.