Skip to content

Commit

Permalink
Adding Spotless to OpenTelemetry and Server module (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
javipacheco authored Oct 18, 2023
1 parent 3d216a3 commit d9aec25
Show file tree
Hide file tree
Showing 36 changed files with 1,133 additions and 1,281 deletions.
10 changes: 10 additions & 0 deletions integrations/opentelemetry/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {
alias(libs.plugins.arrow.gradle.publish)
alias(libs.plugins.semver.gradle)
alias(libs.plugins.detekt)
alias(libs.plugins.spotless)
}

dependencies { detektPlugins(project(":detekt-rules")) }
Expand Down Expand Up @@ -39,6 +40,15 @@ dependencies {
testRuntimeOnly(libs.kotest.junit5)
}

spotless {
kotlin {
target("**/*.kt")
ktfmt().googleStyle().configure {
it.setRemoveUnusedImport(true)
}
}
}

tasks {
withType<io.gitlab.arturbosch.detekt.Detekt>().configureEach {
dependsOn(":detekt-rules:assemble")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,39 @@ import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import java.util.concurrent.TimeUnit

data class OpenTelemetryConfig(
val endpointConfig: String,
val defaultScopeName: String,
val serviceName: String
val endpointConfig: String,
val defaultScopeName: String,
val serviceName: String
) {

fun newInstance(): OpenTelemetry {
val jaegerOtlpExporter: OtlpGrpcSpanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint(endpointConfig)
.setTimeout(30, TimeUnit.SECONDS)
.build()

val serviceNameResource: Resource =
Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName))

val tracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(jaegerOtlpExporter).build())
.setResource(Resource.getDefault().merge(serviceNameResource))
.build()


val openTelemetry = OpenTelemetrySdk
.builder()
.setTracerProvider(tracerProvider)
.build()

Runtime.getRuntime().addShutdownHook(Thread { tracerProvider.close() })
return openTelemetry
}

companion object {
val DEFAULT = OpenTelemetryConfig(
endpointConfig = "http://localhost:4317",
defaultScopeName = "io.xef",
serviceName = "xef"
)
}
fun newInstance(): OpenTelemetry {
val jaegerOtlpExporter: OtlpGrpcSpanExporter =
OtlpGrpcSpanExporter.builder()
.setEndpoint(endpointConfig)
.setTimeout(30, TimeUnit.SECONDS)
.build()

val serviceNameResource: Resource =
Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), serviceName))

val tracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(jaegerOtlpExporter).build())
.setResource(Resource.getDefault().merge(serviceNameResource))
.build()

val openTelemetry = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()

Runtime.getRuntime().addShutdownHook(Thread { tracerProvider.close() })
return openTelemetry
}

companion object {
val DEFAULT =
OpenTelemetryConfig(
endpointConfig = "http://localhost:4317",
defaultScopeName = "io.xef",
serviceName = "xef"
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,60 @@ import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.context.Context

class OpenTelemetryMetric(
private val config: OpenTelemetryConfig = OpenTelemetryConfig.DEFAULT
) : Metric {
class OpenTelemetryMetric(private val config: OpenTelemetryConfig = OpenTelemetryConfig.DEFAULT) :
Metric {

private val conversations = mutableListOf<Pair<ConversationId, Context>>()
private val conversations = mutableListOf<Pair<ConversationId, Context>>()

private val openTelemetry = config.newInstance()
private val openTelemetry = config.newInstance()

override suspend fun <A> promptSpan(conversation: Conversation, prompt: Prompt, block: suspend Metric.() -> A): A {
val cid = conversation.conversationId ?: return block()
override suspend fun <A> promptSpan(
conversation: Conversation,
prompt: Prompt,
block: suspend Metric.() -> A
): A {
val cid = conversation.conversationId ?: return block()

val parentContext = cid.getParentConversation()
val parentContext = cid.getParentConversation()

val span = getTracer()
.spanBuilder("Prompt: ${prompt.messages.lastOrNull()?.content ?: "empty"}")
.setParent(parentContext)
.startSpan()
val span =
getTracer()
.spanBuilder("Prompt: ${prompt.messages.lastOrNull()?.content ?: "empty"}")
.setParent(parentContext)
.startSpan()

return try {
val output = block()
span.makeCurrent().use {
span.setAttribute("number-of-messages", prompt.messages.count().toString())
span.setAttribute("last-message", prompt.messages.lastOrNull()?.content ?: "empty")
}
output
} finally {
span.end()
}
return try {
val output = block()
span.makeCurrent().use {
span.setAttribute("number-of-messages", prompt.messages.count().toString())
span.setAttribute("last-message", prompt.messages.lastOrNull()?.content ?: "empty")
}
output
} finally {
span.end()
}
}

override fun log(conversation: Conversation, message: String) {
val cid = conversation.conversationId ?: return
override fun log(conversation: Conversation, message: String) {
val cid = conversation.conversationId ?: return

val parentContext = cid.getParentConversation()
val parentContext = cid.getParentConversation()

val span: Span = getTracer().spanBuilder(message)
.setParent(parentContext)
.startSpan()
span.end()
}

private fun ConversationId.getParentConversation(): Context {
val parent = conversations.find { it.first == this }?.second
return if (parent == null) {
val newParent = getTracer()
.spanBuilder(value)
.startSpan()
newParent.end()
val newContext = Context.current().with(newParent)
conversations.add(this to newContext)
newContext
} else parent
}
val span: Span = getTracer().spanBuilder(message).setParent(parentContext).startSpan()
span.end()
}

private fun getTracer(scopeName: String? = null): Tracer =
openTelemetry.getTracer(scopeName ?: config.defaultScopeName)
private fun ConversationId.getParentConversation(): Context {
val parent = conversations.find { it.first == this }?.second
return if (parent == null) {
val newParent = getTracer().spanBuilder(value).startSpan()
newParent.end()
val newContext = Context.current().with(newParent)
conversations.add(this to newContext)
newContext
} else parent
}

private fun getTracer(scopeName: String? = null): Tracer =
openTelemetry.getTracer(scopeName ?: config.defaultScopeName)
}
10 changes: 10 additions & 0 deletions server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {
alias(libs.plugins.node.gradle)
alias(libs.plugins.arrow.gradle.publish)
alias(libs.plugins.semver.gradle)
alias(libs.plugins.spotless)
}

repositories {
Expand Down Expand Up @@ -68,6 +69,15 @@ dependencies {
testRuntimeOnly(libs.kotest.junit5)
}

spotless {
kotlin {
target("**/*.kt")
ktfmt().googleStyle().configure {
it.setRemoveUnusedImport(true)
}
}
}

tasks.getByName<Copy>("processResources") {
dependsOn(projects.xefGpt4all.dependencyProject.tasks.getByName("jvmProcessResources"))
from("${projects.xefGpt4all.dependencyProject.buildDir}/processedResources/jvm/main")
Expand Down
99 changes: 48 additions & 51 deletions server/src/main/kotlin/com/xebia/functional/xef/server/Server.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.xebia.functional.xef.server.services.RepositoryService
import io.ktor.client.*
import io.ktor.client.engine.cio.*
import io.ktor.client.plugins.auth.*
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation as ClientContentNegotiation
import io.ktor.client.plugins.logging.*
import io.ktor.http.*
import io.ktor.serialization.kotlinx.json.*
Expand All @@ -27,63 +28,59 @@ import io.ktor.server.routing.*
import kotlinx.coroutines.awaitCancellation
import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation as ClientContentNegotiation

object Server {
@JvmStatic
fun main(args: Array<String>) = SuspendApp {
resourceScope {
val config = ConfigFactory.load("database.conf").resolve()
val xefDBConfig = XefDatabaseConfig.load("xef", config)
Migrate.migrate(xefDBConfig)

val logger = LoggerFactory.getLogger("xef-server")
@JvmStatic
fun main(args: Array<String>) = SuspendApp {
resourceScope {
val config = ConfigFactory.load("database.conf").resolve()
val xefDBConfig = XefDatabaseConfig.load("xef", config)
Migrate.migrate(xefDBConfig)

val hikariDataSourceXefDB = RepositoryService.getHikariDataSource(
xefDBConfig.getUrl(),
xefDBConfig.user,
xefDBConfig.password
)
Database.connect(hikariDataSourceXefDB)
val vectorStoreConfig = XefVectorStoreConfig.load("xef-vector-store", config)
val vectorStoreService = vectorStoreConfig.getVectorStoreService(config, logger)
vectorStoreService.addCollection()
val logger = LoggerFactory.getLogger("xef-server")

val hikariDataSourceXefDB =
RepositoryService.getHikariDataSource(
xefDBConfig.getUrl(),
xefDBConfig.user,
xefDBConfig.password
)
Database.connect(hikariDataSourceXefDB)
val vectorStoreConfig = XefVectorStoreConfig.load("xef-vector-store", config)
val vectorStoreService = vectorStoreConfig.getVectorStoreService(config, logger)
vectorStoreService.addCollection()

val ktorClient = HttpClient(CIO) {
engine {
requestTimeout = 0 // disabled
}
install(Auth)
install(Logging) {
level = LogLevel.INFO
}
install(ClientContentNegotiation)
}
val ktorClient =
HttpClient(CIO) {
engine {
requestTimeout = 0 // disabled
}
install(Auth)
install(Logging) { level = LogLevel.INFO }
install(ClientContentNegotiation)
}

server(factory = Netty, port = 8081, host = "0.0.0.0") {
install(CORS) {
allowNonSimpleContentTypes = true
HttpMethod.DefaultMethods.forEach { allowMethod(it) }
allowHeaders { true }
anyHost()
}
install(ContentNegotiation) { json() }
install(Resources)
install(Authentication) {
bearer("auth-bearer") {
authenticate { tokenCredential ->
UserIdPrincipal(tokenCredential.token)
}
}
}
exceptionsHandler()
routing {
xefRoutes(logger)
aiRoutes(ktorClient)
}
}
awaitCancellation()
server(factory = Netty, port = 8081, host = "0.0.0.0") {
install(CORS) {
allowNonSimpleContentTypes = true
HttpMethod.DefaultMethods.forEach { allowMethod(it) }
allowHeaders { true }
anyHost()
}
install(ContentNegotiation) { json() }
install(Resources)
install(Authentication) {
bearer("auth-bearer") {
authenticate { tokenCredential -> UserIdPrincipal(tokenCredential.token) }
}
}
exceptionsHandler()
routing {
xefRoutes(logger)
aiRoutes(ktorClient)
}
}
awaitCancellation()
}
}
}
17 changes: 6 additions & 11 deletions server/src/main/kotlin/com/xebia/functional/xef/server/Web.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,9 @@ import io.ktor.server.routing.*

object WebApp {

@JvmStatic
fun main(args: Array<String>) {
embeddedServer(Netty, port = 8080) {
routing {
singlePageApplication {
react("web/dist")
}
}
}.start(wait = true)
}
}
@JvmStatic
fun main(args: Array<String>) {
embeddedServer(Netty, port = 8080) { routing { singlePageApplication { react("web/dist") } } }
.start(wait = true)
}
}
Loading

0 comments on commit d9aec25

Please sign in to comment.