Skip to content

Commit

Permalink
use client connection
Browse files Browse the repository at this point in the history
  • Loading branch information
jh-bate committed Dec 7, 2023
1 parent 2cdc4c6 commit 8736360
Showing 1 changed file with 76 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"log"
"math"
Expand Down Expand Up @@ -38,8 +39,6 @@ type Migration struct {
config *Config
//*migrationMongo.Migration
client *mongo.Client
oplogC *mongo.Collection
deviceDataC *mongo.Collection
writeBatchSize *int64
updates []mongo.WriteModel
}
Expand Down Expand Up @@ -74,6 +73,14 @@ func (m *Migration) RunAndExit() {
}

m.CLI().Action = func(ctx *cli.Context) error {
log.Println("prepare")
var err error
m.client, err = mongo.Connect(m.ctx, options.Client().ApplyURI(m.config.uri))
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %w", err)
}
defer m.client.Disconnect(m.ctx)

log.Printf("config %#v", m.config)
if err := m.prepare(); err != nil {
log.Printf("prepare failed: %s", err)
Expand Down Expand Up @@ -173,30 +180,25 @@ func (m *Migration) CLI() *cli.App {
return m.cli
}

func (m *Migration) prepare() error {
log.Println("prepare")
var err error
m.client, err = mongo.Connect(m.ctx, options.Client().ApplyURI(m.config.uri))
if err != nil {
return fmt.Errorf("unable to connect to MongoDB: %w", err)
}
defer m.client.Disconnect(m.ctx)

m.oplogC = m.client.Database("local").Collection(oplogName)
m.deviceDataC = m.client.Database("data").Collection("deviceData")
func (m *Migration) getDataCollection() *mongo.Collection {
return m.client.Database("data").Collection("deviceData")
}
func (m *Migration) getOplogCollection() *mongo.Collection {
return m.client.Database("local").Collection(oplogName)
}

func (m *Migration) prepare() error {
if err := m.checkFreeSpace(); err != nil {
return err
}

err = m.setWriteBatchSize()
if err != nil {
if err := m.setWriteBatchSize(); err != nil {
return err
}
return nil
}

func (m *Migration) execute() error {
log.Println("about to run execute")
totalMigrated := 0
for m.fetchAndUpdateBatch() {
updatedCount, err := m.writeBatchUpdates()
Expand All @@ -214,16 +216,16 @@ func (m *Migration) getOplogDuration() (time.Duration, error) {
type MongoMetaData struct {
Wall time.Time `json:"wall"`
}
if m.oplogC != nil {
if oplogC := m.getOplogCollection(); oplogC != nil {
var oldest MongoMetaData
if err := m.oplogC.FindOne(
if err := oplogC.FindOne(
m.ctx, bson.M{},
options.FindOne().SetSort("$natural"),
options.FindOne().SetProjection(bson.M{"wall": 1})).Decode(&oldest); err != nil {
return 0, err
}
var newest MongoMetaData
if err := m.oplogC.FindOne(m.ctx,
if err := oplogC.FindOne(m.ctx,
bson.M{},
options.FindOne().SetSort("-$natural"),
options.FindOne().SetProjection(bson.M{"wall": 1})).Decode(&newest); err != nil {
Expand All @@ -245,13 +247,13 @@ func calculateBatchSize(oplogSize int, oplogEntryBytes int, oplogMinWindow int,

func (m *Migration) setWriteBatchSize() error {
log.Println("set write batch size...")
if m.oplogC != nil {
if oplogC := m.getOplogCollection(); oplogC != nil {
log.Println("Getting oplog stats...")
type MongoMetaData struct {
MaxSize int `json:"maxSize"`
}
var metaData MongoMetaData
if err := m.oplogC.Database().RunCommand(m.ctx, bson.M{"collStats": oplogName}).Decode(&metaData); err != nil {
if err := oplogC.Database().RunCommand(m.ctx, bson.M{"collStats": oplogName}).Decode(&metaData); err != nil {
return err
}
log.Printf("oplog maxSize: %d", metaData.MaxSize)
Expand All @@ -274,19 +276,24 @@ func (m *Migration) checkFreeSpace() error {
}
var metaData MongoMetaData
log.Println("Getting DB free space...")
err := m.deviceDataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData)
if err != nil {
return err
}
log.Printf("DB free space: %v", metaData)
bytesFree := metaData.FsTotalSize - metaData.FsUsedSize
percentFree := int(math.Floor(float64(bytesFree) / float64(metaData.FsTotalSize) * 100))
log.Printf("DB disk currently has %d%% (%d bytes) free.", percentFree, bytesFree)

if m.config.minFreePercent > percentFree {
return fmt.Errorf("error %d%% is below minimum free space of %d%%", percentFree, m.config.minFreePercent)
if dataC := m.getDataCollection(); dataC != nil {

err := dataC.Database().RunCommand(m.ctx, bson.M{"dbStats": 1}).Decode(&metaData)
if err != nil {
return err
}
log.Printf("DB free space: %v", metaData)
bytesFree := metaData.FsTotalSize - metaData.FsUsedSize
percentFree := int(math.Floor(float64(bytesFree) / float64(metaData.FsTotalSize) * 100))
log.Printf("DB disk currently has %d%% (%d bytes) free.", percentFree, bytesFree)

if m.config.minFreePercent > percentFree {
return fmt.Errorf("error %d%% is below minimum free space of %d%%", percentFree, m.config.minFreePercent)
}
return nil
}
return nil
return errors.New("could not get deviceData database")
}

func (m *Migration) getWaitTime() (float64, error) {
Expand Down Expand Up @@ -357,6 +364,7 @@ func (m *Migration) blockUntilDBReady() error {
}

func (m *Migration) fetchAndUpdateBatch() bool {

selector := bson.M{
// jellyfish uses a generated _id that is not an mongo objectId
"_id": bson.M{"$not": bson.M{"$type": "objectId"}},
Expand All @@ -366,45 +374,49 @@ func (m *Migration) fetchAndUpdateBatch() bool {
}
m.updates = []mongo.WriteModel{}

dDataCursor, err := m.deviceDataC.Find(m.ctx, selector,
&options.FindOptions{Limit: &m.config.readBatchSize},
)
if err != nil {
log.Printf("failed to select data: %s", err)
return false
}

var dDataResult bson.M

defer dDataCursor.Close(m.ctx)
for dDataCursor.Next(m.ctx) {
err = dDataCursor.Decode(&dDataResult)
if dataC := m.getDataCollection(); dataC != nil {
dDataCursor, err := dataC.Find(m.ctx, selector,
&options.FindOptions{Limit: &m.config.readBatchSize},
)
if err != nil {
log.Printf("failed decoding data: %s", err)
log.Printf("failed to select data: %s", err)
return false
}

datumID, err := utils.GetValidatedString(dDataResult, "_id")
if err != nil {
log.Printf("failed getting dutum _id: %s", err)
return false
}
var dDataResult bson.M

updates, err := utils.GetDatumUpdates(dDataResult)
if err != nil {
log.Printf("failed getting datum updates: %s", err)
return false
}
defer dDataCursor.Close(m.ctx)
for dDataCursor.Next(m.ctx) {
err = dDataCursor.Decode(&dDataResult)
if err != nil {
log.Printf("failed decoding data: %s", err)
return false
}

m.updates = append(m.updates, mongo.NewUpdateOneModel().SetFilter(
bson.M{
"_id": datumID,
"modifiedTime": dDataResult["modifiedTime"],
}).SetUpdate(bson.M{
"$set": updates,
}))
datumID, err := utils.GetValidatedString(dDataResult, "_id")
if err != nil {
log.Printf("failed getting dutum _id: %s", err)
return false
}

updates, err := utils.GetDatumUpdates(dDataResult)
if err != nil {
log.Printf("failed getting datum updates: %s", err)
return false
}

m.updates = append(m.updates, mongo.NewUpdateOneModel().SetFilter(
bson.M{
"_id": datumID,
"modifiedTime": dDataResult["modifiedTime"],
}).SetUpdate(bson.M{
"$set": updates,
}))
}
return len(m.updates) > 0
}
return len(m.updates) > 0
log.Println("get deviceData collection ")
return false
}

func (m *Migration) writeBatchUpdates() (int, error) {
Expand Down

0 comments on commit 8736360

Please sign in to comment.