From d97349ebea9ad36931b6530093a37c41133d5d56 Mon Sep 17 00:00:00 2001 From: Jacek Wysocki Date: Mon, 11 Sep 2023 13:59:39 +0200 Subject: [PATCH] chore: testing JetStream implementation --- go.mod | 6 +- go.sum | 11 ++-- pkg/event/logs/logs_test.go | 88 +++++++++++++++++++++++++++++ pkg/event/logs/main.go | 110 ++++++++++++++++++++++++++++++++++++ pkg/executor/client/job.go | 1 + 5 files changed, 208 insertions(+), 8 deletions(-) create mode 100644 pkg/event/logs/logs_test.go create mode 100644 pkg/event/logs/main.go diff --git a/go.mod b/go.mod index 81c6691928e..ffe6672bc9e 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/minio/minio-go/v7 v7.0.47 github.com/montanaflynn/stats v0.6.6 github.com/moogar0880/problems v0.1.1 - github.com/nats-io/nats.go v1.22.1 + github.com/nats-io/nats.go v1.28.0 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/ginkgo/v2 v2.1.6 github.com/onsi/gomega v1.20.2 @@ -109,7 +109,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/jwt/v2 v2.3.0 // indirect github.com/nats-io/nats-server/v2 v2.8.4 // indirect - github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/package-url/packageurl-go v0.1.0 // indirect github.com/pquerna/cachecontrol v0.2.0 // indirect @@ -158,7 +158,7 @@ require ( github.com/imdario/mergo v0.3.13 // indirect github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.15.15 // indirect + github.com/klauspost/compress v1.16.5 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect github.com/minio/md5-simd v1.1.2 // indirect diff --git a/go.sum b/go.sum index c18a72d21d2..b7834f68ca8 100644 --- a/go.sum +++ b/go.sum @@ -372,8 +372,8 @@ github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47e github.com/klauspost/compress v1.14.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -462,10 +462,11 @@ github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4= github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4= -github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= -github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= diff --git a/pkg/event/logs/logs_test.go b/pkg/event/logs/logs_test.go new file mode 100644 index 00000000000..f46c242aa00 --- /dev/null +++ b/pkg/event/logs/logs_test.go @@ -0,0 +1,88 @@ +package main + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/assert" +) + +func TestLogs(t *testing.T) { + + // connect to nats server + nc, _ := nats.Connect(nats.DefaultURL) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + fmt.Printf("%+v\n", "a1") + + err := nc.Publish("aaaa", []byte("hello message 1")) + assert.NoError(t, err) + + js, err := jetstream.New(nc) + assert.NoError(t, err) + fmt.Printf("%+v\n", "a00000") + + // create a stream (this is an idempotent operation) + s, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "LOGS", + Subjects: []string{"LOGS.*"}, + }) + defer js.DeleteStream(ctx, "LOGS") + + assert.NoError(t, err) + + fmt.Printf("%+v\n", "a2") + fmt.Printf("%+v\n", s.CachedInfo()) + + // Create durable consumer + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: "AAA", + AckPolicy: jetstream.AckExplicitPolicy, + }) + assert.NoError(t, err) + fmt.Printf("%+v\n", "a3") + + var wg sync.WaitGroup + + fmt.Printf("%+v\n", "AAAA") + + wg.Add(1) + go func() { + fmt.Printf("%+v\n", "START") + + // Publish some messages + for i := 0; i < 100; i++ { + js.Publish(ctx, "LOGS.a123", []byte("hello message "+strconv.Itoa(i))) + fmt.Printf("Published hello message %d\n", i) + } + for i := 0; i < 100; i++ { + js.Publish(ctx, "LOGS.b999", []byte("hello message "+strconv.Itoa(i))) + fmt.Printf("Published hello message %d\n", i) + } + }() + + wg.Add(1) + go func() { + messageCounter := 0 + // Receive messages continuously in a callback + cons, _ := c.Consume(func(msg jetstream.Msg) { + msg.Ack() + fmt.Printf("Received a JetStream message via callback: %s\n", string(msg.Data())) + messageCounter++ + fmt.Printf("%+v\n", messageCounter) + }) + + cons.Stop() + }() + + t.Fail() + +} diff --git a/pkg/event/logs/main.go b/pkg/event/logs/main.go new file mode 100644 index 00000000000..84d948bbf25 --- /dev/null +++ b/pkg/event/logs/main.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func main() { + // In the `jetstream` package, almost all API calls rely on `context.Context` for timeout/cancellation handling + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + nc, _ := nats.Connect(nats.DefaultURL) + + // Create a JetStream management interface + js, _ := jetstream.New(nc) + + switch os.Args[1] { + case "create": + // Create a stream + s, _ := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "ORDERS", + Subjects: []string{"ORDERS.*"}, + }) + // // Create durable consumer + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Name: "OC", + Durable: "OC", + AckPolicy: jetstream.AckExplicitPolicy, + }) + + fmt.Printf("%+v\n", err) + fmt.Printf("%+v\n", c) + + case "publish": + // Publish some messages + for i := 1; i <= 100; i++ { + js.Publish(ctx, "ORDERS.new", []byte("hello message "+strconv.Itoa(i))) + fmt.Printf("Published hello message %d\n", i) + } + + // Publish some messages + for i := 1; i <= 100; i++ { + js.Publish(ctx, "ORDERS.old", []byte("hello message "+strconv.Itoa(i))) + fmt.Printf("Published hello message %d\n", i) + } + + case "consume": + + c, err := js.Consumer(ctx, "ORDERS", "OC") + if err != nil { + fmt.Printf("%+v\n", err) + return + } + + // Receive messages continuously in a callback + var messageCounter int + cons, _ := c.Consume(func(msg jetstream.Msg) { + err := msg.Ack() + if err != nil { + fmt.Printf("ack error: %+v\n", err) + return + } + messageCounter++ + fmt.Printf("Received %d message via callback: %s\n", messageCounter, string(msg.Data())) + }) + defer cons.Stop() + fmt.Printf("%+v\n", messageCounter) + + time.Sleep(time.Hour) + + case "iter": + from := 0 + if len(os.Args) == 3 { + from, _ = strconv.Atoi(os.Args[2]) + } + + c, err := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{ + Name: "AAA1", + Durable: "AAA1", + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + OptStartSeq: uint64(from), + }) + + if err != nil { + fmt.Printf("%+v\n", err) + return + } + defer js.DeleteConsumer(ctx, "ORDERS", "AAA1") + + messageCounter := 0 + // Iterate over messages continuously + it, _ := c.Messages() + for i := 0; i < 10; i++ { + msg, _ := it.Next() + msg.Ack() + fmt.Printf("Received a JetStream message via iterator: %s\n", string(msg.Data())) + messageCounter++ + } + it.Stop() + + } + +} diff --git a/pkg/executor/client/job.go b/pkg/executor/client/job.go index e9e694f8984..52a5c2bbe95 100644 --- a/pkg/executor/client/job.go +++ b/pkg/executor/client/job.go @@ -310,6 +310,7 @@ func (c *JobExecutor) CreateJob(ctx context.Context, execution testkube.Executio c.clusterID, execution, options) if err != nil { return err + } c.Log.Debug("creating job with options", "options", jobOptions)