Skip to content

Commit

Permalink
refactor(kafkatopic): add KafkaTopic repository
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Nov 8, 2023
1 parent 7906913 commit 451a50e
Show file tree
Hide file tree
Showing 16 changed files with 827 additions and 689 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/hashicorp/terraform-plugin-sdk/v2 v2.29.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/liip/sheriff v0.11.1
github.com/samber/lo v1.38.1
github.com/stoewer/go-strcase v1.3.0
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
Expand Down
32 changes: 32 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kafkatopicrepository

import (
"context"

"github.com/aiven/aiven-go-client/v2"
)

// Create creates topic.
// First checks if topic does not exist for the safety
// Then calls creates topic.
func (rep *repository) Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error {
// aiven.KafkaTopics.Create() function may return 501 on create
// Second call might say that topic already exists, and we have retries in aiven client
// So to be sure, better check it before create
err := rep.exists(ctx, project, service, req.TopicName, true)
if err == nil {
return errAlreadyExists
}

// If this is not errNotFound, then something happened
if err != errNotFound {
return err
}

// 501 is retried in the client, so it can return 429
err = rep.client.Create(ctx, project, service, req)
if aiven.IsAlreadyExists(err) {
return nil
}
return err
}
70 changes: 70 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/create_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kafkatopicrepository

import (
"context"
"sync"
"sync/atomic"
"testing"

"github.com/aiven/aiven-go-client/v2"
"github.com/stretchr/testify/assert"
)

// TestCreateConflict tests that one goroutine out of 100 creates the topic, while others get errAlreadyExists
func TestCreateConflict(t *testing.T) {
client := &fakeTopicClient{}
rep := newRepository(client)
ctx := context.Background()

var conflictErr int32
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
err := rep.Create(ctx, "a", "b", aiven.CreateKafkaTopicRequest{TopicName: "c"})
if err == errAlreadyExists {
atomic.AddInt32(&conflictErr, 1)
}
wg.Done()
}()
}
wg.Wait()
assert.EqualValues(t, 99, conflictErr)
assert.EqualValues(t, 1, client.createCalled)
assert.EqualValues(t, 1, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.True(t, rep.seenServices["a/b"])
assert.True(t, rep.seenTopics["a/b/c"])
}

// TestCreateRecreateMissing must recreate missing topic
// When Kafka is off, it looses all topics. We recreate them instead of making user clear the state
func TestCreateRecreateMissing(t *testing.T) {
client := &fakeTopicClient{}
rep := newRepository(client)
ctx := context.Background()

// Creates topic
err := rep.Create(ctx, "a", "b", aiven.CreateKafkaTopicRequest{TopicName: "c"})
assert.NoError(t, err)
assert.EqualValues(t, 1, client.createCalled)
assert.EqualValues(t, 1, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.True(t, rep.seenServices["a/b"])
assert.True(t, rep.seenTopics["a/b/c"])

// Forgets the topic, like if it's missing
err = rep.forgetTopic("a", "b", "c")
assert.NoError(t, err)
assert.True(t, rep.seenServices["a/b"])
assert.False(t, rep.seenTopics["a/b/c"]) // not cached, missing

// Recreates topic
err = rep.Create(ctx, "a", "b", aiven.CreateKafkaTopicRequest{TopicName: "c"})
assert.NoError(t, err)
assert.EqualValues(t, 2, client.createCalled) // Updated
assert.EqualValues(t, 1, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.True(t, rep.seenServices["a/b"])
assert.True(t, rep.seenTopics["a/b/c"]) // cached again
}
23 changes: 23 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kafkatopicrepository

import (
"context"

"github.com/aiven/aiven-go-client/v2"
)

func (rep *repository) Delete(ctx context.Context, project, service, topic string) error {
// This might give us false positive,
// because 404 is also returned for "unknown" topic.
// But it speedups things a lot (no "read" performed),
// and if kafka has been off, it will make it easier to remove topics from state
err := rep.client.Delete(ctx, project, service, topic)
if !(err == nil || aiven.IsNotFound(err)) {
return err
}

rep.Lock()
rep.seenTopics[newKey(project, service, topic)] = false
rep.Unlock()
return nil
}
40 changes: 40 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kafkatopicrepository

import (
"context"
"testing"

"github.com/aiven/aiven-go-client/v2"
"github.com/stretchr/testify/assert"
)

// TestDeleteDoesNotExist shouldn't rise that topic does not exist on delete,
// if it doesn't exist for real
func TestDeleteDoesNotExist(t *testing.T) {
client := &fakeTopicClient{}
rep := newRepository(client)
ctx := context.Background()
err := rep.Delete(ctx, "a", "b", "c")
assert.NoError(t, err)
assert.EqualValues(t, 0, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.EqualValues(t, 1, client.deleteCalled)
}

// TestDeletesAfterRetry proves that it deletes topic
// when client has made retries under the hood and got 404 on some call
func TestDeletesAfterRetry(t *testing.T) {
client := &fakeTopicClient{
deleteErr: errNotFound,
storage: map[string]*aiven.KafkaListTopic{
"a/b/c": {TopicName: "c"},
},
}
rep := newRepository(client)
ctx := context.Background()
err := rep.Delete(ctx, "a", "b", "c")
assert.NoError(t, err)
assert.EqualValues(t, 0, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.EqualValues(t, 1, client.deleteCalled)
}
163 changes: 163 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package kafkatopicrepository

import (
"context"
"fmt"

"github.com/aiven/aiven-go-client/v2"
"github.com/avast/retry-go"
"github.com/samber/lo"
)

func (rep *repository) Read(ctx context.Context, project, service, topic string) (*aiven.KafkaTopic, error) {
// We have quick methods to determine that topic does not exist
err := rep.exists(ctx, project, service, topic, false)
if err != nil {
return nil, err
}

// Adds request to the queue
c := make(chan *response, 1)
r := &request{
project: project,
service: service,
topic: topic,
rsp: c,
}
rep.Lock()
rep.queue = append(rep.queue, r)
rep.Unlock()

// Waits response from the channel
// Or exits on context done
select {
case <-ctx.Done():
return nil, ctx.Err()
case rsp := <-c:
close(c)
return rsp.topic, rsp.err
}
}

// exists returns nil if topic exists, or errNotFound if doesn't:
// 1. checks repository.seenTopics for known topics
// 2. calls v1List for the remote state for the given service and marks it in repository.seenServices
// 3. saves topic names to repository.seenTopics, so its result can be reused
// 4. when acquire true, then saves topic to repository.seenTopics (for creating)
// todo: use context with the new client
func (rep *repository) exists(ctx context.Context, project, service, topic string, acquire bool) error {
rep.Lock()
defer rep.Unlock()
// Checks repository.seenTopics.
// If it has been just created, it is not available in v1List.
// So calling it first doesn't make any sense
serviceKey := newKey(project, service)
topicKey := newKey(serviceKey, topic)
if rep.seenTopics[topicKey] {
return nil
}

// Goes for v1List
if !rep.seenServices[serviceKey] {
list, err := rep.client.List(ctx, project, service)
if err != nil {
return err
}

// Marks seen all the topics
for _, t := range list {
rep.seenTopics[newKey(serviceKey, t.TopicName)] = true
}

// Service is seen too. It never goes here again
rep.seenServices[serviceKey] = true
}

// Checks updated list
if rep.seenTopics[topicKey] {
return nil
}

// Create functions run in parallel need to lock the name before create
// Otherwise they may run into conflict
if acquire {
rep.seenTopics[topicKey] = true
}

// v1List doesn't contain the topic
return errNotFound
}

// fetch fetches requested topics configuration
// 1. groups topics by service
// 2. requests topics (in chunks)
// Warning: if we call V2List with at least one "not found" topic, it will return 404 for all topics
// Should be certain that all topics in queue do exist. Call repository.exists first to do so
func (rep *repository) fetch(ctx context.Context, queue map[string]*request) {
// Groups topics by service
byService := make(map[string][]*request, 0)
for i := range queue {
r := queue[i]
key := newKey(r.project, r.service)
byService[key] = append(byService[key], r)
}

// Fetches topics configuration
for _, reqs := range byService {
topicNames := make([]string, 0, len(reqs))
for _, r := range reqs {
topicNames = append(topicNames, r.topic)
}

// Topics are grouped by service
// We can share this values
project := reqs[0].project
service := reqs[0].service

// Slices topic names by repository.v2ListBatchSize
// because V2List has a limit
for _, chunk := range lo.Chunk(topicNames, rep.v2ListBatchSize) {
// V2List() and Get() do not get info immediately
// Some retries should be applied if result is not equal to requested values
var list []*aiven.KafkaTopic
err := retry.Do(func() error {
rspList, err := rep.client.V2List(ctx, project, service, chunk)

// 404 means that there is "not found" on the list
// But repository.exists should have checked these, so now this is a fail
if aiven.IsNotFound(err) {
return retry.Unrecoverable(fmt.Errorf("topic list has changed"))
}

// Something else happened
// We have retries in the client, so this is bad
if err != nil {
return retry.Unrecoverable(err)
}

// This is an old cache, we need to retry it until succeed
if len(rspList) != len(chunk) {
return fmt.Errorf("got %d topics, expected %d. Retrying", len(rspList), len(chunk))
}

list = rspList
return nil
}, retry.Delay(rep.v2ListRetryDelay))

if err != nil {
// Send errors
// Flattens error to a string, because it might go really completed for testing
err = fmt.Errorf("topic read error: %s", err)
for _, r := range reqs {
r.send(nil, err)
}
continue
}

// Sends topics
for _, t := range list {
queue[newKey(project, service, t.TopicName)].send(t, nil)
}
}
}
}
Loading

0 comments on commit 451a50e

Please sign in to comment.