Skip to content

Commit

Permalink
fix _id
Browse files Browse the repository at this point in the history
  • Loading branch information
jh-bate committed Dec 18, 2023
1 parent 97216cc commit 5e1acbe
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (m *Migration) fetchAndUpdateBatch() bool {

func (m *Migration) writeBatchUpdates() (int, error) {
var getBatches = func(chunkSize int) [][]mongo.WriteModel {
log.Printf("updates to apply count: %d", len(m.updates))
//log.Printf("updates to apply count: %d", len(m.updates))
batches := [][]mongo.WriteModel{}
for i := 0; i < len(m.updates); i += chunkSize {
end := i + chunkSize
Expand All @@ -414,7 +414,7 @@ func (m *Migration) writeBatchUpdates() (int, error) {
return batches
}
updateCount := 0
log.Printf("write batch size %d", *m.writeBatchSize)
//log.Printf("write batch size %d", *m.writeBatchSize)
for _, batch := range getBatches(int(*m.writeBatchSize)) {
if err := m.blockUntilDBReady(); err != nil {
log.Printf("writeBatchUpdates-blocking error: %s", err)
Expand All @@ -425,18 +425,15 @@ func (m *Migration) writeBatchUpdates() (int, error) {
return updateCount, err
}
log.Printf("updates to write %d", len(batch))
log.Printf("first to write %v", batch[0])

if m.dryRun {
updateCount += len(batch)
log.Println("dry run so not applying changes")
continue
}
bulkOption := options.BulkWriteOptions{}
bulkOption.SetOrdered(true)

if deviceC := m.getDataCollection(); deviceC != nil {
results, err := deviceC.BulkWrite(m.ctx, batch, &bulkOption)
results, err := deviceC.BulkWrite(m.ctx, batch)
if err != nil {
log.Printf("error writing batch updates %v", err)
return updateCount, err
Expand Down
63 changes: 29 additions & 34 deletions migrations/20231128_jellyfish_migration/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,85 +68,85 @@ func GetBGValuePlatformPrecision(mmolVal float64) float64 {

func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
updates := bson.M{}
datumID := ""
var identityFields []string

// while doing test runs
var errorDebug = func(err error) (string, bson.M, error) {
log.Printf("[%s] error [%s] creating hash for datum %v", datumID, err, bsonData)
return datumID, nil, err
var errorDebug = func(id string, err error) (string, bson.M, error) {
log.Printf("[%s] error [%s] creating hash for datum %v", id, err, bsonData)
return id, nil, err
}

datumID, ok := bsonData["_id"].(string)
if !ok {
return errorDebug("", errors.New("cannot get the datum id"))
}

datumType, ok := bsonData["type"].(string)
if !ok {
return errorDebug(errors.New("cannot get the datum type"))
return errorDebug(datumID, errors.New("cannot get the datum type"))
}

dataBytes, err := bson.Marshal(bsonData)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}

switch datumType {
case basal.Type:
var datum *basal.Basal
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
identityFields, err = datum.IdentityFields()
log.Printf("basal %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
case bolus.Type:
var datum *bolus.Bolus
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
identityFields, err = datum.IdentityFields()
log.Printf("bolus %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
case device.Type:
var datum *bolus.Bolus
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
identityFields, err = datum.IdentityFields()
log.Printf("device %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
case pump.Type:
var datum *pump.Pump
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
identityFields, err = datum.IdentityFields()
log.Printf("pump %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}

boluses, err := updateIfExistsPumpSettingsBolus(bsonData)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
} else if boluses != nil {
updates["boluses"] = boluses
}

sleepSchedules, err := updateIfExistsPumpSettingsSleepSchedules(datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
} else if sleepSchedules != nil {
updates["sleepSchedules"] = sleepSchedules
}
Expand All @@ -155,9 +155,8 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
var datum *selfmonitored.SelfMonitored
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl {
// NOTE: we need to ensure the same precision for the
// converted value as it is used to calculate the hash
Expand All @@ -168,15 +167,14 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {

log.Printf("smbg %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
case ketone.Type:
var datum *ketone.Ketone
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl {
// NOTE: we need to ensure the same precision for the
// converted value as it is used to calculate the hash
Expand All @@ -186,15 +184,14 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
identityFields, err = datum.IdentityFields()
log.Printf("ketone %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
case continuous.Type:
var datum *continuous.Continuous
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl {
// NOTE: we need to ensure the same precision for the
// converted value as it is used to calculate the hash
Expand All @@ -204,26 +201,24 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) {
identityFields, err = datum.IdentityFields()
log.Printf("cbg %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
default:
var datum *types.Base
err = bson.Unmarshal(dataBytes, &datum)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
datumID = *datum.ID
identityFields, err = datum.IdentityFields()

log.Printf("default %s id %v", datumID, identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
}

hash, err := deduplicator.GenerateIdentityHash(identityFields)
if err != nil {
return errorDebug(err)
return errorDebug(datumID, err)
}
updates["_deduplicator"] = bson.M{"hash": hash}

Expand Down

0 comments on commit 5e1acbe

Please sign in to comment.