Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 优化schedule #221 #226

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ data class JobLog(
var triggerCode: Int = ExecutionCodeEnum.INITIALED.code(),
var triggerMsg: String? = null,
var triggerType: Int,
var scheduledFireTime: LocalDateTime,

/**
* handle信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import com.tencent.devops.schedule.pojo.page.BasePageRequest
import java.time.LocalDateTime

class LogQueryParam(
var jobId: String? = null,
var triggerTime: List<String>? = null,
var triggerTimeFrom: LocalDateTime? = null,
var triggerTimeTo: LocalDateTime? = null
): BasePageRequest()
var jobId: String? = null,
var triggerTime: List<String>? = null,
var executionCode: Int? = null,
var triggerCode: Int? = null,
var triggerTimeFrom: LocalDateTime? = null,
var triggerTimeTo: LocalDateTime? = null,
) : BasePageRequest()
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ data class TriggerParam(
var logId: String,
@JsonFormat(pattern = DATE_TIME_PATTERN)
var triggerTime: LocalDateTime,
@JsonFormat(pattern = DATE_TIME_PATTERN)
var scheduledFireTime: LocalDateTime,
var broadcastIndex: Int = 0,
var broadcastTotal: Int = 0,
var workerAddress: String? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ fun TJobLog.convert(): JobLog {
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus,
scheduledFireTime = scheduledFireTime,
)
}

Expand All @@ -116,6 +117,7 @@ fun JobLog.convert(): TJobLog {
executionCode = executionCode,
executionMsg = executionMsg,
alarmStatus = alarmStatus,
scheduledFireTime = scheduledFireTime,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ data class TJobLog(
var triggerCode: Int,
var triggerMsg: String? = null,
var triggerType: Int,
var scheduledFireTime: LocalDateTime,

/**
* execution信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class MongoJobProvider(
} ?: run {
triggerTimeTo?.let { to -> criteria.and(TJobLog::triggerTime).lte(to) }
}
executionCode?.let { criteria.and(TJobLog::executionCode).isEqualTo(it) }
triggerCode?.let { criteria.and(TJobLog::triggerCode).isEqualTo(it) }
}
val query = Query.query(criteria).with(Sort.by(Sort.Direction.DESC, TJobLog::triggerTime.name))
val total = mongoTemplate.count(query, TJobLog::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import org.slf4j.LoggerFactory
import org.springframework.beans.factory.DisposableBean
import org.springframework.beans.factory.InitializingBean
import org.springframework.context.ApplicationEventPublisher
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -108,7 +110,7 @@ class DefaultJobScheduler(
logger.debug("prepare trigger job[{}]", triggerContext)
triggerThreadPool.execute {
val startTime = System.currentTimeMillis()
val triggerTime = triggerContext.scheduledFireTime ?: triggerContext.fireTime
val fireTime = triggerContext.scheduledFireTime ?: triggerContext.fireTime
try {
val job = triggerContext.job
jobParam?.let { job.jobParam = it }
Expand All @@ -134,10 +136,10 @@ class DefaultJobScheduler(
jobId,
startTime,
System.currentTimeMillis(),
triggerTime,
fireTime,
)
publisher.publishEvent(triggerEvent)
cc.updateLatency(System.currentTimeMillis() - triggerTime)
cc.updateLatency(System.currentTimeMillis() - fireTime)
}
}
}
Expand All @@ -153,7 +155,6 @@ class DefaultJobScheduler(
val triggerContext = JobTriggerContext(
job = job,
fireTime = System.currentTimeMillis(),
prevFireTime = job.lastTriggerTime,
)
trigger(triggerContext, triggerType, retryCount, jobParam, shardingParam)
}
Expand Down Expand Up @@ -181,11 +182,13 @@ class DefaultJobScheduler(
val shardingParam = if (routeStrategy == RouteStrategyEnum.SHARDING_BROADCAST) "$index/$total" else null

// 1. 保存日志
val fireTime = triggerContext.scheduledFireTime ?: triggerContext.fireTime
val jobLog = JobLog(
jobId = job.id.orEmpty(),
groupId = group.id.orEmpty(),
triggerType = triggerType.code(),
triggerTime = LocalDateTime.now(),
scheduledFireTime = Instant.ofEpochMilli(fireTime).atZone(ZoneId.systemDefault()).toLocalDateTime(),
)
val logId = jobManager.addJobLog(jobLog)
// 2. 构造trigger param
Expand All @@ -197,6 +200,7 @@ class DefaultJobScheduler(
jobTimeout = job.jobTimeout,
logId = logId,
triggerTime = jobLog.triggerTime,
scheduledFireTime = jobLog.scheduledFireTime,
broadcastIndex = index,
broadcastTotal = total,
updateTime = job.updateTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.tencent.devops.schedule.enums.JobModeEnum.SHELL
import com.tencent.devops.schedule.handler.K8sShellHandler
import com.tencent.devops.schedule.handler.ShellHandler
import com.tencent.devops.schedule.k8s.K8sHelper
import com.tencent.devops.schedule.pojo.ScheduleResponse
import com.tencent.devops.schedule.pojo.trigger.TriggerParam
import com.tencent.devops.schedule.thread.JobThread
import com.tencent.devops.schedule.thread.JobThreadGroup
Expand All @@ -18,7 +19,6 @@ import com.tencent.devops.web.util.SpringContextHolder
import org.slf4j.LoggerFactory
import org.springframework.beans.BeansException
import org.springframework.beans.factory.DisposableBean
import java.lang.IllegalStateException
import java.util.concurrent.ConcurrentHashMap

/**
Expand All @@ -34,7 +34,7 @@ class DefaultJobExecutor(
private val threadGroup = JobThreadGroup(workerProperties.executor.threads, serverRpcClient)
private val jobThreadRepository = ConcurrentHashMap<String, JobThread>()

override fun execute(param: TriggerParam) {
override fun execute(param: TriggerParam): ScheduleResponse {
val jobId = param.jobId
val logId = param.logId
logger.debug("prepare to execute job[$jobId], log[$logId]: {}", param)
Expand Down Expand Up @@ -68,7 +68,7 @@ class DefaultJobExecutor(
when (blockStrategy) {
BlockStrategyEnum.DISCARD_LATER -> {
if (jobThread.hasRunningJobs(jobId)) {
throw IllegalStateException("discard task $logId by block strategy[DISCARD_LATER]")
return ScheduleResponse.failed("discard by block strategy")
}
}

Expand All @@ -87,9 +87,11 @@ class DefaultJobExecutor(
}
val task = TriggerTask(jobId, handler, param)
jobThread.pushTriggerQueue(task)
return ScheduleResponse.success()
}

override fun destroy() {
logger.info("Destroying DefaultJobExecutor")
threadGroup.close()
jobThreadRepository.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ data class JobContext(
* 本次任务触发时间
*/
var triggerTime: LocalDateTime,

/**
* 本次任务调度时间
* */
var scheduledFireTime: LocalDateTime,
/**
* 任务更新时间
* */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tencent.devops.schedule.executor

import com.tencent.devops.schedule.pojo.ScheduleResponse
import com.tencent.devops.schedule.pojo.trigger.TriggerParam

/**
Expand All @@ -10,5 +11,5 @@ interface JobExecutor {
* 提交任务
* @param param 任务触发参数
*/
fun execute(param: TriggerParam)
fun execute(param: TriggerParam): ScheduleResponse
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() {
}

fun toStop() {
logger.info("Stopping $name")
stop.set(true)
}

Expand All @@ -105,7 +104,7 @@ class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() {
if (logId == stopLogId) {
cancelJobs.remove(task.jobId)
}
val result = JobExecutionResult.failed("cancelled by block strategy[COVER_EARLY]")
val result = JobExecutionResult.failed("cancelled by block strategy")
result.logId = logId
submitResult(task.jobId, result)
return false
Expand All @@ -118,6 +117,7 @@ class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() {
jobParamMap = jobParam.readJsonString(),
logId = logId,
triggerTime = triggerTime,
scheduledFireTime = scheduledFireTime,
broadcastIndex = broadcastIndex,
broadcastTotal = broadcastTotal,
source = if (source != null) String(base64Decoder.decode(param.source)) else null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class JobThreadGroup(nThreads: Int, serverRpcClient: ServerRpcClient) : AutoClos
override fun close() {
threads.forEach {
it.toStop()
it.interrupt()
it.join()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class WorkerRpcController(
override fun runJob(@RequestBody param: TriggerParam): ScheduleResponse {
return try {
jobExecutor.execute(param)
ScheduleResponse.success()
} catch (e: Exception) {
logger.error("execute job[$param] error: ${e.message}", e)
ScheduleResponse.failed(e.message.orEmpty())
Expand Down
Loading