diff --git a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go index c4b44da3b..84430b88f 100644 --- a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go +++ b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go @@ -402,7 +402,7 @@ func (m *Migration) fetchAndUpdateBatch() bool { func (m *Migration) writeBatchUpdates() (int, error) { var getBatches = func(chunkSize int) [][]mongo.WriteModel { - log.Printf("updates to apply count: %d", len(m.updates)) + //log.Printf("updates to apply count: %d", len(m.updates)) batches := [][]mongo.WriteModel{} for i := 0; i < len(m.updates); i += chunkSize { end := i + chunkSize @@ -414,7 +414,7 @@ func (m *Migration) writeBatchUpdates() (int, error) { return batches } updateCount := 0 - log.Printf("write batch size %d", *m.writeBatchSize) + //log.Printf("write batch size %d", *m.writeBatchSize) for _, batch := range getBatches(int(*m.writeBatchSize)) { if err := m.blockUntilDBReady(); err != nil { log.Printf("writeBatchUpdates-blocking error: %s", err) @@ -425,18 +425,15 @@ func (m *Migration) writeBatchUpdates() (int, error) { return updateCount, err } log.Printf("updates to write %d", len(batch)) - log.Printf("first to write %v", batch[0]) if m.dryRun { updateCount += len(batch) log.Println("dry run so not applying changes") continue } - bulkOption := options.BulkWriteOptions{} - bulkOption.SetOrdered(true) if deviceC := m.getDataCollection(); deviceC != nil { - results, err := deviceC.BulkWrite(m.ctx, batch, &bulkOption) + results, err := deviceC.BulkWrite(m.ctx, batch) if err != nil { log.Printf("error writing batch updates %v", err) return updateCount, err diff --git a/migrations/20231128_jellyfish_migration/utils/utils.go b/migrations/20231128_jellyfish_migration/utils/utils.go index ee56913d4..1aa2159ba 100644 --- a/migrations/20231128_jellyfish_migration/utils/utils.go +++ b/migrations/20231128_jellyfish_migration/utils/utils.go @@ -68,23 +68,27 @@ func GetBGValuePlatformPrecision(mmolVal float64) float64 { func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { updates := bson.M{} - datumID := "" var identityFields []string // while doing test runs - var errorDebug = func(err error) (string, bson.M, error) { - log.Printf("[%s] error [%s] creating hash for datum %v", datumID, err, bsonData) - return datumID, nil, err + var errorDebug = func(id string, err error) (string, bson.M, error) { + log.Printf("[%s] error [%s] creating hash for datum %v", id, err, bsonData) + return id, nil, err + } + + datumID, ok := bsonData["_id"].(string) + if !ok { + return errorDebug("", errors.New("cannot get the datum id")) } datumType, ok := bsonData["type"].(string) if !ok { - return errorDebug(errors.New("cannot get the datum type")) + return errorDebug(datumID, errors.New("cannot get the datum type")) } dataBytes, err := bson.Marshal(bsonData) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } switch datumType { @@ -92,61 +96,57 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { var datum *basal.Basal err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID identityFields, err = datum.IdentityFields() log.Printf("basal %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } case bolus.Type: var datum *bolus.Bolus err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID identityFields, err = datum.IdentityFields() log.Printf("bolus %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } case device.Type: var datum *bolus.Bolus err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID identityFields, err = datum.IdentityFields() log.Printf("device %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } case pump.Type: var datum *pump.Pump err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID identityFields, err = datum.IdentityFields() log.Printf("pump %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } boluses, err := updateIfExistsPumpSettingsBolus(bsonData) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } else if boluses != nil { updates["boluses"] = boluses } sleepSchedules, err := updateIfExistsPumpSettingsSleepSchedules(datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } else if sleepSchedules != nil { updates["sleepSchedules"] = sleepSchedules } @@ -155,9 +155,8 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { var datum *selfmonitored.SelfMonitored err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl { // NOTE: we need to ensure the same precision for the // converted value as it is used to calculate the hash @@ -168,15 +167,14 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { log.Printf("smbg %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } case ketone.Type: var datum *ketone.Ketone err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl { // NOTE: we need to ensure the same precision for the // converted value as it is used to calculate the hash @@ -186,15 +184,14 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { identityFields, err = datum.IdentityFields() log.Printf("ketone %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } case continuous.Type: var datum *continuous.Continuous err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl { // NOTE: we need to ensure the same precision for the // converted value as it is used to calculate the hash @@ -204,26 +201,24 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { identityFields, err = datum.IdentityFields() log.Printf("cbg %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } default: var datum *types.Base err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } - datumID = *datum.ID identityFields, err = datum.IdentityFields() - log.Printf("default %s id %v", datumID, identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } } hash, err := deduplicator.GenerateIdentityHash(identityFields) if err != nil { - return errorDebug(err) + return errorDebug(datumID, err) } updates["_deduplicator"] = bson.M{"hash": hash}