Skip to content

Commit

Permalink
refactor(kafka_topic): add KafkaTopic repository
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Sep 9, 2023
1 parent 7e58842 commit 60bf3d7
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 567 deletions.
2 changes: 2 additions & 0 deletions examples_tests/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (s *BaseTestSuite) TearDownSuite() {
}

// withDefaults adds default options for terraform test
//
//lint:ignore U1000 Ignore unused function. Used in child structs
func (s *BaseTestSuite) withDefaults(opts *terraform.Options) *terraform.Options {
// No need to use lock file for dev build
opts.Lock = false
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/cloudflare/circl v1.3.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ github.com/apparentlymart/go-textseg v1.0.0/go.mod h1:z96Txxhf3xSFMPmb5X/1W05FF/
github.com/apparentlymart/go-textseg/v12 v12.0.0/go.mod h1:S/4uRK2UtaQttw1GenVJEynmyUenKwP++x/+DdGV/Ec=
github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw=
github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go v1.44.122 h1:p6mw01WBaNpbdP2xrisz5tIkcNwzj/HysobNoaAHjgo=
github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas=
Expand Down
253 changes: 253 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package kafkatopicrepository

import (
"fmt"
"strings"
"sync"
"time"

"github.com/aiven/aiven-go-client"
"github.com/avast/retry-go"
)

var (
initOnce sync.Once
singleRep *repository
)

const (
// defaultV2ListBatchSize the max size of batch to call V2List
defaultV2ListBatchSize = 100

// defaultV2ListRetryDelay V2List caches results, so we retry it by this delay
defaultV2ListRetryDelay = 5 * time.Second

// defaultWorkerCallInterval how often worker should run
defaultWorkerCallInterval = 5 * time.Second
)

// GetTopic returns KafkaTopic
// Gathers topics to a batch and fetches them by worker with a rate limit (in background)
func GetTopic(client topicsClient, project, service, topic string) (*aiven.KafkaTopic, error) {
// Inits global singleton
initOnce.Do(func() {
singleRep = newRepository(client)
})

c := make(chan *response, 1)
req := &request{
project: project,
service: service,
topic: topic,
rsp: c,
}

// Adds request to the queue
singleRep.Lock()
singleRep.queue = append(singleRep.queue, req)
singleRep.Unlock()

// Waits response from channel
rsp := <-c
close(c)
return rsp.topic, rsp.err
}

// topicsClient interface for unit tests
type topicsClient interface {
// List returns existing topics
List(project, service string) ([]*aiven.KafkaListTopic, error)
// V2List returns requested topics configuration
V2List(project, service string, topicNames []string) ([]*aiven.KafkaTopic, error)
}

func newRepository(client topicsClient) *repository {
r := &repository{
client: client,
v2ListBatchSize: defaultV2ListBatchSize,
v2ListRetryDelay: defaultV2ListRetryDelay,
workerCallInterval: defaultWorkerCallInterval,
}
go r.worker()
return r
}

type repository struct {
sync.Mutex
client topicsClient
queue []*request
v2ListBatchSize int
v2ListRetryDelay time.Duration
workerCallInterval time.Duration
}

// worker runs in background and processes the queue
func (rep *repository) worker() {
ticker := time.NewTicker(rep.workerCallInterval)
for {
<-ticker.C
b := rep.withdrawal()
if b != nil {
rep.fetch(b)
}
}
}

// withdrawal returns the queue and cleans it
func (rep *repository) withdrawal() map[string]*request {
rep.Lock()
defer rep.Unlock()

if len(rep.queue) == 0 {
return nil
}

q := make(map[string]*request, len(rep.queue))
for _, r := range rep.queue {
q[r.key()] = r
}

// todo: use empty() on go 1.21
rep.queue = make([]*request, 0)
return q
}

// fetch fetches requested topics
// 1. groups topics by service
// 2. removes "not found" topics
// 3. requests "found" topics (in chunks)
func (rep *repository) fetch(queue map[string]*request) {
// Groups topics by service
rawByService := make(map[string][]*request, 0)
for i := range queue {
r := queue[i]
key := newKey(r.project, r.service)
rawByService[key] = append(rawByService[key], r)
}

// Removes "not found" topics from the list
// If we call V2List with at least one "not found" topic,
// it will return 404 for all topics
foundByService := make([]map[string]*request, 0, len(rawByService))
for _, reqs := range rawByService {
project := reqs[0].project
service := reqs[0].service

// Requests all topics for the service
list, err := rep.client.List(project, service)
if err != nil {
for _, r := range reqs {
r.send(nil, err)
}
}

// Marks existing topics
exists := make(map[string]bool, len(list))
for _, r := range list {
exists[newKey(project, service, r.TopicName)] = true
}

// Send "not found" and gathers "found"
found := make(map[string]*request, len(reqs))
for i := range reqs {
r := reqs[i]
if exists[r.key()] {
found[r.topic] = r
} else {
r.send(nil, aiven.Error{Status: 404, Message: "Topic not found"})
}
}

// If any exists
if len(found) != 0 {
foundByService = append(foundByService, found)
}
}

// Fetches "found" topics configuration
for _, reqs := range foundByService {
topicNames := make([]string, 0, len(reqs))
for _, r := range reqs {
topicNames = append(topicNames, r.topic)
}

// foundByService are grouped by service
// We can share this values
project := reqs[topicNames[0]].project
service := reqs[topicNames[0]].service

// Slices topic names by rep.v2ListBatchSize
// because V2List has a limit
for _, chunk := range chunksBy(topicNames, rep.v2ListBatchSize) {
// V2List() and Get() do not get info immediately
// Some retries should be applied
var list []*aiven.KafkaTopic
err := retry.Do(func() error {
rspList, err := rep.client.V2List(project, service, chunk)

// 404 means that there is "not found" on the list
// But previous step should have cleaned it, so now this is a fail
if aiven.IsNotFound(err) {
return retry.Unrecoverable(fmt.Errorf("kafka topic list has changed"))
}

// Something else happened
// We have retries in the client, so this is bad
if err != nil {
return retry.Unrecoverable(err)
}

// This is an old cache, we need to retry until succeed
if len(rspList) != len(chunk) {
return fmt.Errorf("got %d topics, expected %d. Retrying", len(rspList), len(chunk))
}

list = rspList
return nil
}, retry.Delay(rep.v2ListRetryDelay))

if err != nil {
// Send errors
for _, r := range reqs {
r.send(nil, err)
}
} else {
// Sends topics
for _, t := range list {
reqs[t.TopicName].send(t, nil)
}
}
}
}
}

func newKey(parts ...string) string {
return strings.Join(parts, "/")
}

type response struct {
topic *aiven.KafkaTopic
err error
}

type request struct {
project string
service string
topic string
rsp chan *response
}

func (r *request) key() string {
return newKey(r.project, r.service, r.topic)
}

func (r *request) send(topic *aiven.KafkaTopic, err error) {
r.rsp <- &response{topic: topic, err: err}
}

func chunksBy[T any](items []T, size int) (chunks [][]T) {
for size < len(items) {
items, chunks = items[size:], append(chunks, items[0:size:size])
}
return append(chunks, items)
}
35 changes: 4 additions & 31 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kafkatopic
import (
"context"
"errors"
"fmt"
"log"
"time"

Expand All @@ -16,6 +15,7 @@ import (
"github.com/aiven/terraform-provider-aiven/internal/schemautil"
"github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig"
"github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig/stateupgrader"
"github.com/aiven/terraform-provider-aiven/internal/sdkprovider/kafkatopicrepository"
)

var aivenKafkaTopicSchema = map[string]*schema.Schema{
Expand Down Expand Up @@ -269,7 +269,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
// aiven.KafkaTopics.Create() function may return 501 on create
// Second call might say that topic already exists
// So to be sure, better check it before create
_, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName)
_, err := kafkatopicrepository.GetTopic(m.(*aiven.Client).KafkaTopics, project, serviceName, topicName)

// No error means topic exists
if err == nil {
Expand Down Expand Up @@ -306,9 +306,6 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int

d.SetId(schemautil.BuildResourceID(project, serviceName, topicName))

// Invalidates cache for the topic
DeleteTopicFromCache(project, serviceName, topicName)

// We do not call a Kafka Topic read here to speed up the performance.
// However, in the case of Kafka Topic resource getting a computed field
// in the future, a read operation should be called after creation.
Expand Down Expand Up @@ -369,13 +366,13 @@ func getKafkaTopicConfig(d *schema.ResourceData) aiven.KafkaTopicConfig {
}
}

func resourceKafkaTopicRead(ctx context.Context, d *schema.ResourceData, m interface{}, isResource bool) diag.Diagnostics {
func resourceKafkaTopicRead(_ context.Context, d *schema.ResourceData, m interface{}, isResource bool) diag.Diagnostics {
project, serviceName, topicName, err := schemautil.SplitResourceID3(d.Id())
if err != nil {
return diag.FromErr(err)
}

topic, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName)
topic, err := kafkatopicrepository.GetTopic(m.(*aiven.Client).KafkaTopics, project, serviceName, topicName)

// Topics are destroyed when kafka is off
// https://docs.aiven.io/docs/platform/concepts/service-power-cycle
Expand Down Expand Up @@ -446,30 +443,6 @@ func flattenKafkaTopicTags(list []aiven.KafkaTopicTag) []map[string]interface{}
return tags
}

func getTopic(ctx context.Context, m interface{}, timeout time.Duration, project, serviceName, topicName string) (*aiven.KafkaTopic, error) {
client, ok := m.(*aiven.Client)
if !ok {
return nil, fmt.Errorf("invalid Aiven client")
}

w, err := newKafkaTopicAvailabilityWaiter(client, project, serviceName, topicName)
if err != nil {
return nil, err
}

// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext.
topic, err := w.Conf(timeout).WaitForStateContext(ctx)
if err != nil {
return nil, err
}

kt, ok := topic.(aiven.KafkaTopic)
if !ok {
return nil, fmt.Errorf("can't cast value to aiven.KafkaTopic")
}
return &kt, nil
}

func resourceKafkaTopicUpdate(_ context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics {
client := m.(*aiven.Client)

Expand Down
Loading

0 comments on commit 60bf3d7

Please sign in to comment.