Skip to content

Commit

Permalink
Implement message registry and minor bug fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
DivineThreepwood committed Dec 26, 2023
1 parent c4a8d5c commit 91461f4
Show file tree
Hide file tree
Showing 36 changed files with 1,424 additions and 648 deletions.
2 changes: 1 addition & 1 deletion lib/type
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ abstract class AbstractBCOGraphQLContext(
abstract val languageCode: String?

val auth: AuthTokenType.AuthToken?
get() = AuthTokenType.AuthToken.newBuilder().setAuthenticationToken(token).build()
get() = token?.let { AuthTokenType.AuthToken.newBuilder().setAuthenticationToken(it).build() }

companion object {
const val DATA_LOADER_UNITS = "units"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import org.openbase.bco.api.graphql.error.GenericError
import org.openbase.bco.api.graphql.error.ServerError
import org.openbase.bco.authentication.lib.SessionManager
import org.openbase.bco.authentication.lib.iface.BCOSession
import org.openbase.bco.registry.message.remote.registerUserMessageAuthenticated
import org.openbase.bco.registry.message.remote.removeUserMessageAuthenticated
import org.openbase.bco.registry.message.remote.updateUserMessageAuthenticated
import org.openbase.bco.registry.remote.Registries
import org.openbase.bco.registry.remote.session.BCOSessionImpl
import org.openbase.bco.registry.unit.remote.registerUnitConfigAuthenticated
Expand Down Expand Up @@ -407,7 +410,7 @@ class RegistrySchemaModule : SchemaModule() {
break
}
}
if (!entry.value.isEmpty()) {
if (entry.value.isNotEmpty()) {
metaConfigBuilder.addEntry(entry)
}

Expand All @@ -425,6 +428,92 @@ class RegistrySchemaModule : SchemaModule() {
throw GenericError(ex)
}

@Mutation("updateUserMessage")
@Throws(BCOGraphQLError::class)
fun updateUserMessage(
@Arg("userMessage") userMessage: UserMessage,
env: DataFetchingEnvironment,
): UserMessage = try {
val userMessageBuilder = Registries.getMessageRegistry(
ServerError.BCO_TIMEOUT_SHORT,
ServerError.BCO_TIMEOUT_TIME_UNIT
)
.getUserMessageById(userMessage.id)
.toBuilder()
userMessageBuilder.mergeFromWithoutRepeatedFields(userMessage)
Registries.getMessageRegistry(
ServerError.BCO_TIMEOUT_SHORT,
ServerError.BCO_TIMEOUT_TIME_UNIT
).updateUserMessageAuthenticated(
userMessageBuilder.build(),
env.context.auth
)[ServerError.BCO_TIMEOUT_SHORT, ServerError.BCO_TIMEOUT_TIME_UNIT]
} catch (ex: RuntimeException) {

Check warning

Code scanning / detekt

Caught exception is too generic. Prefer catching specific exceptions to the case that is currently handled. Warning

Caught exception is too generic. Prefer catching specific exceptions to the case that is currently handled.
throw GenericError(ex)
} catch (ex: CouldNotPerformException) {
throw GenericError(ex)
} catch (ex: InterruptedException) {
throw GenericError(ex)
} catch (ex: ExecutionException) {
throw GenericError(ex)
} catch (ex: TimeoutException) {
throw GenericError(ex)
}

@Mutation("removeUserMessage")
@Throws(BCOGraphQLError::class)
fun removeUserMessage(
@Arg("unitId") unitId: String?,
env: DataFetchingEnvironment,
): UserMessage = try {
val userMessage = Registries.getMessageRegistry(
ServerError.BCO_TIMEOUT_SHORT,
ServerError.BCO_TIMEOUT_TIME_UNIT
).getUserMessageById(unitId)
Registries.getMessageRegistry(
ServerError.BCO_TIMEOUT_SHORT,
ServerError.BCO_TIMEOUT_TIME_UNIT
).removeUserMessageAuthenticated(
userMessage,
env.context.auth
)[ServerError.BCO_TIMEOUT_SHORT, ServerError.BCO_TIMEOUT_TIME_UNIT]
} catch (ex: RuntimeException) {

Check warning

Code scanning / detekt

Caught exception is too generic. Prefer catching specific exceptions to the case that is currently handled. Warning

Caught exception is too generic. Prefer catching specific exceptions to the case that is currently handled.
throw GenericError(ex)
} catch (ex: CouldNotPerformException) {
throw GenericError(ex)
} catch (ex: InterruptedException) {
throw GenericError(ex)
} catch (ex: ExecutionException) {
throw GenericError(ex)
} catch (ex: TimeoutException) {
throw GenericError(ex)
}

@Mutation("registerUserMessage")
@Throws(BCOGraphQLError::class)
fun registerUserMessage(
@Arg("userMessage") userMessage: UserMessage?,
env: DataFetchingEnvironment,
): UserMessage = try {
Registries.getMessageRegistry(
ServerError.BCO_TIMEOUT_SHORT,
ServerError.BCO_TIMEOUT_TIME_UNIT
).registerUserMessageAuthenticated(
userMessage,
env.context.auth
)[ServerError.BCO_TIMEOUT_SHORT, ServerError.BCO_TIMEOUT_TIME_UNIT]
} catch (ex: RuntimeException) {

Check warning

Code scanning / detekt

Caught exception is too generic. Prefer catching specific exceptions to the case that is currently handled. Warning

Caught exception is too generic. Prefer catching specific exceptions to the case that is currently handled.
throw GenericError(ex)
} catch (ex: CouldNotPerformException) {
throw GenericError(ex)
} catch (ex: InterruptedException) {
throw GenericError(ex)
} catch (ex: ExecutionException) {
throw GenericError(ex)
} catch (ex: TimeoutException) {
throw GenericError(ex)
}

companion object {
@Throws(BCOGraphQLError::class)
fun getUnitConfigs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.openbase.bco.api.graphql.error.GenericError
import org.openbase.bco.api.graphql.error.ServerError
import org.openbase.bco.api.graphql.schema.RegistrySchemaModule
import org.openbase.bco.dal.lib.layer.unit.Unit
import org.openbase.bco.dal.lib.layer.unit.UnitRemote
import org.openbase.bco.dal.remote.layer.unit.CustomUnitPool
import org.openbase.bco.registry.remote.Registries
import org.openbase.jul.exception.CouldNotPerformException
Expand All @@ -20,7 +21,6 @@ import org.openbase.type.domotic.unit.UnitDataType
import org.openbase.type.domotic.unit.UnitFilterType.UnitFilter
import org.reactivestreams.Publisher
import org.slf4j.LoggerFactory
import java.util.function.Consumer

/*-
* #%L
Expand Down Expand Up @@ -51,15 +51,15 @@ import java.util.function.Consumer
@Throws(BCOGraphQLError::class)
fun subscribeUnits(unitFilter: UnitFilter): Publisher<UnitDataType.UnitData> {
return try {
val subscriptionUnitPool = CustomUnitPool()
val subscriptionUnitPool = CustomUnitPool<Message, UnitRemote<Message>>()
subscriptionUnitPool.init(unitFilter)
AbstractObserverMapper.createObservable(
Consumer { observer: Observer<Unit<Message>, Message> ->
{ observer: Observer<Unit<Message>, Message> ->
subscriptionUnitPool.addDataObserver(
observer
)
},
Consumer { observer: Observer<Unit<Message>, Message> ->
{ observer: Observer<Unit<Message>, Message> ->
subscriptionUnitPool.removeDataObserver(observer)
},
object : AbstractObserverMapper<Unit<Message>, Message, UnitDataType.UnitData>() {
Expand Down Expand Up @@ -100,12 +100,12 @@ import java.util.function.Consumer
ServerError.BCO_TIMEOUT_TIME_UNIT
)
AbstractObserverMapper.createObservable(
Consumer { observer: Observer<DataProvider<UnitRegistryData>, UnitRegistryData> ->
{ observer: Observer<DataProvider<UnitRegistryData>, UnitRegistryData> ->
unitRegistry.addDataObserver(
observer
)
},
Consumer { observer: Observer<DataProvider<UnitRegistryData>, UnitRegistryData> ->
{ observer: Observer<DataProvider<UnitRegistryData>, UnitRegistryData> ->
unitRegistry.removeDataObserver(observer)
},
observer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package org.openbase.bco.app.preset

import org.openbase.bco.dal.control.layer.unit.app.AbstractAppController
import org.openbase.bco.dal.lib.layer.service.ServiceStateProcessor
import org.openbase.bco.dal.remote.layer.unit.BatteryRemote
import org.openbase.bco.dal.remote.layer.unit.CustomUnitPool
import org.openbase.bco.registry.message.remote.registerUserMessageAuthenticated
import org.openbase.bco.registry.remote.Registries
import org.openbase.jul.exception.CouldNotPerformException
import org.openbase.jul.exception.printer.ExceptionPrinter
import org.openbase.jul.extension.type.processing.LabelProcessor
import org.openbase.jul.extension.type.processing.MultiLanguageTextProcessor
import org.openbase.jul.extension.type.processing.TimestampProcessor
import org.openbase.jul.schedule.GlobalScheduledExecutorService
import org.openbase.type.domotic.action.ActionDescriptionType.ActionDescription
import org.openbase.type.domotic.communication.UserMessageType.UserMessage
import org.openbase.type.domotic.service.ServiceStateDescriptionType.ServiceStateDescription
import org.openbase.type.domotic.service.ServiceTemplateType.ServiceTemplate.ServiceType
import org.openbase.type.domotic.state.ActivationStateType.ActivationState
import org.openbase.type.domotic.state.BatteryStateType.BatteryState
import org.openbase.type.domotic.unit.UnitConfigType.UnitConfig
import org.openbase.type.domotic.unit.UnitFilterType.UnitFilter
import org.openbase.type.domotic.unit.UnitTemplateType.UnitTemplate.UnitType
import org.openbase.type.domotic.unit.dal.BatteryDataType.BatteryData
import org.openbase.type.language.MultiLanguageTextType.MultiLanguageText
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

/**
* An app that notifies users about devices with empty batteries or offline states.
*/
class DeviceNotificationApp : AbstractAppController() {

private val batteryPool = CustomUnitPool<BatteryData, BatteryRemote>()
private var task: ScheduledFuture<*>? = null

init {
batteryPool.init(
UnitFilter.newBuilder().setProperties(UnitConfig.newBuilder().setUnitType(UnitType.BATTERY)).build()
)
}

@Throws(CouldNotPerformException::class, InterruptedException::class)
override fun execute(activationState: ActivationState): ActionDescription =
activationState.responsibleAction.also {
batteryPool.activate()
task?.cancel(false)
task = GlobalScheduledExecutorService.scheduleWithFixedDelay(
::checkDevices,
INITIAL_VALIDATION_DELAY.toMillis(),
VALIDATION_PERIOD.toMillis(),
TimeUnit.MILLISECONDS
)
LOGGER.trace(getLabel() + " is running.")
}

@Throws(InterruptedException::class, CouldNotPerformException::class)
override fun stop(activationState: ActivationState) =
super.stop(activationState).also {
batteryPool.deactivate()
task?.cancel(true)
LOGGER.trace(getLabel() + " has been stopped.")
}

fun checkDevices() {

Check warning

Code scanning / detekt

One method should have one responsibility. Long methods tend to handle many things at once. Prefer smaller methods to make them easier to understand. Warning

The function checkDevices is too long (69). The maximum length is 60.
try {
logger.error("#@# check device states")
batteryPool.internalUnitList
.onEach { remote -> remote.waitForData() }
.filter { remote ->
when (remote.data.batteryState.value) {
BatteryState.State.LOW, BatteryState.State.CRITICAL, BatteryState.State.UNKNOWN -> true
else -> false
}
}
.map { remote ->
val messageBuilder: UserMessage.Builder = UserMessage.newBuilder()
val textBuilder: MultiLanguageText.Builder = MultiLanguageText.newBuilder()

MultiLanguageTextProcessor.addMultiLanguageText(
textBuilder,
Locale.ENGLISH,
"Battery level of ${
LabelProcessor.getBestMatch(
Locale.ENGLISH,
remote.config.label
)
} is ${remote.data.batteryState.value}"
)

MultiLanguageTextProcessor.addMultiLanguageText(
textBuilder,
Locale.GERMAN,
"Batteriezustand von ${
LabelProcessor.getBestMatch(
Locale.GERMAN,
remote.config.label
)
} ist ${remote.data.batteryState.value}"
)

messageBuilder.id = UUID.randomUUID().toString()
messageBuilder.messageType = UserMessage.MessageType.WARNING
messageBuilder.timestamp = TimestampProcessor.getCurrentTimestamp()
messageBuilder.text = textBuilder.build()
messageBuilder.senderId = userConfig.id
messageBuilder.addCondition(
with(ServiceStateDescription.newBuilder()) {
setUnitId(remote.id)
setServiceType(ServiceType.BATTERY_STATE_SERVICE)
setServiceStateClassName(BatteryState::class.java.name)
setServiceState(
ServiceStateProcessor.serializeServiceState(
BatteryState.newBuilder().setValue(remote.data.batteryState.value).build(),
false,
)
)
}.also { ServiceStateProcessor.deserializeServiceState(it) }
)

// validate if message already exist
messageBuilder.build() to Registries.getMessageRegistry()
.getUserMessagesByText(
MultiLanguageTextProcessor.getBestMatch(
Locale.ENGLISH, messageBuilder.text
), Locale.ENGLISH
).isNotEmpty()
}
.filterNot { (_, exist) -> exist }
.forEach { (message, _) ->
Registries.getMessageRegistry()
.registerUserMessageAuthenticated(message, token)
.get(5, TimeUnit.SECONDS)

Check warning

Code scanning / detekt

Report magic numbers. Magic number is a numeric literal that is not defined as a constant and hence it's unclear what the purpose of this number is. It's better to declare such numbers as constants and give them a proper name. By default, -1, 0, 1, and 2 are not considered to be magic numbers. Warning

This expression contains a magic number. Consider defining it to a well named constant.
}
} catch (e: CouldNotPerformException) {
ExceptionPrinter.printHistory("Could not check device states!", e, logger)
}
}

companion object {
private val LOGGER = LoggerFactory.getLogger(TemplateApp::class.java)

private val VALIDATION_PERIOD: Duration = Duration.ofHours(24)
private val INITIAL_VALIDATION_DELAY: Duration = Duration.ofHours(1)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.openbase.bco.app.preset

import org.openbase.bco.dal.control.layer.unit.app.AbstractAppController
import org.openbase.jul.exception.CouldNotPerformException
import org.openbase.type.domotic.action.ActionDescriptionType.ActionDescription
import org.openbase.type.domotic.state.ActivationStateType
import org.openbase.type.domotic.state.ActivationStateType.ActivationState
import org.openbase.type.domotic.unit.UnitConfigType
import org.slf4j.LoggerFactory

/*
* #%L
* BCO App Preset
* %%
* Copyright (C) 2018 - 2021 openbase.org
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/ /**
* A class that can be used as template for new apps.
*/
class TemplateApp : AbstractAppController() {

private var executing = false

@Throws(CouldNotPerformException::class, InterruptedException::class)
override fun applyConfigUpdate(config: UnitConfigType.UnitConfig): UnitConfigType.UnitConfig =
getManageWriteLockInterruptible(this).use {
super.applyConfigUpdate(config).also {
LOGGER.info(getLabel() + " config has been changed.")
}
}

override fun shutdown() = super.shutdown().also {
LOGGER.info(getLabel() + " is shutting down.")
}

@Throws(CouldNotPerformException::class, InterruptedException::class)
override fun execute(activationState: ActivationStateType.ActivationState): ActionDescription =
activationState.responsibleAction.also {
executing = true
LOGGER.info(getLabel() + " is running.")
}

@Throws(InterruptedException::class, CouldNotPerformException::class)
override fun stop(activationState: ActivationState) =
super.stop(activationState).also {
executing = false
LOGGER.info(getLabel() + " has been stopped.")
}

companion object {
private val LOGGER = LoggerFactory.getLogger(TemplateApp::class.java)
}
}
Loading

0 comments on commit 91461f4

Please sign in to comment.