-
Notifications
You must be signed in to change notification settings - Fork 70
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(kafka_topic): add KafkaTopic repository
- Loading branch information
Showing
15 changed files
with
662 additions
and
697 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package kafkatopicrepository | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/aiven/aiven-go-client" | ||
"github.com/avast/retry-go" | ||
) | ||
|
||
func (rep *repo) Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error { | ||
return retry.Do(func() error { | ||
err := rep.client.Create(project, service, req) | ||
if err == nil || aiven.IsAlreadyExists(err) { | ||
rep.Lock() | ||
rep.cache[newKey(project, service, req.TopicName)] = true | ||
rep.Unlock() | ||
return nil | ||
} | ||
|
||
// If some brokers are offline while the request is being executed | ||
// the operation may fail. | ||
_, ok := err.(aiven.Error) | ||
if ok { | ||
return err | ||
} | ||
return retry.Unrecoverable(err) | ||
}, retry.Context(ctx), retry.Delay(10*time.Second)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package kafkatopicrepository | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/aiven/aiven-go-client" | ||
) | ||
|
||
func (rep *repo) Delete(_ context.Context, project, service, topic string) error { | ||
err := rep.client.Delete(project, service, topic) | ||
if !(err == nil || aiven.IsNotFound(err)) { | ||
return err | ||
} | ||
rep.Lock() | ||
rep.cache[newKey(project, service, topic)] = false | ||
rep.Unlock() | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
package kafkatopicrepository | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/aiven/aiven-go-client" | ||
"github.com/avast/retry-go" | ||
"github.com/samber/lo" | ||
) | ||
|
||
func (rep *repo) Read(ctx context.Context, project, service, topic string) (*aiven.KafkaTopic, error) { | ||
c := make(chan *response, 1) | ||
req := &request{ | ||
project: project, | ||
service: service, | ||
topic: topic, | ||
rsp: c, | ||
} | ||
|
||
// Adds request to the queue | ||
rep.Lock() | ||
rep.queue = append(rep.queue, req) | ||
rep.Unlock() | ||
|
||
// Waits response from channel | ||
// Or exists on context done | ||
select { | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
case rsp := <-c: | ||
close(c) | ||
return rsp.topic, rsp.err | ||
} | ||
} | ||
|
||
// fetch fetches requested topics | ||
// 1. groups topics by service | ||
// 2. removes "not found" topics | ||
// 3. requests "found" topics (in chunks) | ||
func (rep *repo) fetch(queue map[string]*request) { | ||
// Groups topics by service | ||
rawByService := make(map[string][]*request, 0) | ||
for i := range queue { | ||
r := queue[i] | ||
key := newKey(r.project, r.service) | ||
rawByService[key] = append(rawByService[key], r) | ||
} | ||
|
||
// Removes "not found" topics from the list | ||
// If we call V2List with at least one "not found" topic, | ||
// it will return 404 for all topics | ||
foundByService := make([]map[string]*request, 0, len(rawByService)) | ||
for _, reqs := range rawByService { | ||
project := reqs[0].project | ||
service := reqs[0].service | ||
|
||
// Requests all topics for the service | ||
list, err := rep.client.List(project, service) | ||
if err != nil { | ||
for _, r := range reqs { | ||
r.send(nil, err) | ||
} | ||
} | ||
|
||
// Marks existing topics | ||
exists := make(map[string]bool, len(list)) | ||
for _, r := range list { | ||
exists[newKey(project, service, r.TopicName)] = true | ||
} | ||
|
||
// Send "not found" and gathers "found" | ||
found := make(map[string]*request, len(reqs)) | ||
for i := range reqs { | ||
r := reqs[i] | ||
k := r.key() | ||
if exists[k] || rep.cache[k] { | ||
found[r.topic] = r | ||
} else { | ||
r.send(nil, errNotFound) | ||
} | ||
} | ||
|
||
// If any exists | ||
if len(found) != 0 { | ||
foundByService = append(foundByService, found) | ||
} | ||
} | ||
|
||
// Fetches "found" topics configuration | ||
for _, reqs := range foundByService { | ||
topicNames := make([]string, 0, len(reqs)) | ||
for _, r := range reqs { | ||
topicNames = append(topicNames, r.topic) | ||
} | ||
|
||
// foundByService are grouped by service | ||
// We can share this values | ||
project := reqs[topicNames[0]].project | ||
service := reqs[topicNames[0]].service | ||
|
||
// Slices topic names by rep.v2ListBatchSize | ||
// because V2List has a limit | ||
for _, chunk := range lo.Chunk(topicNames, rep.v2ListBatchSize) { | ||
// V2List() and Get() do not get info immediately | ||
// Some retries should be applied | ||
var list []*aiven.KafkaTopic | ||
err := retry.Do(func() error { | ||
rspList, err := rep.client.V2List(project, service, chunk) | ||
|
||
// 404 means that there is "not found" on the list | ||
// But previous step should have cleaned it, so now this is a fail | ||
if aiven.IsNotFound(err) { | ||
return retry.Unrecoverable(fmt.Errorf("topic list has changed")) | ||
} | ||
|
||
// Something else happened | ||
// We have retries in the client, so this is bad | ||
if err != nil { | ||
return retry.Unrecoverable(err) | ||
} | ||
|
||
// This is an old cache, we need to retry until succeed | ||
if len(rspList) != len(chunk) { | ||
return fmt.Errorf("got %d topics, expected %d. Retrying", len(rspList), len(chunk)) | ||
} | ||
|
||
list = rspList | ||
return nil | ||
}, retry.Delay(rep.v2ListRetryDelay)) | ||
|
||
if err != nil { | ||
// Send errors | ||
for _, r := range reqs { | ||
// Flattens error to string, because it might go really completed for testing | ||
r.send(nil, fmt.Errorf("topic read error: %s", err)) | ||
} | ||
} else { | ||
// Sends topics | ||
for _, t := range list { | ||
reqs[t.TopicName].send(t, nil) | ||
} | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.