Skip to content

Commit

Permalink
checkpoint every 100 resources - for more frequent progress updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
AnalogJ committed Oct 13, 2023
1 parent 02c515f commit 3fc1ed1
Showing 1 changed file with 44 additions and 3 deletions.
47 changes: 44 additions & 3 deletions clients/internal/base/fhir401_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,23 @@ func (c *SourceClientFHIR401) SyncAllByPatientEverythingBundle(db models.Databas
//lookup table for every resource ID found by Fasten
lookupResourceReferences := map[string]bool{}

for _, apiModel := range rawResourceModels {
for ndx, apiModel := range rawResourceModels {
err = c.ProcessResource(db, apiModel, lookupResourceReferences, internalFragmentReferenceLookup, &summary)
if err != nil {
syncErrors[apiModel.SourceResourceType] = err
continue
}

if ndx%100 == 0 {
db.BackgroundJobCheckpoint(c.Context,
map[string]interface{}{
"stage": "EverythingBundle",
"stage_progress": ndx,
"summary": summary,
},
map[string]interface{}{"errors": syncErrors},
)
}
}

//process any pending resources
Expand Down Expand Up @@ -149,12 +160,27 @@ func (c *SourceClientFHIR401) SyncAllByResourceName(db models.DatabaseRepository
}
summary.TotalResources += len(rawResourceModels)

for _, apiModel := range rawResourceModels {
for ndx, apiModel := range rawResourceModels {
err = c.ProcessResource(db, apiModel, lookupResourceReferences, internalFragmentReferenceLookup, &summary)
if err != nil {
syncErrors[resourceType] = err
continue
}

if ndx%100 == 0 {
stageErrorData := map[string]interface{}{}
if len(syncErrors) > 0 {
stageErrorData["errors"] = syncErrors
}
db.BackgroundJobCheckpoint(c.Context,
map[string]interface{}{
"stage": resourceType,
"stage_progress": ndx,
"summary": summary,
},
stageErrorData,
)
}
}

checkpointErrorData := map[string]interface{}{}
Expand Down Expand Up @@ -222,7 +248,7 @@ func (c *SourceClientFHIR401) ProcessPendingResources(db models.DatabaseReposito

//process pending resources
summary.TotalResources += len(pendingResourceReferences)
for _, pendingResourceIdOrUri := range pendingResourceReferences {
for ndx, pendingResourceIdOrUri := range pendingResourceReferences {
var resourceRaw map[string]interface{}

resourceSourceUri, err := c.GetRequest(pendingResourceIdOrUri, &resourceRaw)
Expand Down Expand Up @@ -268,6 +294,21 @@ func (c *SourceClientFHIR401) ProcessPendingResources(db models.DatabaseReposito
syncErrors[pendingResourceIdOrUri] = err
continue
}

if ndx%100 == 0 {
stageErrorData := map[string]interface{}{}
if len(syncErrors) > 0 {
stageErrorData["errors"] = syncErrors
}
db.BackgroundJobCheckpoint(c.Context,
map[string]interface{}{
"stage": "PendingResources",
"stage_progress": ndx,
"summary": summary,
},
stageErrorData,
)
}
}
extractionLoopCount += 1
}
Expand Down

0 comments on commit 3fc1ed1

Please sign in to comment.