Skip to content

Commit

Permalink
Add logs for subscribe request
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubuid committed Aug 9, 2024
1 parent 4b5167e commit 2b37fbd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ import org.koin.core.KoinApplication

@OptIn(ExperimentalCoroutinesApi::class)
abstract class BaseRelayClient : RelayInterface {
private var foundationKoinApp: KoinApplication = KoinApplication.init()
lateinit var relayService: RelayService
protected var logger: Logger
private val resultState: MutableSharedFlow<RelayDTO> = MutableSharedFlow()

init {
foundationKoinApp.run { modules(foundationCommonModule()) }
logger = foundationKoinApp.koin.get()
}
private var foundationKoinApp: KoinApplication = KoinApplication.init()
lateinit var relayService: RelayService
protected var logger: Logger
private val resultState: MutableSharedFlow<RelayDTO> = MutableSharedFlow()
var isLoggingEnabled: Boolean = false

init {
foundationKoinApp.run { modules(foundationCommonModule()) }
logger = foundationKoinApp.koin.get()
}

fun observeResults() {
scope.launch {
Expand All @@ -63,26 +64,26 @@ abstract class BaseRelayClient : RelayInterface {
}
}

override val eventsFlow: SharedFlow<Relay.Model.Event> by lazy {
relayService
.observeWebSocketEvent()
.map { event -> event.toRelayEvent() }
.shareIn(scope, SharingStarted.Lazily, REPLAY)
}
override val eventsFlow: SharedFlow<Relay.Model.Event> by lazy {
relayService
.observeWebSocketEvent()
.map { event -> event.toRelayEvent() }
.shareIn(scope, SharingStarted.Lazily, REPLAY)
}

override val subscriptionRequest: Flow<Relay.Model.Call.Subscription.Request> by lazy {
relayService.observeSubscriptionRequest()
.map { request -> request.toRelay() }
.onEach { relayRequest -> supervisorScope { publishSubscriptionAcknowledgement(relayRequest.id) } }
}
override val subscriptionRequest: Flow<Relay.Model.Call.Subscription.Request> by lazy {
relayService.observeSubscriptionRequest()
.map { request -> request.toRelay() }
.onEach { relayRequest -> supervisorScope { publishSubscriptionAcknowledgement(relayRequest.id) } }
}

@ExperimentalCoroutinesApi
override fun publish(
topic: String,
message: String,
params: Relay.Model.IrnParams,
id: Long?,
onResult: (Result<Relay.Model.Call.Publish.Acknowledgement>) -> Unit,
topic: String,
message: String,
params: Relay.Model.IrnParams,
id: Long?,
onResult: (Result<Relay.Model.Call.Publish.Acknowledgement>) -> Unit,
) {
val (tag, ttl, prompt) = params
val publishParams = RelayDTO.Publish.Request.Params(Topic(topic), message, Ttl(ttl), tag, prompt)
Expand All @@ -97,15 +98,15 @@ abstract class BaseRelayClient : RelayInterface {
try {
withTimeout(RESULT_TIMEOUT) {
resultState
.filterIsInstance<RelayDTO.Publish.Result>()
.filter { relayResult -> relayResult.id == id }
.first { publishResult ->
when (publishResult) {
is RelayDTO.Publish.Result.Acknowledgement -> onResult(Result.success(publishResult.toRelay()))
is RelayDTO.Publish.Result.JsonRpcError -> onResult(Result.failure(Throwable(publishResult.error.errorMessage)))
}
true
.filterIsInstance<RelayDTO.Publish.Result>()
.filter { relayResult -> relayResult.id == id }
.first { publishResult ->
when (publishResult) {
is RelayDTO.Publish.Result.Acknowledgement -> onResult(Result.success(publishResult.toRelay()))
is RelayDTO.Publish.Result.JsonRpcError -> onResult(Result.failure(Throwable(publishResult.error.errorMessage)))
}
true
}
}
} catch (e: TimeoutCancellationException) {
onResult(Result.failure(e))
Expand All @@ -121,6 +122,10 @@ abstract class BaseRelayClient : RelayInterface {
override fun subscribe(topic: String, id: Long?, onResult: (Result<Relay.Model.Call.Subscribe.Acknowledgement>) -> Unit) {
val subscribeRequest = RelayDTO.Subscribe.Request(id = id ?: generateClientToServerId(), params = RelayDTO.Subscribe.Request.Params(Topic(topic)))

if (isLoggingEnabled) {
logger.log("Sending SubscribeRequest: $subscribeRequest")
}

observeSubscribeResult(subscribeRequest.id, onResult)
relayService.subscribeRequest(subscribeRequest)
}
Expand All @@ -130,15 +135,16 @@ abstract class BaseRelayClient : RelayInterface {
try {
withTimeout(RESULT_TIMEOUT) {
resultState
.filterIsInstance<RelayDTO.Subscribe.Result>()
.filter { relayResult -> relayResult.id == id }
.first { subscribeResult ->
when (subscribeResult) {
is RelayDTO.Subscribe.Result.Acknowledgement -> onResult(Result.success(subscribeResult.toRelay()))
is RelayDTO.Subscribe.Result.JsonRpcError -> onResult(Result.failure(Throwable(subscribeResult.error.errorMessage)))
}
true
.filterIsInstance<RelayDTO.Subscribe.Result>()
.onEach {relayResult -> if (isLoggingEnabled) logger.log("SubscribeResult: $relayResult") }
.filter { relayResult -> relayResult.id == id }
.first { subscribeResult ->
when (subscribeResult) {
is RelayDTO.Subscribe.Result.Acknowledgement -> onResult(Result.success(subscribeResult.toRelay()))
is RelayDTO.Subscribe.Result.JsonRpcError -> onResult(Result.failure(Throwable(subscribeResult.error.errorMessage)))
}
true
}
}
} catch (e: TimeoutCancellationException) {
onResult(Result.failure(e))
Expand All @@ -163,15 +169,15 @@ abstract class BaseRelayClient : RelayInterface {
try {
withTimeout(RESULT_TIMEOUT) {
resultState
.filterIsInstance<RelayDTO.BatchSubscribe.Result>()
.filter { relayResult -> relayResult.id == id }
.first { batchSubscribeResult ->
when (batchSubscribeResult) {
is RelayDTO.BatchSubscribe.Result.Acknowledgement -> onResult(Result.success(batchSubscribeResult.toRelay()))
is RelayDTO.BatchSubscribe.Result.JsonRpcError -> onResult(Result.failure(Throwable(batchSubscribeResult.error.errorMessage)))
}
true
.filterIsInstance<RelayDTO.BatchSubscribe.Result>()
.filter { relayResult -> relayResult.id == id }
.first { batchSubscribeResult ->
when (batchSubscribeResult) {
is RelayDTO.BatchSubscribe.Result.Acknowledgement -> onResult(Result.success(batchSubscribeResult.toRelay()))
is RelayDTO.BatchSubscribe.Result.JsonRpcError -> onResult(Result.failure(Throwable(batchSubscribeResult.error.errorMessage)))
}
true
}
}
} catch (e: TimeoutCancellationException) {
onResult(Result.failure(e))
Expand All @@ -185,14 +191,14 @@ abstract class BaseRelayClient : RelayInterface {

@ExperimentalCoroutinesApi
override fun unsubscribe(
topic: String,
subscriptionId: String,
id: Long?,
onResult: (Result<Relay.Model.Call.Unsubscribe.Acknowledgement>) -> Unit,
topic: String,
subscriptionId: String,
id: Long?,
onResult: (Result<Relay.Model.Call.Unsubscribe.Acknowledgement>) -> Unit,
) {
val unsubscribeRequest = RelayDTO.Unsubscribe.Request(
id = id ?: generateClientToServerId(),
params = RelayDTO.Unsubscribe.Request.Params(Topic(topic), SubscriptionId(subscriptionId))
id = id ?: generateClientToServerId(),
params = RelayDTO.Unsubscribe.Request.Params(Topic(topic), SubscriptionId(subscriptionId))
)

observeUnsubscribeResult(unsubscribeRequest.id, onResult)
Expand All @@ -204,15 +210,15 @@ abstract class BaseRelayClient : RelayInterface {
try {
withTimeout(RESULT_TIMEOUT) {
resultState
.filterIsInstance<RelayDTO.Unsubscribe.Result>()
.filter { relayResult -> relayResult.id == id }
.first { unsubscribeResult ->
when (unsubscribeResult) {
is RelayDTO.Unsubscribe.Result.Acknowledgement -> onResult(Result.success(unsubscribeResult.toRelay()))
is RelayDTO.Unsubscribe.Result.JsonRpcError -> onResult(Result.failure(Throwable(unsubscribeResult.error.errorMessage)))
}
true
.filterIsInstance<RelayDTO.Unsubscribe.Result>()
.filter { relayResult -> relayResult.id == id }
.first { unsubscribeResult ->
when (unsubscribeResult) {
is RelayDTO.Unsubscribe.Result.Acknowledgement -> onResult(Result.success(unsubscribeResult.toRelay()))
is RelayDTO.Unsubscribe.Result.JsonRpcError -> onResult(Result.failure(Throwable(unsubscribeResult.error.errorMessage)))
}
true
}
}
} catch (e: TimeoutCancellationException) {
onResult(Result.failure(e))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ class RelayTest {
startLoggingClientEventsFlow(clientA, "ClientA")
startLoggingClientEventsFlow(clientB, "ClientB")

clientA.isLoggingEnabled = true
clientB.isLoggingEnabled = true

return (clientA to clientB)
}

Expand Down Expand Up @@ -220,7 +223,9 @@ class RelayTest {
delay(10)
}

if (didTimeout(start, 50000L)) { throw Exception("Unable to establish socket connection") }
if (didTimeout(start, 50000L)) {
throw Exception("Unable to establish socket connection")
}

clientAJob.cancel()
clientBJob.cancel()
Expand Down

0 comments on commit 2b37fbd

Please sign in to comment.