Skip to content

Commit

Permalink
OpenTelemetry improved (#756)
Browse files Browse the repository at this point in the history
* OpenTelemetry improved

* JC comment addressed

* Parameters in customSpan for Metrics

* Metrics in private function
  • Loading branch information
javipacheco authored Jun 7, 2024
1 parent 0920656 commit 5621c97
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 300 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xebia.functional.xef.llm

import com.xebia.functional.openai.generated.model.CreateChatCompletionResponse
import com.xebia.functional.openai.generated.model.MessageObject
import com.xebia.functional.openai.generated.model.RunObject
import com.xebia.functional.openai.generated.model.RunStepObject
import com.xebia.functional.xef.conversation.Conversation
Expand Down Expand Up @@ -60,34 +61,48 @@ suspend fun Prompt.addMetrics(conversation: Conversation) {
conversation.metric.parameter("openai.chat_completion.functions", functions.map { it.name })
}

suspend fun RunObject.addMetrics(metric: Metric): RunObject {
metric.assistantCreateRun(this)
suspend fun RunObject.addMetrics(metric: Metric, source: String): RunObject {
metric.assistantCreateRun(this, source)
return this
}

suspend fun RunStepObject.addMetrics(metric: Metric): RunStepObject {
metric.assistantCreateRunStep(this)
suspend fun RunStepObject.addMetrics(metric: Metric, source: String): RunStepObject {
metric.assistantCreateRunStep(this, source)
return this
}

suspend fun MessageObject.addMetrics(metric: Metric, source: String): MessageObject {
metric.assistantCreatedMessage(this, source)
return this
}

suspend fun RunDelta.addMetrics(metric: Metric): RunDelta {
when (this) {
is RunDelta.RunCancelled -> run.addMetrics(metric)
is RunDelta.RunCancelling -> run.addMetrics(metric)
is RunDelta.RunCompleted -> run.addMetrics(metric)
is RunDelta.RunCreated -> run.addMetrics(metric)
is RunDelta.RunExpired -> run.addMetrics(metric)
is RunDelta.RunFailed -> run.addMetrics(metric)
is RunDelta.RunInProgress -> run.addMetrics(metric)
is RunDelta.RunQueued -> run.addMetrics(metric)
is RunDelta.RunRequiresAction -> run.addMetrics(metric)
is RunDelta.RunStepCancelled -> runStep.addMetrics(metric)
is RunDelta.RunStepCompleted -> runStep.addMetrics(metric)
is RunDelta.RunStepCreated -> runStep.addMetrics(metric)
is RunDelta.RunStepExpired -> runStep.addMetrics(metric)
is RunDelta.RunStepFailed -> runStep.addMetrics(metric)
is RunDelta.RunStepInProgress -> runStep.addMetrics(metric)
else -> {} // ignore other cases
is RunDelta.RunCancelled -> run.addMetrics(metric, "RunCancelled")
is RunDelta.RunCancelling -> run.addMetrics(metric, "RunCancelling")
is RunDelta.RunCompleted -> run.addMetrics(metric, "RunCompleted")
is RunDelta.RunCreated -> run.addMetrics(metric, "RunCreated")
is RunDelta.RunExpired -> run.addMetrics(metric, "RunExpired")
is RunDelta.RunFailed -> run.addMetrics(metric, "RunFailed")
is RunDelta.RunInProgress -> run.addMetrics(metric, "RunInProgress")
is RunDelta.RunQueued -> run.addMetrics(metric, "RunQueued")
is RunDelta.RunRequiresAction -> run.addMetrics(metric, "RunRequiresAction")
is RunDelta.RunStepCancelled -> runStep.addMetrics(metric, "RunStepCancelled")
is RunDelta.RunStepCompleted -> runStep.addMetrics(metric, "RunStepCompleted")
is RunDelta.RunStepCreated -> runStep.addMetrics(metric, "RunStepCreated")
is RunDelta.RunStepExpired -> runStep.addMetrics(metric, "RunStepExpired")
is RunDelta.RunStepFailed -> runStep.addMetrics(metric, "RunStepFailed")
is RunDelta.RunStepInProgress -> runStep.addMetrics(metric, "RunStepInProgress")
is RunDelta.MessageCreated -> message.addMetrics(metric, "MessageCreated")
is RunDelta.MessageIncomplete -> message.addMetrics(metric, "MessageIncomplete")
is RunDelta.MessageCompleted -> message.addMetrics(metric, "MessageCompleted")
is RunDelta.MessageInProgress -> message.addMetrics(metric, "MessageInProgress")
is RunDelta.MessageDelta,
is RunDelta.RunIncomplete,
is RunDelta.RunStepDelta,
is RunDelta.RunSubmitToolOutputs,
is RunDelta.ThreadCreated,
is RunDelta.Unknown -> {}
}
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class AssistantThread(
createRun(CreateRunRequest(assistantId = assistant.assistantId))

suspend fun createRun(request: CreateRunRequest): RunObject =
api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric)
api.createRun(threadId, request, configure = ::defaultConfig).addMetrics(metric, "RunCreated")

fun createRunStream(assistant: Assistant, request: CreateRunRequest): Flow<RunDelta> = flow {
api
Expand Down Expand Up @@ -122,39 +122,42 @@ class AssistantThread(
)
}
)
val run =
metric.assistantToolOutputsRun(event.run.id) {
api
.submitToolOuputsToRunStream(
threadId = threadId,
runId = event.run.id,
submitToolOutputsRunRequest = toolOutputsRequest,
configure = ::defaultConfig
)
.collect {
val delta = RunDelta.fromServerSentEvent(it)
if (delta is RunDelta.RunStepCompleted) {
flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest))
}
flowCollector.emit(delta)
}
val run = getRun(event.run.id)
val finalEvent =
when (run.status) {
RunObject.Status.queued -> RunDelta.RunQueued(run)
RunObject.Status.in_progress -> RunDelta.RunInProgress(run)
RunObject.Status.requires_action -> RunDelta.RunRequiresAction(run)
RunObject.Status.cancelling -> RunDelta.RunCancelling(run)
RunObject.Status.cancelled -> RunDelta.RunCancelled(run)
RunObject.Status.failed -> RunDelta.RunFailed(run)
RunObject.Status.completed -> RunDelta.RunCompleted(run)
RunObject.Status.expired -> RunDelta.RunExpired(run)
RunObject.Status.incomplete -> RunDelta.RunIncomplete(run)
}
flowCollector.emit(finalEvent)
run

api
.submitToolOuputsToRunStream(
threadId = threadId,
runId = event.run.id,
submitToolOutputsRunRequest = toolOutputsRequest,
configure = ::defaultConfig
)
.collect {
val delta = RunDelta.fromServerSentEvent(it)

delta.launchMetricsIfNecessary()

if (delta is RunDelta.RunStepCompleted) {
flowCollector.emit(RunDelta.RunSubmitToolOutputs(toolOutputsRequest))
}
flowCollector.emit(delta)
}

val run = getRun(event.run.id)
val finalEvent =
when (run.status) {
RunObject.Status.queued -> Pair(RunDelta.RunQueued(run), "RunQueued")
RunObject.Status.in_progress -> Pair(RunDelta.RunInProgress(run), "RunInProgress")
RunObject.Status.requires_action ->
Pair(RunDelta.RunRequiresAction(run), "RunRequiresAction")
RunObject.Status.cancelling -> Pair(RunDelta.RunCancelling(run), "RunCancelling")
RunObject.Status.cancelled -> Pair(RunDelta.RunCancelled(run), "RunCancelled")
RunObject.Status.failed -> Pair(RunDelta.RunFailed(run), "RunFailed")
RunObject.Status.completed -> Pair(RunDelta.RunCompleted(run), "RunCompleted")
RunObject.Status.expired -> Pair(RunDelta.RunExpired(run), "RunExpired")
RunObject.Status.incomplete -> Pair(RunDelta.RunIncomplete(run), "RunIncomplete")
}
flowCollector.emit(finalEvent.first)
metric.assistantCreateRun(run, finalEvent.second)

if (run.status == RunObject.Status.requires_action) {
takeRequiredAction(
depth + 1,
Expand Down Expand Up @@ -210,6 +213,48 @@ class AssistantThread(
is RunStepObjectStepDetails.CaseRunStepDetailsToolCallsObject -> step.value.toolCalls
}

private suspend fun RunDelta.launchMetricsIfNecessary() {
launchRunMetricsIfNecessary()
launchRunStepsMetricsIfNecessary()
launchMessageMetricsIfNecessary()
}

private suspend fun RunDelta.launchRunMetricsIfNecessary() {
when (this) {
is RunDelta.RunCreated -> Pair(run, "RunCreated")
is RunDelta.RunQueued -> Pair(run, "RunQueued")
is RunDelta.RunFailed -> Pair(run, "RunFailed")
is RunDelta.RunCancelled -> Pair(run, "RunCancelled")
is RunDelta.RunCancelling -> Pair(run, "RunCancelling")
is RunDelta.RunExpired -> Pair(run, "RunExpired")
is RunDelta.RunInProgress -> Pair(run, "RunInProgress")
is RunDelta.RunIncomplete -> Pair(run, "RunIncomplete")
else -> null
}?.let { metric.assistantCreateRun(it.first, it.second) }
}

private suspend fun RunDelta.launchRunStepsMetricsIfNecessary() {
when (this) {
is RunDelta.RunStepCreated -> Pair(runStep, "RunStepCreated")
is RunDelta.RunStepInProgress -> Pair(runStep, "RunStepInProgress")
is RunDelta.RunStepCompleted -> Pair(runStep, "RunStepCompleted")
is RunDelta.RunStepFailed -> Pair(runStep, "RunStepFailed")
is RunDelta.RunStepCancelled -> Pair(runStep, "RunStepCancelled")
is RunDelta.RunStepExpired -> Pair(runStep, "RunStepExpired")
else -> null
}?.let { metric.assistantCreateRunStep(it.first, it.second) }
}

private suspend fun RunDelta.launchMessageMetricsIfNecessary() {
when (this) {
is RunDelta.MessageCreated -> Pair(message, "MessageCreated")
is RunDelta.MessageInProgress -> Pair(message, "MessageInProgress")
is RunDelta.MessageIncomplete -> Pair(message, "MessageIncomplete")
is RunDelta.MessageCompleted -> Pair(message, "MessageCompleted")
else -> null
}?.let { metric.assistantCreatedMessage(it.first, it.second) }
}

companion object {

/** Support for OpenAI-Beta: assistants=v2 */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {

private val logger = KotlinLogging.logger {}

override suspend fun <A> customSpan(name: String, block: suspend Metric.() -> A): A {
override suspend fun <A> customSpan(
name: String,
parameters: Map<String, String>,
block: suspend Metric.() -> A
): A {
val millis = getTimeMillis()
logger.at(level) { message = "${writeIndent(numberOfBlocks.get())}> Custom-Span: $name" }
numberOfBlocks.incrementAndGet()
parameters.map { (key, value) ->
logger.at(level) { message = "${writeIndent(numberOfBlocks.get())}|-- $key = $value" }
}
val output = block()
logger.at(level) {
message = "${writeIndent(numberOfBlocks.get())}|-- Finished in ${getTimeMillis() - millis} ms"
Expand All @@ -43,7 +50,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
return output
}

override suspend fun assistantCreateRun(runObject: RunObject) {
override suspend fun assistantCreateRun(runObject: RunObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}"
}
Expand All @@ -58,16 +65,7 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
}
}

override suspend fun assistantCreateRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject {
val output = block()
assistantCreateRun(output)
return output
}

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {
override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${runObject.assistantId}"
}
Expand All @@ -82,44 +80,23 @@ class LogsMetric(private val level: Level = Level.INFO) : Metric {
}
}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject> {
val output = block()
override suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String) {
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- Size: ${output.size}"
}
return output
}

override suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject {
val output = block()
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${output.assistantId}"
this.message =
"${writeIndent(numberOfBlocks.get())}|-- AssistantId: ${messageObject.assistantId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${output.threadId}"
this.message = "${writeIndent(numberOfBlocks.get())}|-- ThreadId: ${messageObject.threadId}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${output.runId}"
this.message = "${writeIndent(numberOfBlocks.get())}|-- RunId: ${messageObject.id}"
}
logger.at(level) {
this.message = "${writeIndent(numberOfBlocks.get())}|-- Status: ${output.status.name}"
if (messageObject.status != null) {
this.message =
"${writeIndent(numberOfBlocks.get())}|-- Status: ${messageObject.status!!.name}"
}
}
return output
}

override suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject {
val output = block()
assistantCreateRun(output)
return output
}

override suspend fun event(message: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import com.xebia.functional.openai.generated.model.RunStepObject
import com.xebia.functional.xef.prompt.Prompt

interface Metric {
suspend fun <A> customSpan(name: String, block: suspend Metric.() -> A): A
suspend fun <A> customSpan(
name: String,
parameters: Map<String, String>,
block: suspend Metric.() -> A
): A

suspend fun <A> promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A

Expand All @@ -16,59 +20,32 @@ interface Metric {

suspend fun parameter(key: String, values: List<String>)

suspend fun assistantCreateRun(runObject: RunObject)
suspend fun assistantCreateRun(runObject: RunObject, source: String)

suspend fun assistantCreateRun(runId: String, block: suspend Metric.() -> RunObject): RunObject
suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String)

suspend fun assistantCreateRunStep(runObject: RunStepObject)

suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject>

suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject

suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject
suspend fun assistantCreatedMessage(messageObject: MessageObject, source: String)

companion object {
val EMPTY: Metric =
object : Metric {
override suspend fun <A> customSpan(name: String, block: suspend Metric.() -> A): A =
block()
override suspend fun <A> customSpan(
name: String,
parameters: Map<String, String>,
block: suspend Metric.() -> A
): A = block()

override suspend fun <A> promptSpan(prompt: Prompt, block: suspend Metric.() -> A): A =
block()

override suspend fun assistantCreateRun(runObject: RunObject) {}

override suspend fun assistantCreateRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject = block()
override suspend fun assistantCreateRun(runObject: RunObject, source: String) {}

override suspend fun assistantCreateRunStep(runObject: RunStepObject) {}
override suspend fun assistantCreateRunStep(runObject: RunStepObject, source: String) {}

override suspend fun assistantCreatedMessage(
runId: String,
block: suspend Metric.() -> List<MessageObject>
): List<MessageObject> = block()

override suspend fun assistantCreateRunStep(
runId: String,
block: suspend Metric.() -> RunStepObject
): RunStepObject = block()

override suspend fun assistantToolOutputsRun(
runId: String,
block: suspend Metric.() -> RunObject
): RunObject = block()
messageObject: MessageObject,
source: String
) {}

override suspend fun event(message: String) {}

Expand Down
Loading

0 comments on commit 5621c97

Please sign in to comment.