Skip to content

Commit

Permalink
MLT-0034 Reactive migration (#25)
Browse files Browse the repository at this point in the history
Create basic implementation for creating orders and go over to using reactive streams
  • Loading branch information
anotheroneofthese authored Aug 13, 2024
1 parent 4121450 commit 02568f5
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 73 deletions.
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
<kotlin.compiler.apiVersion>2.0</kotlin.compiler.apiVersion>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<kotlin.compiler.languageVersion>2.0</kotlin.compiler.languageVersion>

<kotlin.logging.version>7.0.0</kotlin.logging.version>
</properties>

<dependencies>
Expand All @@ -98,7 +98,7 @@

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

<dependency>
Expand Down Expand Up @@ -181,6 +181,12 @@
<version>${mockk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.oshai</groupId>
<artifactId>kotlin-logging-jvm</artifactId>
<version>${kotlin.logging.version}</version>
</dependency>


</dependencies>

Expand Down
11 changes: 11 additions & 0 deletions src/main/kotlin/no/nb/mlt/wls/core/config/WebClientConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package no.nb.mlt.wls.core.config

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient

@Configuration
class WebClientConfig {
@Bean
fun webClient(builder: WebClient.Builder): WebClient = builder.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class OrderController(val orderService: OrderService) {
responseCode = "400",
description =
"""Order payload is invalid and was not created.
Error message contains information about the invalid fields.""",
An empty error message means the order already exists with the current ID.
Otherwise, the error message contains information about the invalid fields.""",
content = [Content(schema = Schema())]
),
ApiResponse(
Expand All @@ -48,7 +49,7 @@ class OrderController(val orderService: OrderService) {
)
)
@PostMapping("/order/batch/create")
fun createOrder(
suspend fun createOrder(
@RequestBody payload: ApiOrderPayload
): ResponseEntity<ApiOrderPayload> = orderService.createOrder(payload)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package no.nb.mlt.wls.order.repository

import no.nb.mlt.wls.core.data.HostName
import no.nb.mlt.wls.order.model.Order
import org.springframework.data.mongodb.repository.MongoRepository
import org.springframework.data.mongodb.repository.ReactiveMongoRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono

@Repository
interface OrderRepository : MongoRepository<Order, String> {
interface OrderRepository : ReactiveMongoRepository<Order, String> {
fun getByHostNameAndHostOrderId(
hostName: HostName,
hostOrderId: String
): Order?
): Mono<Order>
}
44 changes: 32 additions & 12 deletions src/main/kotlin/no/nb/mlt/wls/order/service/OrderService.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package no.nb.mlt.wls.order.service

import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import no.nb.mlt.wls.order.model.Order
import no.nb.mlt.wls.order.payloads.ApiOrderPayload
import no.nb.mlt.wls.order.payloads.toApiOrderPayload
import no.nb.mlt.wls.order.payloads.toOrder
Expand All @@ -9,28 +13,44 @@ import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Service
import org.springframework.web.server.ServerErrorException
import java.time.Duration
import java.util.concurrent.TimeoutException

private val logger = KotlinLogging.logger {}

@Service
class OrderService(val db: OrderRepository, val synqService: SynqOrderService) {
fun createOrder(payload: ApiOrderPayload): ResponseEntity<ApiOrderPayload> {
// TODO - Order validation?
suspend fun createOrder(payload: ApiOrderPayload): ResponseEntity<ApiOrderPayload> {
val existingOrder = getByHostNameAndHostOrderId(payload)

val existingOrder = db.getByHostNameAndHostOrderId(payload.hostName, payload.orderId)
if (existingOrder != null) {
return ResponseEntity.ok(existingOrder.toApiOrderPayload())
}

val synqResponse = synqService.createOrder(payload.toOrder().toSynqPayload())
if (!synqResponse.statusCode.is2xxSuccessful) {
return ResponseEntity.internalServerError().build()
return ResponseEntity.badRequest().build()
}

try {
db.save(payload.toOrder())
// TODO - Usages?
val synqResponse = synqService.createOrder(payload.toOrder().toSynqPayload())
// Return what the database saved, as it could contain changes
val order =
db.save(payload.toOrder())
.timeout(Duration.ofSeconds(6))
.awaitSingle()
return ResponseEntity.status(HttpStatus.CREATED).body(order.toApiOrderPayload())
} catch (e: Exception) {
throw ServerErrorException("Failed to save product in database, but created in storage system", e)
throw ServerErrorException("Failed to create order in storage system", e)
}
}

return ResponseEntity.status(HttpStatus.CREATED).build()
suspend fun getByHostNameAndHostOrderId(payload: ApiOrderPayload): Order? {
// TODO - See if timeouts can be made configurable
return db.getByHostNameAndHostOrderId(payload.hostName, payload.orderId)
.timeout(Duration.ofSeconds(8))
.doOnError {
if (it is TimeoutException) {
logger.error(it, { "Timed out while fetching from WLS database" })
}
}
.onErrorComplete(TimeoutException::class.java)
.awaitSingleOrNull()
}
}
30 changes: 20 additions & 10 deletions src/main/kotlin/no/nb/mlt/wls/order/service/SynqOrderService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ package no.nb.mlt.wls.order.service
import no.nb.mlt.wls.core.data.synq.SynqError
import no.nb.mlt.wls.order.payloads.SynqOrder
import no.nb.mlt.wls.order.payloads.SynqOrderPayload
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.http.HttpEntity
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Service
import org.springframework.web.client.HttpClientErrorException
import org.springframework.web.client.RestTemplate
import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.server.ServerErrorException
import java.net.URI

@Service
class SynqOrderService {
val restTemplate: RestTemplate = RestTemplate()

class SynqOrderService(
@Autowired val webClient: WebClient
) {
@Value("\${synq.path.base}")
lateinit var baseUrl: String

Expand All @@ -27,16 +28,25 @@ class SynqOrderService {
val orders = SynqOrder(listOf(payload))

try {
return restTemplate.exchange(uri, HttpMethod.POST, HttpEntity(orders), SynqError::class.java)
} catch (e: HttpClientErrorException) {
val errorBody = e.getResponseBodyAs(SynqError::class.java)
return ResponseEntity(
webClient
.post()
.uri(uri)
.body(BodyInserters.fromValue(orders))
.retrieve()
.bodyToMono(SynqError::class.java)
.block(),
HttpStatus.CREATED
)
} catch (exception: HttpClientErrorException) {
val errorBody = exception.getResponseBodyAs(SynqError::class.java)

throw ServerErrorException(
"Failed to create product in SynQ, the storage system responded with error code: " +
"'${errorBody?.errorCode ?: "NO ERROR CODE FOUND"}' " +
"and error text: " +
"'${errorBody?.errorText ?: "NO ERROR TEXT FOUND"}'",
e
exception
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ProductController(val productService: ProductService) {
]
)
@PostMapping("/product")
fun createProduct(
suspend fun createProduct(
@RequestBody payload: ApiProductPayload
): ResponseEntity<ApiProductPayload> = productService.save(payload)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package no.nb.mlt.wls.product.repository

import no.nb.mlt.wls.core.data.HostName
import no.nb.mlt.wls.product.model.Product
import org.springframework.data.mongodb.repository.MongoRepository
import org.springframework.data.mongodb.repository.ReactiveMongoRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono

@Repository
interface ProductRepository : MongoRepository<Product, String> {
interface ProductRepository : ReactiveMongoRepository<Product, String> {
fun findByHostNameAndHostId(
hostName: HostName,
hostId: String
): Product?
): Mono<Product>
}
31 changes: 24 additions & 7 deletions src/main/kotlin/no/nb/mlt/wls/product/service/ProductService.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package no.nb.mlt.wls.product.service

import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import no.nb.mlt.wls.core.data.HostName
import no.nb.mlt.wls.product.model.Product
import no.nb.mlt.wls.product.payloads.ApiProductPayload
Expand All @@ -12,10 +15,14 @@ import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Service
import org.springframework.web.server.ServerErrorException
import org.springframework.web.server.ServerWebInputException
import java.time.Duration
import java.util.concurrent.TimeoutException

private val logger = KotlinLogging.logger {}

@Service
class ProductService(val db: ProductRepository, val synqProductService: SynqProductService) {
fun save(payload: ApiProductPayload): ResponseEntity<ApiProductPayload> {
suspend fun save(payload: ApiProductPayload): ResponseEntity<ApiProductPayload> {
// Check if the payload is valid and throw an exception if it is not
throwIfInvalidPayload(payload)

Expand All @@ -37,11 +44,12 @@ class ProductService(val db: ProductRepository, val synqProductService: SynqProd
}

// Product service should save the product in the database, and return 500 if it fails
try {
db.save(product)
} catch (e: Exception) {
throw ServerErrorException("Failed to save product in the database, but created in the storage system", e)
}
db.save(product)
.timeout(Duration.ofSeconds(6))
.onErrorMap {
throw ServerErrorException("Failed to save product in the database, but created in the storage system", it)
}
.awaitSingle()

// Product service should return a 201 response if the product was created with created product in response body
return ResponseEntity.status(HttpStatus.CREATED).body(product.toApiPayload())
Expand All @@ -61,10 +69,19 @@ class ProductService(val db: ProductRepository, val synqProductService: SynqProd
}
}

fun getByHostNameAndId(
suspend fun getByHostNameAndId(
hostName: HostName,
name: String
): Product? {
// TODO - See if timeouts can be made configurable
return db.findByHostNameAndHostId(hostName, name)
.timeout(Duration.ofSeconds(8))
.doOnError {
if (it is TimeoutException) {
logger.error(it, { "Timed out while fetching from WLS database" })
}
}
.onErrorComplete(TimeoutException::class.java)
.awaitSingleOrNull()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,38 @@ package no.nb.mlt.wls.product.service

import no.nb.mlt.wls.core.data.synq.SynqError
import no.nb.mlt.wls.product.payloads.SynqProductPayload
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.http.HttpEntity
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Service
import org.springframework.web.client.HttpClientErrorException
import org.springframework.web.client.RestTemplate
import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.server.ServerErrorException
import java.net.URI

@Service
class SynqProductService {
val restTemplate: RestTemplate = RestTemplate()

class SynqProductService(
@Autowired val webClient: WebClient
) {
@Value("\${synq.path.base}")
lateinit var baseUrl: String

fun createProduct(payload: SynqProductPayload): ResponseEntity<SynqError> {
// NOTE - Could trust validation from product service? Or should this have some SynQ specific validation?
val uri = URI.create("$baseUrl/nbproducts")
try {
return restTemplate.exchange(uri, HttpMethod.POST, HttpEntity(payload), SynqError::class.java)
return ResponseEntity(
webClient
.post()
.uri(uri)
.body(BodyInserters.fromValue(payload))
.retrieve()
.bodyToMono(SynqError::class.java)
.block(),
HttpStatus.CREATED
)
} catch (exception: HttpClientErrorException) {
// Get SynQ error from the response body if possible
val errorBody = exception.getResponseBodyAs(SynqError::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package no.nb.mlt.wls.product.controller
import com.ninjasquad.springmockk.MockkBean
import io.mockk.every
import io.mockk.junit5.MockKExtension
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.runBlocking
import no.nb.mlt.wls.EnableTestcontainers
import no.nb.mlt.wls.core.data.Environment.NONE
import no.nb.mlt.wls.core.data.HostName
Expand Down Expand Up @@ -35,6 +37,7 @@ import org.springframework.web.client.HttpClientErrorException
import org.springframework.web.server.ServerErrorException
import java.net.URI

// FIXME - Should respect couroutines
@EnableTestcontainers
@TestInstance(PER_CLASS)
@AutoConfigureWebTestClient
Expand Down Expand Up @@ -76,12 +79,14 @@ class ProductControllerTest(
.exchange()
.expectStatus().isCreated

val product = repository.findByHostNameAndHostId(testProductPayload.hostName, testProductPayload.hostId)
runBlocking {
val product = repository.findByHostNameAndHostId(testProductPayload.hostName, testProductPayload.hostId).awaitSingle()

assertThat(product)
.isNotNull
.extracting("description", "location", "quantity")
.containsExactly(testProductPayload.description, null, 0.0)
assertThat(product)
.isNotNull
.extracting("description", "location", "quantity")
.containsExactly(testProductPayload.description, null, 0.0)
}
}

@Test
Expand Down Expand Up @@ -124,6 +129,7 @@ class ProductControllerTest(
@Test
@WithMockUser
fun `createProduct where SynQ says it's a duplicate returns OK`() {
// SynqService converts an error to return OK if it finds a duplicate product
every {
synqProductService.createProduct(any())
} returns ResponseEntity.ok().build()
Expand Down Expand Up @@ -194,7 +200,8 @@ class ProductControllerTest(

fun populateDb() {
// Make sure we start with clean DB instance for each test
repository.deleteAll()
repository.save(duplicateProductPayload.toProduct())
runBlocking {
repository.deleteAll().then(repository.save(duplicateProductPayload.toProduct())).awaitSingle()
}
}
}
Loading

0 comments on commit 02568f5

Please sign in to comment.