Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#143: Handling order complition event #144

Merged
merged 2 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/backend/docker/docker-compose.kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ services:
- kafka
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka:29092 1 30 && \
kafka-topics --create --topic checkout --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092'"
kafka-topics --create --topic checkout --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092 && \
kafka-topics --create --topic orders --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092
'"

kafka-ui:
image: provectuslabs/kafka-ui
Expand Down
2 changes: 1 addition & 1 deletion src/backend/docker/docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
environment:
- REDIS_HOST=redis:6379
- KAFKA_BROKER=kafka:29092
- CHECKOUT_TOPIC=checkout
- ORDERS_TOPIC=orders
- IDENTITY_URL=http://identity-api
- AUTH_URL=http://localhost:8080/identity
- BASE_PATH=/shoppingcart
Expand Down
39 changes: 22 additions & 17 deletions src/backend/load-tests/checkout.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,36 @@ export const options = {
vus: 300,
duration: "30s",
thresholds: {
'http_req_duration': ['p(99)<300'],
http_req_duration: ["p(99)<300"],
},
};

const baseUrl = "http://localhost:8080";
const catalogItems = baseUrl + "/cart/api/v1/items/all";

export default function () {
const customer_id = 'dfdee7b6-04d3-4d77-89a9-6542a4f2f31a'
const baseUrl = "http://localhost:8080";
const customer_id = "dfdee7b6-04d3-4d77-89a9-6542a4f2f31a";
const foods = http.get(baseUrl + "/catalog/items/all").json();
const food = foods[Math.floor(Math.random() * foods.length)];

http.post(baseUrl + '/basket/api/v1/items', JSON.stringify({
customer_id,
items: [
{
food_id: food.id,
food_name: food.name,
old_unit_price: 0,
picture: food.image,
quantity: Math.floor(Math.random() * 20),
unit_price: food.price,
}
]
}))
http.post(
catalogItems,
JSON.stringify({
customer_id,
items: [
{
food_id: food.id,
food_name: food.name,
old_unit_price: 0,
picture: food.image,
quantity: Math.floor(Math.random() * 20),
unit_price: food.price,
},
],
})
);

sleep(1);

http.post(baseUrl + '')
http.post(baseUrl + "");
}
36 changes: 23 additions & 13 deletions src/backend/services/cart-api/cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import (
"syscall"
"time"

"github.com/IBM/sarama"
"github.com/dnwe/otelsarama"
"github.com/jurabek/cart-api/cmd/config"
"github.com/jurabek/cart-api/internal/database"
"github.com/jurabek/cart-api/internal/docs"
"github.com/jurabek/cart-api/internal/events"
grpcsvc "github.com/jurabek/cart-api/internal/grpc"
"github.com/jurabek/cart-api/internal/handlers"
"github.com/jurabek/cart-api/internal/middlewares"
pbv1 "github.com/jurabek/cart-api/pb/v1"
"github.com/jurabek/cart-api/pkg/reciever"
"github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -87,18 +91,27 @@ func main() {
if err != nil {
fmt.Print(err)
}
cartRepository := repositories.NewCartRepository(redisClient)

// p, err := sarama.NewSyncProducer([]string{cfg.KafkaBroker}, nil)
// if err != nil {
// log.Fatal().Err(err).Msg("new producer failed!")
// }
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

// tracedProducer := otelsarama.WrapSyncProducer(nil, p)
// defer tracedProducer.Close()
kafkaConsumer, error := sarama.NewConsumer([]string{cfg.KafkaBroker}, config)
if error != nil {
log.Fatal().Err(error).Msg("new consumer failed!")
}
kafkaConsumer = otelsarama.WrapConsumer(kafkaConsumer)

cartRepository := repositories.NewCartRepository(redisClient)
cartHandler := handlers.NewCartHandler(cartRepository)
msgReciever := reciever.NewMessageReciever(kafkaConsumer, cfg.OrdersTopic)
go func() {
recieveErr := msgReciever.Recieve(events.NewOrderCompletedEventHandler(cartRepository))
log.Error().Err(recieveErr).Msg("Error recieving messages")
}()

go grpcServer(grpcsvc.NewCartGrpcService(cartRepository))

cartHandler := handlers.NewCartHandler(cartRepository)
apiV1 := router.Group(basePath + "/api/v1")
{
cart := apiV1.Group("/cart")
Expand All @@ -107,7 +120,7 @@ func main() {
cart.GET(":id", handlers.ErrorHandler(cartHandler.Get))
cart.DELETE(":id", handlers.ErrorHandler(cartHandler.Delete))
cart.PUT(":id", handlers.ErrorHandler(cartHandler.Update))
cart.POST(":id/item", handlers.ErrorHandler(cartHandler.AddItem)) // adds item or increments quantity by CartID
cart.POST(":id/item", handlers.ErrorHandler(cartHandler.AddItem)) // adds item or increments quantity by CartID
cart.PUT(":id/item/:itemID", handlers.ErrorHandler(cartHandler.UpdateItem)) // updates line item item_id is ignored
cart.DELETE(":id/item/:itemID", handlers.ErrorHandler(cartHandler.DeleteItem))
}
Expand All @@ -122,8 +135,6 @@ func main() {
func(c *ginSwagger.Config) {
c.URL = basePath + "/swagger/doc.json"
}))

go grpcServer(grpcsvc.NewCartGrpcService(cartRepository))
_ = router.Run()
}

Expand All @@ -134,8 +145,7 @@ func grpcServer(svc pbv1.CartServiceServer) {
}

server := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
)
reflection.Register(server)

Expand Down
12 changes: 6 additions & 6 deletions src/backend/services/cart-api/cmd/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (

// Configuration injects all environment variables into object
type Configuration struct {
ServerPort string
RedisHost string
KafkaBroker string
CheckoutTopic string
ServerPort string
RedisHost string
KafkaBroker string
OrdersTopic string
}

// Init initializes environment variables into config
Expand All @@ -24,8 +24,8 @@ func Init() *Configuration {
cfg.KafkaBroker = kafkaBroker
}

if checkoutTopic, ok := os.LookupEnv("CHECKOUT_TOPIC"); ok {
cfg.CheckoutTopic = checkoutTopic
if ordersTopic, ok := os.LookupEnv("ORDERS_TOPIC"); ok {
cfg.OrdersTopic = ordersTopic
}

return &cfg
Expand Down
49 changes: 25 additions & 24 deletions src/backend/services/cart-api/go.mod
Original file line number Diff line number Diff line change
@@ -1,63 +1,64 @@
module github.com/jurabek/cart-api

require (
github.com/Shopify/sarama v1.38.1
github.com/gin-gonic/gin v1.9.1
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.3.1
github.com/redis/go-redis/extra/redisotel/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.5
github.com/rs/zerolog v1.31.0
github.com/stretchr/testify v1.8.4
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.2
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.40.0
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.40.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.40.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.47.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0
go.opentelemetry.io/otel/sdk v1.22.0
)

require (
github.com/IBM/sarama v1.42.1
github.com/dnwe/otelsarama v0.0.0-20231212173111-631a0a53d5d4
github.com/redis/go-redis/extra/redisotel/v9 v9.0.5
)

require (
github.com/bytedance/sonic v1.10.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.0.2 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/arch v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
)
Expand Down
Loading
Loading