how to use gtm to sync to a mongo primary #21

Open
mosesBD opened this issue Feb 2, 2020 · 21 comments

@mosesBD commented Feb 2, 2020

I'm trying to use gtm to sync two mongo instances.
The first instance has been running for a while and has roughly 50GB of data.
My question: if I do a direct read and start writing to the secondary database, what can I do about changes made to the primary database while I'm reading, since the data can change after I have read it?
My own idea is to buffer the oplog while doing the direct read and then apply it to the second database.
I understand there are already tools for this, but this is part of a bigger project and I need to do it in code.
Any better options?

@mosesBD (Author) commented Feb 3, 2020

I also have another issue with direct read. Here is how I start it:

    fmt.Println(startDirectRead)
    ctx := gtm.Start(client, &gtm.Options{
        DirectReadNs: []string{config.SourceDB + "." + config.SourceCollection},
    })
    go printOp(ctx)
    ctx.DirectReadWg.Wait()

In my printOp goroutine I simply use the code from your example to print operations:

    for {
        select {
        case op = <-ctx.OpC:
            count = count + 1
            fmt.Println(count)
            break
        case err = <-ctx.ErrC:
            fmt.Printf("got error %+v\n", err)
            break
        default:
            return
        }
    }

I expect this code to print until the collection is fully received; then, since the channel is nil, the default case should run and the goroutine should exit. However, the goroutine hits the default case immediately.
I tried adding 1 to the wait group with Add() and calling Done() after the goroutine finishes.
If I remove the default case, the for loop runs forever.
How can I read the collection data in direct read mode and then continue with the rest of the code?

@rwynn (Owner) commented Feb 4, 2020

Hopefully this will help...
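
The immediate exit you are seeing is standard Go select behavior, nothing gtm-specific: a select with a default case never blocks, and right after gtm.Start the direct readers are still spinning up on background goroutines, so ctx.OpC is open but empty and the default branch wins whenever there is a momentary gap in the data. A tiny sketch of that, using nothing beyond the standard library:

    package main

    import "fmt"

    func main() {
        ch := make(chan int, 1)

        // Nothing has been sent yet. A select with a default case never
        // blocks, so the default branch runs immediately even though the
        // channel is still open and may receive data later.
        select {
        case v := <-ch:
            fmt.Println("got", v)
        default:
            fmt.Println("channel open but empty: default runs right away")
        }
    }

So instead of a default case, keep receiving until the channels are closed and drained, as in the example below.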

main.go

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/rwynn/gtm"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func readContext(ctx *gtm.OpCtx, done chan bool) {
	var errsDone, opsDone bool
	var opCnt, errCnt int
	for !errsDone || !opsDone {
		select {
		case op, open := <-ctx.OpC:
			if op == nil && !open {
				opsDone = true
				break
			}
			opCnt++
			break
		case err, open := <-ctx.ErrC:
			if err == nil && !open {
				errsDone = true
				break
			}
			errCnt++
			break
		}
	}
	fmt.Printf("Processed %d ops and %d errs\n", opCnt, errCnt)
	close(done)
}

func main() {
	var mongoURL string
	flag.StringVar(&mongoURL, "url", "mongodb://localhost:27017", "MongoDB connection URL")
	flag.Parse()
	log.Printf("Connecting to MongoDB at %s", mongoURL)
	client, err := mongo.NewClient(options.Client().ApplyURI(mongoURL))
	if err != nil {
		log.Fatalln(err)
	}
	if err = client.Connect(context.Background()); err != nil {
		log.Fatalln(err)
	}
	defer client.Disconnect(context.Background())

	done := make(chan bool)
	gtmCtx := gtm.Start(client, &gtm.Options{
		DirectReadNs:   []string{"test.test"},
		ChangeStreamNs: []string{"test.test"},
		MaxAwaitTime:   time.Duration(10) * time.Second,
		OpLogDisabled:  true,
	})
	go readContext(gtmCtx, done)
	gtmCtx.DirectReadWg.Wait()
	gtmCtx.Stop()
	<-done
	fmt.Println("All done")
}

go.mod

module test

go 1.12

require (
	github.com/DataDog/zstd v1.4.4 // indirect
	github.com/go-stack/stack v1.8.0 // indirect
	github.com/golang/snappy v0.0.1 // indirect
	github.com/google/go-cmp v0.4.0 // indirect
	github.com/minio/highwayhash v1.0.0 // indirect
	github.com/pkg/errors v0.9.1 // indirect
	github.com/rwynn/gtm v1.0.1-0.20191119151623-081995b34c9c
	github.com/serialx/hashring v0.0.0-20190515033939-7706f26af194 // indirect
	github.com/stretchr/testify v1.4.0 // indirect
	github.com/tidwall/pretty v1.0.0 // indirect
	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
	github.com/xdg/stringprep v1.0.0 // indirect
	go.mongodb.org/mongo-driver v1.2.1
	golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d // indirect
	golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
	golang.org/x/text v0.3.2 // indirect
)

@mosesBD (Author) commented Feb 4, 2020

Awesome, thanks very much. One question though:
I noticed you have both ChangeStreamNs and DirectReadNs enabled, and OpLogDisabled is true.
Would you please clarify whether this code just reads the collection, or whether it also reads the changes applied to it while the direct read is happening?
It would be great if the Options struct had some comments.

@mosesBD (Author) commented Feb 4, 2020

I ran the code and saw something interesting.
If I print the op and err, the number of processed operations and errors changes.
If I don't print them and just count them (like your code), it reports the correct number of ops; otherwise a couple of thousand ops go uncounted.
I'm doing a direct read on a collection with about 3 million records.
The database is not under any load, so nothing changes during the read.

With the operation print:

    Processed 2916557 ops and 0 errs
    All done

Without the operation print:

    Processed 2918601 ops and 0 errs
    All done

@mosesBD (Author) commented Feb 5, 2020

I did a bit more investigation. Consider the code below:

     ctx.DirectReadWg.Wait()
     fmt.Println("direct read done")
     ctx.Stop()
     <-done

After the direct read is done, you close the ctx, which I think closes the channel and causes the readContext function to exit. But when I print the operations, I can see the function still printing operations even after fmt.Println("direct read done") has executed, which suggests the ctx is being closed prematurely.
Any thoughts?

@rwynn (Owner) commented Feb 5, 2020

I think it may be related to the use of buffered channels in gtm.

Try updating the readContext function I posted and give the name op to the two _ vars. Then only set the two done flags if

op == nil && !open

Due to buffering, I think the channel is closed but still has data on it. Once the channel is empty, the zero value (nil for a pointer) starts getting returned.
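
A small standalone illustration of that, again just plain Go channel semantics rather than anything specific to gtm:

    package main

    import "fmt"

    func main() {
        ch := make(chan *int, 3)
        for i := 0; i < 3; i++ {
            v := i
            ch <- &v
        }
        close(ch) // closed, but three values are still buffered

        for {
            p, open := <-ch
            if p == nil && !open {
                // Only after the buffer is drained does a receive return
                // the zero value (nil for a pointer) with open == false.
                fmt.Println("closed and drained")
                break
            }
            fmt.Println("buffered value still delivered:", *p)
        }
    }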

@rwynn (Owner) commented Feb 5, 2020

Also, to your previous question: with ChangeStreamNs set, yes, you will also receive changes to those collections as they happen. There are methods to check whether an event came from the direct read or from a change event. Make ChangeStreamNs an empty slice if you don't want changes.

You will probably want OpLogDisabled always set to true. This project is old and thus supports reading directly from the oplog, but you don't want to do that anymore now that change streams exist.
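
Roughly, with comments describing how I'd read each option. Connection setup is copied from the earlier example, shutdown and error draining are omitted for brevity, and the source helper is something like op.IsSourceDirect() (check the Op type in gtm for the exact names):

    package main

    import (
        "context"
        "fmt"
        "log"
        "time"

        "github.com/rwynn/gtm"
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
    )

    func main() {
        client, err := mongo.NewClient(options.Client().ApplyURI("mongodb://localhost:27017"))
        if err != nil {
            log.Fatalln(err)
        }
        if err = client.Connect(context.Background()); err != nil {
            log.Fatalln(err)
        }
        defer client.Disconnect(context.Background())

        gtmCtx := gtm.Start(client, &gtm.Options{
            // Namespaces (db.collection) to read in full, once, at startup.
            DirectReadNs: []string{"test.test"},
            // Namespaces to watch for live changes via change streams.
            // Leave this nil/empty if you only want the one-time direct read.
            ChangeStreamNs: []string{"test.test"},
            // How long a change stream getMore waits for new events before
            // returning an empty batch.
            MaxAwaitTime: 10 * time.Second,
            // Do not tail the oplog directly; change streams replace that.
            OpLogDisabled: true,
        })

        for op := range gtmCtx.OpC {
            // Branch on where the event came from (verify the exact method
            // name against the Op type).
            if op.IsSourceDirect() {
                fmt.Println("initial direct read:", op.Namespace, op.Id)
            } else {
                fmt.Println("live change:", op.Operation, op.Namespace, op.Id)
            }
        }
    }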

@rwynn (Owner) commented Feb 5, 2020

I updated my originally posted code to account for the buffering.

@mosesBD (Author) commented Feb 6, 2020

I tried the new code and just added fmt.Println(op) and fmt.Println(err), but I still lose some operations:

    Processed 2916562 ops and 0 errs
    All done

It should be 2918601 ops.
Also, ChangeStreamNs was set to an empty slice, since the database has no operations at the moment and simply contains 2918601 inserts (a backup restore).

@rwynn (Owner) commented Feb 6, 2020

Can you post the full code that does not work? Also, does it work with or without the Println?

@mosesBD (Author) commented Feb 6, 2020

package main

import (
        "context"
        "flag"
        "fmt"
        "log"
        "time"

        "github.com/rwynn/gtm"
        "go.mongodb.org/mongo-driver/mongo"
        "go.mongodb.org/mongo-driver/mongo/options"
)

func readContext(ctx *gtm.OpCtx, done chan bool) {
        var errsDone, opsDone bool
        var opCnt, errCnt int
        for !errsDone && !opsDone {
                select {
                case op, open := <-ctx.OpC:
                        if op == nil && !open {
                                opsDone = true
                                break
                        }
                        opCnt++
                        fmt.Println(op.Data)
                        break
                case err, open := <-ctx.ErrC:
                        if err == nil && !open {
                                errsDone = true
                                break
                        }
                        fmt.Println(err)
                        errCnt++
                        break
                }
        }
        fmt.Printf("Processed %d ops and %d errs\n", opCnt, errCnt)
        close(done)
}

func main() {
        var mongoURL string
        flag.StringVar(&mongoURL, "url", "mongodb://172.20.11.57:27017", "MongoDB connection URL")
        flag.Parse()
        log.Printf("Connecting to MongoDB at %s", mongoURL)
        client, err := mongo.NewClient(options.Client().ApplyURI(mongoURL))
        if err != nil {
                log.Fatalln(err)
        }
        if err = client.Connect(context.Background()); err != nil {
                log.Fatalln(err)
        }
        defer client.Disconnect(context.Background())

        done := make(chan bool)
        gtmCtx := gtm.Start(client, &gtm.Options{
                DirectReadNs:   []string{"record_stats.answerlogs"},
                ChangeStreamNs: []string{},
                MaxAwaitTime:   time.Duration(10) * time.Second,
                OpLogDisabled:  true,
        })
        go readContext(gtmCtx, done)
        gtmCtx.DirectReadWg.Wait()
        fmt.Println("Direct read done")
        gtmCtx.Stop()
        <-done
        fmt.Println("All done")
}

@mosesBD (Author) commented Feb 6, 2020

    Direct read done
    Processed 2918601 ops and 0 errs
    All done

It works correctly without the print; so did the previous code.

@rwynn (Owner) commented Feb 6, 2020

Even when I add the Println like you have, I still come up with the same number, in my case a collection of 4000012 documents. I'm using go version 1.13.4 linux/amd64 here.

It's really a mystery to me why that Print would affect the count.

@mosesBD (Author) commented Feb 6, 2020

I'm using 1.13.1 on CentOS 7. I will try 1.13.4 and see what happens. Thank you.

@mosesBD (Author) commented Feb 6, 2020

I tried Go 1.13.4 but no luck. I also used another server with more resources to make sure there are no bottlenecks there.
When I run the code I posted, the text "Direct read done" is printed while there are still operations left (still printing) before the main routine exits:

Direct read done
map[_id:ObjectID("5e2794e980ea13a924dd989c") answer:-1 created_at: game_id: hint_used: question_id: question_number:7 state:NOT_ANSWERED user_id: zone_number:1]
map[_id:ObjectID("5e2794e980ea13a924dd989d") answer:-1 created_at:game_id: hint_used: question_id: question_number:8 state:NOT_ANSWERED user_id: zone_number:1]
map[_id:ObjectID("5e2495c99dae954a6cb4ea4a") answer:-1 created_at:game_id: hint_used: question_id: question_number:13 state:NOT_ANSWERED user_id: zone_number:1]
Processed 2916590 ops and 0 errs
All done

Is it the same for you?

@mosesBD (Author) commented Feb 6, 2020

On the new server, running without the print returns different results (some correct and some not).
Looks like the problem is not with the code but with my distribution or hardware.
Would you please tell me the Linux distro of your mongo server and the mongo version you are using? Information about the hardware would be useful too.
I am using mongo 4.2.3 on CentOS 7; the server has 8GB RAM, 8 CPU cores, and an SSD disk.

@mosesBD (Author) commented Feb 6, 2020

Here are my mongo logs. Looks like latency issues:

2020-02-06T14:10:26.264-0500 I  NETWORK  [listener] connection accepted from 127.0.0.1:50832 #74 (1 connection now open)
2020-02-06T14:10:26.265-0500 I  NETWORK  [conn74] received client metadata from 127.0.0.1:50832 conn74: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.266-0500 I  NETWORK  [listener] connection accepted from 127.0.0.1:50834 #75 (2 connections now open)
2020-02-06T14:10:26.266-0500 I  NETWORK  [conn75] received client metadata from 127.0.0.1:50834 conn75: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.511-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: {} }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:051BBA2A planCacheKey:051BBA2A reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 238ms
2020-02-06T14:10:26.512-0500 I  NETWORK  [listener] connection accepted from 127.0.0.1:50836 #76 (3 connections now open)
2020-02-06T14:10:26.513-0500 I  NETWORK  [conn76] received client metadata from 127.0.0.1:50836 conn76: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.808-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dd451fdf2b4ae5988416dfd') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 295ms
2020-02-06T14:10:26.809-0500 I  NETWORK  [listener] connection accepted from 127.0.0.1:50838 #77 (4 connections now open)
2020-02-06T14:10:26.809-0500 I  NETWORK  [conn77] received client metadata from 127.0.0.1:50838 conn77: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.106-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dd999b8cef705b56f075a57') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 297ms
2020-02-06T14:10:27.107-0500 I  NETWORK  [listener] connection accepted from 127.0.0.1:50840 #78 (5 connections now open)
2020-02-06T14:10:27.107-0500 I  NETWORK  [conn78] received client metadata from 127.0.0.1:50840 conn78: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.414-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5de108bf984e0f8957c747e0') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 307ms
2020-02-06T14:10:27.418-0500 I  NETWORK  [listener] connection accepted from 127.0.0.1:50842 #79 (6 connections now open)
2020-02-06T14:10:27.418-0500 I  NETWORK  [conn79] received client metadata from 127.0.0.1:50842 conn79: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.737-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dfc9f870ff0bbb61395759a') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 321ms
2020-02-06T14:10:28.076-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e0dec7411714ed41f504082') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 338ms
2020-02-06T14:10:28.472-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e16361af67e1200a5f8eab2') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 395ms
2020-02-06T14:10:28.913-0500 I  COMMAND  [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e1cbfe7d01063c2f09edccd') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 440ms
2020-02-06T14:10:29.485-0500 I  COMMAND  [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e217712c7b942a926c26db7') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 561ms
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn79] end connection 127.0.0.1:50842 (5 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn75] end connection 127.0.0.1:50834 (3 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn78] end connection 127.0.0.1:50840 (1 connection now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn74] end connection 127.0.0.1:50832 (0 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn76] end connection 127.0.0.1:50836 (2 connections now open)
2020-02-06T14:10:36.137-0500 I  NETWORK  [conn77] end connection 127.0.0.1:50838 (4 connections now open)

@mosesBD (Author) commented Feb 6, 2020

I ran the code a couple more times on both servers.
Looks like I was wrong: even on the slower server, just printing the count can produce different results, but it happens less often.
The slower server is running mongo 4.2.1.
This is probably a matter of hardware and mongo version.

@rwynn (Owner) commented Feb 7, 2020

@mosesBD,

I think I found the mistake on my part. It was silly. The condition for the for loop should be

for !errsDone || !opsDone {

Notice the || instead of &&. With &&, the loop exits as soon as either channel has been drained and closed; since ErrC is usually empty, it finishes first and whatever is still buffered on OpC never gets counted. That is also why printing made the discrepancy more likely: the print slowed the loop down, so OpC still had a backlog when Stop closed the channels. I updated the code sample in this thread.

I think gtm is OK and this was just an error in usage.

@mosesBD (Author) commented Feb 7, 2020

Looks like that was it. Testing with both the count and the print works fine.

@mosesBD (Author) commented Feb 9, 2020

I checked a couple more times and it is working :)
How do I tell whether an operation I receive is coming from DirectReadNs or ChangeStreamNs?
