Skip to content

Commit

Permalink
Fix tool outputs (#652)
Browse files Browse the repository at this point in the history
* Emit the run object returned by the tool outputs

* submit tool outputs in parallel when the run requires that action

* load run before submitting tool calls to ensure it's in the right state

* Apply spotless formatting

---------

Co-authored-by: raulraja <[email protected]>
  • Loading branch information
raulraja and raulraja authored Feb 5, 2024
1 parent bf23d6a commit 56f2035
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Assistant(
} catch (e: Exception) {
val message = "Error calling to tool registered $name: ${e.message}"
val logger = KtorSimpleLogger("Functions")
logger.error(message)
logger.error(message, e)
JsonObject(mapOf("error" to JsonPrimitive(message)))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.xebia.functional.xef.llm.assistants

import arrow.fx.coroutines.parMap
import com.xebia.functional.openai.apis.AssistantsApi
import com.xebia.functional.openai.infrastructure.ApiClient
import com.xebia.functional.openai.models.*
Expand Down Expand Up @@ -163,10 +164,10 @@ class AssistantThread(
) {
val steps = runSteps(runId)
steps.forEach { step ->
val callsWithArguments =
step.stepDetails.toolCalls().filter {
it.function != null && it.function!!.arguments.isNotBlank()
}
val calls = step.stepDetails.toolCalls()
// .filter {
// it.function != null && it.function!!.arguments.isNotBlank()
// }

// We have detected that tool call in in_progress state sometimes don't have any arguments
// and this is not valid. We need to skip this step in this case.
Expand All @@ -175,38 +176,44 @@ class AssistantThread(
step.type == RunStepObject.Type.tool_calls &&
step.status == RunStepObject.Status.in_progress
)
callsWithArguments.isNotEmpty()
calls.isNotEmpty()
else true

val emitEvent = canEmitToolCalls && step !in cache

if (emitEvent) {
cache.add(step)
emit(RunDelta.Step(step))

if (step.status == RunStepObject.Status.in_progress) {
callsWithArguments.forEach { toolCall ->
val function = toolCall.function
if (function != null && function.arguments.isNotBlank()) {
}
val run = getRun(runId)
if (
run.status == RunObject.Status.requires_action &&
run.requiredAction?.type == RunObjectRequiredAction.Type.submit_tool_outputs
) {
val results: Map<String, JsonElement> =
calls
.filter { it.function != null }
.parMap { toolCall ->
val function = toolCall.function!!
val result: JsonElement =
assistant.getToolRegistered(function.name, function.arguments)
api.submitToolOuputsToRun(
threadId = threadId,
runId = runId,
submitToolOutputsRunRequest =
SubmitToolOutputsRunRequest(
toolOutputs =
listOf(
SubmitToolOutputsRunRequestToolOutputsInner(
toolCallId = toolCall.id,
output = ApiClient.JSON_DEFAULT.encodeToString(result)
)
)
)
)
toolCall.id to result
}
}
}
.toMap()
api.submitToolOuputsToRun(
threadId = threadId,
runId = runId,
submitToolOutputsRunRequest =
SubmitToolOutputsRunRequest(
toolOutputs =
results.map { (toolCallId, result) ->
SubmitToolOutputsRunRequestToolOutputsInner(
toolCallId = toolCallId,
output = ApiClient.JSON_DEFAULT.encodeToString(result)
)
}
)
)
}
}
}
Expand Down

0 comments on commit 56f2035

Please sign in to comment.