Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:同一流水线多次构建时资源调度优先级优化 #9897 #10848

Merged
merged 31 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
40dabed
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Jul 30, 2024
df3cab8
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 9, 2024
0aecd41
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 13, 2024
fae21ec
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 13, 2024
9027cd6
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 13, 2024
8df8e50
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 13, 2024
c608e7d
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 13, 2024
e4f1b4d
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 13, 2024
9baf905
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 13, 2024
523d7b8
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 14, 2024
d525bbb
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 19, 2024
43ded2d
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 20, 2024
d673043
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 20, 2024
58ea799
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 20, 2024
c4adc35
Merge branch 'master' of https://github.com/TencentBlueKing/bk-ci int…
tangruotian Aug 21, 2024
c78cb5a
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 21, 2024
a034a04
feat:同一流水线多次构建时资源调度优先级优化 #9897 忽略掉因调用打印接口出错而导致调度失败的问题以及超时心跳的重复打印日志问题
irwinsun Aug 21, 2024
6bafe30
Merge pull request #10 from irwinsun/issue_10740
tangruotian Aug 22, 2024
e1d91e0
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 22, 2024
e4d8f3e
Merge branch 'issue-10740' of https://github.com/tangruotian/bk-ci in…
tangruotian Aug 22, 2024
a83ebb7
Merge branch 'master' of https://github.com/TencentBlueKing/bk-ci int…
tangruotian Aug 22, 2024
cfc356d
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 22, 2024
0efae1b
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 22, 2024
9c6f19d
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 23, 2024
273aebb
Merge branch 'master' of https://github.com/TencentBlueKing/bk-ci int…
tangruotian Aug 23, 2024
dd4efaf
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Aug 23, 2024
68b3046
AgentId复用类型转换问题 #10915
tangruotian Sep 6, 2024
4b5f26e
AgentId复用类型转换问题 #10915
tangruotian Sep 6, 2024
2604854
Merge branch 'master' of https://github.com/TencentBlueKing/bk-ci int…
tangruotian Sep 10, 2024
08c5051
Merge branch 'issue-10915' of https://github.com/tangruotian/bk-ci in…
tangruotian Sep 10, 2024
41083b4
feat:同一流水线多次构建时资源调度优先级优化 #9897
tangruotian Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,4 @@ import com.tencent.devops.common.api.util.UUIDUtil
open class UniqueIdException(
val msg: String?,
val uniqueId: String? = UUIDUtil.generate()
) :
RuntimeException("[uniqueId=$uniqueId]$msg")
) : RuntimeException("[uniqueId=$uniqueId]$msg")
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ interface BuildListener {
buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "${I18nUtil.getCodeLanMessage("$BK_FAILED_START_BUILD_MACHINE")}- ${e.message}",
message = "${I18nUtil.getCodeLanMessage(BK_FAILED_START_BUILD_MACHINE)}- ${e.message}",
executeCount = event.executeCount,
jobId = event.jobId
)
Expand All @@ -136,14 +136,14 @@ interface BuildListener {
errorMessage = e.formatErrorMessage
errorType = e.errorType

onFailure(dispatchService, event, e)
dispatchService.onFailure(event, e)
} catch (t: Throwable) {
logger.warn("Fail to handle the start up message - DispatchService($event)", t)
dispatchService.logRed(
buildId = event.buildId,
containerHashId = event.containerHashId,
vmSeqId = event.vmSeqId,
message = "${I18nUtil.getCodeLanMessage("$BK_FAILED_START_BUILD_MACHINE")} - ${t.message}",
message = "${I18nUtil.getCodeLanMessage(BK_FAILED_START_BUILD_MACHINE)} - ${t.message}",
executeCount = event.executeCount,
jobId = event.jobId
)
Expand All @@ -152,8 +152,7 @@ interface BuildListener {
errorMessage = "Fail to handle the start up message"
errorType = ErrorType.SYSTEM

onFailure(
dispatchService = dispatchService,
dispatchService.onFailure(
event = event,
e = BuildFailureException(
errorType = ErrorType.SYSTEM,
Expand Down Expand Up @@ -361,15 +360,6 @@ interface BuildListener {

private fun getClient() = SpringContextUtil.getBean(Client::class.java)

private fun onFailure(
dispatchService: DispatchService,
event: PipelineAgentStartupEvent,
e: BuildFailureException
) {
dispatchService.onContainerFailure(event, e)
DispatchLogRedisUtils.removeRedisExecuteCount(event.buildId)
}

companion object {
private val logger = LoggerFactory.getLogger(BuildListener::class.java)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import com.tencent.devops.common.dispatch.sdk.pojo.docker.DockerConstants.ENV_KE
import com.tencent.devops.common.dispatch.sdk.pojo.docker.DockerConstants.ENV_KEY_BUILD_ID
import com.tencent.devops.common.dispatch.sdk.pojo.docker.DockerConstants.ENV_KEY_PROJECT_ID
import com.tencent.devops.common.dispatch.sdk.utils.ChannelUtils
import com.tencent.devops.common.dispatch.sdk.utils.DispatchLogRedisUtils
import com.tencent.devops.common.event.dispatcher.pipeline.PipelineEventDispatcher
import com.tencent.devops.common.event.pojo.pipeline.IPipelineEvent
import com.tencent.devops.common.log.utils.BuildLogPrinter
Expand Down Expand Up @@ -153,10 +154,33 @@ class DispatchService constructor(
}

fun checkRunning(event: PipelineAgentStartupEvent): Boolean {
val (startBuildTask, buildContainer) = getContainerStartupInfo(event)
return checkRunning(
projectId = event.projectId,
buildId = event.buildId,
containerId = event.containerId,
retryTime = event.retryTime,
executeCount = event.executeCount,
logTag = "$event"
)
}

fun checkRunning(
projectId: String,
buildId: String,
containerId: String,
retryTime: Int,
executeCount: Int?,
logTag: String?
): Boolean {
val (startBuildTask, buildContainer) = getContainerStartupInfo(
projectId = projectId,
buildId = buildId,
containerId = containerId,
logTag = logTag
)

var needStart = true
if (event.executeCount != startBuildTask.executeCount) {
if (executeCount != startBuildTask.executeCount) {
// 如果已经重试过或执行次数不匹配则直接丢弃
needStart = false
} else if (startBuildTask.status.isFinish() && buildContainer.status.isRunning()) {
Expand All @@ -167,9 +191,9 @@ class DispatchService constructor(
}

if (!needStart) {
logger.warn("The build event($event) is not running")
logger.warn("The build event($logTag) is not running")
// dispatch主动发起的重试或者用户已取消的流水线忽略异常报错
if (event.retryTime > 1 || buildContainer.status.isCancel()) {
if (retryTime > 1 || buildContainer.status.isCancel()) {
return false
}

Expand All @@ -184,26 +208,71 @@ class DispatchService constructor(
return true
}

fun onContainerFailure(event: PipelineAgentStartupEvent, e: BuildFailureException) {
logger.warn("[${event.buildId}|${event.vmSeqId}] Container startup failure")
fun onFailure(
event: PipelineAgentStartupEvent,
e: BuildFailureException
) {
onFailure(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = event.buildId,
vmSeqId = event.vmSeqId,
e = e,
logTag = "$event"
)
}

fun onFailure(
projectId: String,
pipelineId: String,
buildId: String,
vmSeqId: String,
e: BuildFailureException,
logTag: String?
) {
onContainerFailure(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
vmSeqId = vmSeqId,
e = e,
logTag
)
DispatchLogRedisUtils.removeRedisExecuteCount(buildId)
}

private fun onContainerFailure(
projectId: String,
pipelineId: String,
buildId: String,
vmSeqId: String,
e: BuildFailureException,
logTag: String?
) {
logger.warn("[$buildId|$vmSeqId] Container startup failure")
try {
val (startBuildTask, buildContainer) = getContainerStartupInfo(event)
val (startBuildTask, buildContainer) = getContainerStartupInfo(
projectId = projectId,
buildId = buildId,
containerId = vmSeqId,
logTag = logTag
)
if (buildContainer.status.isCancel() || startBuildTask.status.isCancel()) {
return
}

client.get(ServiceBuildResource::class).setVMStatus(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = event.buildId,
vmSeqId = event.vmSeqId,
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
vmSeqId = vmSeqId,
status = BuildStatus.FAILED,
errorType = e.errorType,
errorCode = e.errorCode,
errorMsg = e.formatErrorMessage
)
} catch (ignore: ClientException) {
logger.error("SystemErrorLogMonitor|onContainerFailure|${event.buildId}|error=${e.message},${e.errorCode}")
logger.error("SystemErrorLogMonitor|onContainerFailure|$buildId|error=${e.message},${e.errorCode}")
}
}

Expand Down Expand Up @@ -250,20 +319,23 @@ class DispatchService constructor(
}

private fun getContainerStartupInfo(
event: PipelineAgentStartupEvent
projectId: String,
buildId: String,
containerId: String,
logTag: String?
): Pair<PipelineBuildTask, PipelineBuildContainer> {
// 判断流水线当前container是否在运行中
val statusResult = client.get(ServicePipelineTaskResource::class).getContainerStartupInfo(
projectId = event.projectId,
buildId = event.buildId,
containerId = event.containerId,
taskId = VMUtils.genStartVMTaskId(event.containerId)
projectId = projectId,
buildId = buildId,
containerId = containerId,
taskId = VMUtils.genStartVMTaskId(containerId)
)
val startBuildTask = statusResult.data?.startBuildTask
val buildContainer = statusResult.data?.buildContainer
if (statusResult.isNotOk() || startBuildTask == null || buildContainer == null) {
logger.warn(
"The build event($event) fail to check if pipeline task is running " +
"The build event($logTag) fail to check if pipeline task is running " +
"because of statusResult(${statusResult.message})"
)
val errorMessage = I18nUtil.getCodeLanMessage(UNABLE_GET_PIPELINE_JOB_STATUS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ object MQ {
const val ROUTE_AGENT_SHUTDOWN = "r.engine.pipeline.agent.shutdown"
const val QUEUE_AGENT_SHUTDOWN = "q.engine.pipeline.agent.shutdown"

// 第三方 AGENT 排队消息队列 ====================================
const val EXCHANGE_THIRD_PARTY_AGENT_QUEUE = "e.dispatch.tp.agent.queue"
const val ROUTE_THIRD_PARTY_AGENT_QUEUE = "r.dispatch.tp.agent.queue"
const val QUEUE_THIRD_PARTY_AGENT_QUEUE = "q.dispatch.tp.agent.queue"

// 无构建环境的Docker构建机启停消息队列 ====================================
const val EXCHANGE_BUILD_LESS_AGENT_LISTENER_DIRECT = "e.engine.pipeline.bl.agent"
const val ROUTE_BUILD_LESS_AGENT_STARTUP_DISPATCH = "r.engine.pipeline.bl.agent.dispatch.startup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import com.tencent.devops.common.pipeline.type.DispatchType
abstract class ThirdPartyAgentDispatch(
override var value: String,
open val agentType: AgentType,
open var workspace: String?,
// 第三方构建机用docker作为构建机
open val dockerInfo: ThirdPartyAgentDockerInfo?,
// 类型为REUSE_JOB时,被复用的job的value,防止同一个stage并发下拿不到agent,启动时填充
open var reusedInfo: ReusedInfo?
) : DispatchType(value) {
Expand All @@ -19,6 +22,9 @@ abstract class ThirdPartyAgentDispatch(

// 是否在复用锁定链上
fun hasReuseMutex(): Boolean = this.agentType.isReuse() || this.reusedInfo != null

fun isEnv() = this is ThirdPartyAgentEnvDispatchType
fun isSingle() = this is ThirdPartyAgentIDDispatchType
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ package com.tencent.devops.common.pipeline.type.agent
import com.fasterxml.jackson.annotation.JsonProperty
import com.tencent.devops.common.api.util.EnvUtils
import com.tencent.devops.common.pipeline.type.BuildType
import io.swagger.v3.oas.annotations.media.Schema

data class ThirdPartyAgentEnvDispatchType(
@JsonProperty("value")
var envName: String,
@get:Schema(title = "共享环境时必填,值为提供共享环境的项目id")
override var workspace: String?,
// 共享环境时必填,值为提供共享环境的项目id
var envProjectId: String?,
@get:Schema(title = "工作空间")
var workspace: String?,
@get:Schema(title = "agent类型,默认NAME")
override val agentType: AgentType = AgentType.NAME,
// 第三方构建机用docker作为构建机
val dockerInfo: ThirdPartyAgentDockerInfo?,
override val dockerInfo: ThirdPartyAgentDockerInfo?,
override var reusedInfo: ReusedInfo?
) : ThirdPartyAgentDispatch(
value = envName,
workspace = workspace,
agentType = agentType,
dockerInfo = dockerInfo,
reusedInfo = reusedInfo
) {
override fun cleanDataBeforeSave() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ import com.tencent.devops.common.api.util.EnvUtils
import com.tencent.devops.common.pipeline.type.BuildType

data class ThirdPartyAgentIDDispatchType(
@JsonProperty("value") var displayName: String,
var workspace: String?,
@JsonProperty("value")
var displayName: String,
override var workspace: String?,
override val agentType: AgentType = AgentType.NAME,
// 第三方构建机用docker作为构建机
val dockerInfo: ThirdPartyAgentDockerInfo?,
override val dockerInfo: ThirdPartyAgentDockerInfo?,
override var reusedInfo: ReusedInfo?
) : ThirdPartyAgentDispatch(
value = displayName,
agentType = agentType,
workspace = workspace,
dockerInfo = dockerInfo,
reusedInfo = reusedInfo
) {
override fun cleanDataBeforeSave() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ open class RedisLock(
private val redisOperation: RedisOperation,
private val lockKey: String,
private val expiredTimeInSeconds: Long,
private val sleepTime: Long = 100L
private val sleepTime: Long = 100L,
private var lockValue: String = UUID.randomUUID().toString()
) : AutoCloseable {
private val lockValue = UUID.randomUUID().toString()

/**
* 锁是否已经被占用
Expand Down Expand Up @@ -127,6 +127,12 @@ open class RedisLock(

private fun getLocalLock(): Any = localLock.get(lockKey)!!

fun getLockValue() = lockValue

fun setLockValue(lockValue: String) {
this.lockValue = lockValue
}

override fun close() {
unlock()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.tencent.devops.dispatch.api

import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.annotations.tags.Tag
import javax.ws.rs.Consumes
import javax.ws.rs.POST
import javax.ws.rs.Path
import javax.ws.rs.Produces
import javax.ws.rs.QueryParam
import javax.ws.rs.core.MediaType

@Tag(name = "OP_AGENT", description = "agent相关")
@Path("/op/agent")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
interface OpAgentResource {

@Operation(summary = "修改灰度排队功能的项目或者流水线")
@POST
@Path("/update_gray_queue")
fun updateGrayQueue(
@QueryParam("projectId")
projectId: String,
@QueryParam("operate")
operate: String,
pipelineIds: Set<String>?
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,9 @@ const val AGENT_REUSE_MUTEX_WAIT_REUSED_ENV = "agentReuseMuteXWaitReusedEnv"
const val BK_ENV_NODE_DISABLE = "bkEnvNodeDisable"
const val BK_THIRD_JOB_ENV_CURR = "bkThirdJobEnvCurr" // 当前环境下所有构建机并发{0}已经超过配置的{1},排队{2}分钟
const val BK_THIRD_JOB_NODE_CURR = "bkThirdJobNodeCurr" // 当前环境下所有节点运行任务都超过了配置的{0},排队{1}分钟
// 构建机复用互斥,节点 {0} 已被 {1} 构建使用,剩余可调度空间不足,重新调度
const val AGENT_REUSE_MUTEX_RESERVE_REDISPATCH = "agentReuseMutexReserveRedispatch"
// 构建环境调度结束,已选取节点 {0}
const val BK_ENV_DISPATCH_AGENT = "bkEnvDispatchAgent"
// 尝试下发任务至节点 {0}
const val TRY_AGENT_DISPATCH = "tryAgentDispatch"
Loading
Loading