Skip to content

Commit

Permalink
Fix assistant streaming (#651)
Browse files Browse the repository at this point in the history
* Assistant fix

* Spotless

* Update core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt

Co-authored-by: José Carlos Montañez <[email protected]>

* Update core/src/commonMain/kotlin/com/xebia/functional/xef/llm/assistants/AssistantThread.kt

Co-authored-by: José Carlos Montañez <[email protected]>

---------

Co-authored-by: David Vega Lichacz <[email protected]>
Co-authored-by: José Carlos Montañez <[email protected]>
  • Loading branch information
3 people authored Feb 2, 2024
1 parent fc62c74 commit bf23d6a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.xebia.functional.openai.models.ext.assistant.RunStepDetailsToolCallsO
import com.xebia.functional.openai.models.ext.assistant.RunStepObjectStepDetails
import com.xebia.functional.xef.llm.fromEnvironment
import kotlin.jvm.JvmName
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
Expand Down Expand Up @@ -83,13 +84,45 @@ class AssistantThread(
val stepCache = mutableSetOf<RunStepObject>() // CacheTool
val messagesCache = mutableSetOf<MessageObject>()
val runCache = mutableSetOf<RunObject>()
var run = checkRun(runId = runId, cache = runCache)
while (run.status != RunObject.Status.completed) {
checkSteps(assistant = assistant, runId = runId, cache = stepCache)
try {
var run = checkRun(runId = runId, cache = runCache)
while (run.status != RunObject.Status.completed) {
checkSteps(assistant = assistant, runId = runId, cache = stepCache)
delay(500) // To avoid excessive calls to OpenAI
checkMessages(cache = messagesCache)
delay(500) // To avoid excessive calls to OpenAI
run = checkRun(runId = runId, cache = runCache)
}
} catch (e: Exception) {
emit(
RunDelta.Run(
RunObject(
id = runId,
`object` = RunObject.Object.thread_run,
createdAt = 0,
threadId = threadId,
assistantId = assistant.assistantId,
status = RunObject.Status.failed,
lastError =
RunObjectLastError(
code = RunObjectLastError.Code.server_error,
message = e.message ?: "Unknown error"
),
startedAt = null,
cancelledAt = null,
failedAt = null,
completedAt = null,
model = "",
instructions = "",
tools = emptyList(),
fileIds = emptyList(),
metadata = null
)
)
)
} finally {
checkMessages(cache = messagesCache)
run = checkRun(runId = runId, cache = runCache)
}
checkMessages(cache = messagesCache)
}

private suspend fun FlowCollector<RunDelta>.checkRun(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,15 @@ open class ApiClient(val baseUrl: String) : AutoCloseable {
{
it.install(ContentNegotiation) { json(jsonBlock) }
it.install(HttpTimeout) {
requestTimeoutMillis = 60 * 1000
connectTimeoutMillis = 60 * 1000
socketTimeoutMillis = 60 * 1000
requestTimeoutMillis = 45 * 1000
connectTimeoutMillis = 45 * 1000
socketTimeoutMillis = 45 * 1000
}
it.install(HttpRequestRetry) {
maxRetries = 5
retryIf { _, response -> !response.status.isSuccess() }
retryOnExceptionIf { _, _ -> true }
delayMillis { retry -> retry * 1000L }
}
it.install(Logging) { level = LogLevel.NONE }
httpClientConfig?.invoke(it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ import {{packageName}}.auth.*
{
it.install(ContentNegotiation) { json(jsonBlock) }
it.install(HttpTimeout) {
requestTimeoutMillis = 60 * 1000
connectTimeoutMillis = 60 * 1000
socketTimeoutMillis = 60 * 1000
requestTimeoutMillis = 45 * 1000
connectTimeoutMillis = 45 * 1000
socketTimeoutMillis = 45 * 1000
}
it.install(HttpRequestRetry) {
maxRetries = 5
retryIf { _, response -> !response.status.isSuccess() }
retryOnExceptionIf { _, _ -> true }
delayMillis { retry -> retry * 1000L }
}
it.install(Logging) { level = LogLevel.NONE }
httpClientConfig?.invoke(it)
Expand Down

0 comments on commit bf23d6a

Please sign in to comment.