diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt index e316619..9d53156 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/job/JobUpdateRequest.kt @@ -63,4 +63,14 @@ data class JobUpdateRequest( * 镜像地址,容器任务需要 * */ val image: String? = null, + + /** + * 调度配置 + * */ + val scheduleConf: String? = null, + + /** + * 调度类型 + */ + val scheduleType: Int? = null, ) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/log/JobLog.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/log/JobLog.kt index 780b589..419a14d 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/log/JobLog.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-common/src/main/kotlin/com/tencent/devops/schedule/pojo/log/JobLog.kt @@ -2,6 +2,7 @@ package com.tencent.devops.schedule.pojo.log import com.fasterxml.jackson.annotation.JsonFormat import com.tencent.devops.schedule.constants.DATE_TIME_PATTERN +import com.tencent.devops.schedule.enums.AlarmStatusEnum import com.tencent.devops.schedule.enums.ExecutionCodeEnum import java.time.LocalDateTime @@ -36,7 +37,7 @@ data class JobLog( */ @JsonFormat(pattern = DATE_TIME_PATTERN) var triggerTime: LocalDateTime, - var triggerCode : Int = ExecutionCodeEnum.INITIALED.code(), + var triggerCode: Int = ExecutionCodeEnum.INITIALED.code(), var triggerMsg: String? = null, var triggerType: Int, @@ -51,5 +52,5 @@ data class JobLog( /** * alarm信息 */ - var alarmStatus: Int? = null + var alarmStatus: Int = AlarmStatusEnum.TODO.code(), ) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/config/MongoModelAutoConfiguration.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/config/MongoModelAutoConfiguration.kt index 9e1eba1..3310f8f 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/config/MongoModelAutoConfiguration.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/config/MongoModelAutoConfiguration.kt @@ -12,7 +12,12 @@ import com.tencent.devops.schedule.provider.LockProvider import com.tencent.devops.schedule.provider.WorkerProvider import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.context.event.ContextRefreshedEvent +import org.springframework.context.event.EventListener import org.springframework.data.mongodb.core.MongoTemplate +import org.springframework.data.mongodb.core.index.IndexDefinition +import org.springframework.data.mongodb.core.index.MongoPersistentEntityIndexResolver +import org.springframework.data.mongodb.core.mapping.Document import org.springframework.data.mongodb.repository.config.EnableMongoRepositories @Configuration(proxyBeanMethods = false) @@ -22,7 +27,7 @@ class MongoModelAutoConfiguration( private val logRepository: LogRepository, private val workerGroupRepository: WorkerGroupRepository, private val workerRepository: WorkerRepository, - private val mongoTemplate: MongoTemplate + private val mongoTemplate: MongoTemplate, ) { @Bean @@ -30,7 +35,7 @@ class MongoModelAutoConfiguration( return MongoJobProvider( jobRepository = jobRepository, logRepository = logRepository, - mongoTemplate = mongoTemplate + mongoTemplate = mongoTemplate, ) } @@ -39,7 +44,7 @@ class MongoModelAutoConfiguration( return MongoWorkerProvider( groupRepository = workerGroupRepository, workerRepository = workerRepository, - mongoTemplate = mongoTemplate + mongoTemplate = mongoTemplate, ) } @@ -47,4 +52,19 @@ class MongoModelAutoConfiguration( fun lockProvider(): LockProvider { return MongoLockProvider(mongoTemplate) } + + @EventListener(ContextRefreshedEvent::class) + fun initIndicesAfterStartup() { + val mappingContext = mongoTemplate.converter.mappingContext + val resolver = MongoPersistentEntityIndexResolver(mappingContext) + mappingContext.persistentEntities + .stream() + .filter { it.isAnnotationPresent(Document::class.java) } + .forEach { + val indexOps = mongoTemplate.indexOps(it.type) + resolver.resolveIndexFor(it.type).forEach { indexDefinition: IndexDefinition -> + indexOps.ensureIndex(indexDefinition) + } + } + } } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt index 82174f2..675e533 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobInfo.kt @@ -1,5 +1,8 @@ package com.tencent.devops.schedule.mongo.model +import com.tencent.devops.schedule.mongo.model.TJobInfo.Companion.NEXT_TRIGGER_TIME_IDX +import com.tencent.devops.schedule.mongo.model.TJobInfo.Companion.NEXT_TRIGGER_TIME_IDX_DEF +import org.springframework.data.mongodb.core.index.CompoundIndex import org.springframework.data.mongodb.core.mapping.Document import java.time.LocalDateTime @@ -7,6 +10,7 @@ import java.time.LocalDateTime * 任务信息模型类 */ @Document("job_info") +@CompoundIndex(name = NEXT_TRIGGER_TIME_IDX, def = NEXT_TRIGGER_TIME_IDX_DEF, background = true) data class TJobInfo( var id: String? = null, @@ -108,4 +112,9 @@ data class TJobInfo( * 镜像地址 * */ var image: String? = null, -) +) { + companion object { + const val NEXT_TRIGGER_TIME_IDX = "nextTriggerTime_idx" + const val NEXT_TRIGGER_TIME_IDX_DEF = "{'nextTriggerTime': 1}" + } +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobLog.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobLog.kt index 25c55a7..0265c90 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobLog.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/model/TJobLog.kt @@ -1,5 +1,17 @@ package com.tencent.devops.schedule.mongo.model +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.ALARM_STATUS_IDX +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.ALARM_STATUS_IDX_DEF +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.EXECUTION_CODE_IDX +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.EXECUTION_CODE_IDX_DEF +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.JOB_ID_IDX +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.JOB_ID_IDX_DEF +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_CODE_IDX +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_CODE_IDX_DEF +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_TIME_IDX +import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_TIME_IDX_DEF +import org.springframework.data.mongodb.core.index.CompoundIndex +import org.springframework.data.mongodb.core.index.CompoundIndexes import org.springframework.data.mongodb.core.mapping.Document import java.time.LocalDateTime @@ -7,6 +19,13 @@ import java.time.LocalDateTime * 任务信息模型类 */ @Document("job_log") +@CompoundIndexes( + CompoundIndex(name = JOB_ID_IDX, def = JOB_ID_IDX_DEF, background = true), + CompoundIndex(name = TRIGGER_TIME_IDX, def = TRIGGER_TIME_IDX_DEF, background = true), + CompoundIndex(name = TRIGGER_CODE_IDX, def = TRIGGER_CODE_IDX_DEF, background = true), + CompoundIndex(name = EXECUTION_CODE_IDX, def = EXECUTION_CODE_IDX_DEF, background = true), + CompoundIndex(name = ALARM_STATUS_IDX, def = ALARM_STATUS_IDX_DEF, background = true), +) data class TJobLog( var id: String? = null, @@ -38,7 +57,7 @@ data class TJobLog( * trigger信息 */ var triggerTime: LocalDateTime, - var triggerCode : Int, + var triggerCode: Int, var triggerMsg: String? = null, var triggerType: Int, @@ -52,5 +71,18 @@ data class TJobLog( /** * alarm信息 */ - var alarmStatus: Int? = null -) + var alarmStatus: Int, +) { + companion object { + const val JOB_ID_IDX = "jobId_idx" + const val JOB_ID_IDX_DEF = "{'jobId': 1 , 'triggerTime': 1}" + const val TRIGGER_TIME_IDX = "triggerTime_idx" + const val TRIGGER_TIME_IDX_DEF = "{'triggerTime': 1}" + const val TRIGGER_CODE_IDX = "triggerCode_idx" + const val TRIGGER_CODE_IDX_DEF = "{'triggerCode': 1}" + const val EXECUTION_CODE_IDX = "executionCode_idx" + const val EXECUTION_CODE_IDX_DEF = "{'executionCode': 1}" + const val ALARM_STATUS_IDX = "alarmStatus_idx" + const val ALARM_STATUS_IDX_DEF = "{'alarmStatus': 1}" + } +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoJobProvider.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoJobProvider.kt index 1faaebb..faff00e 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoJobProvider.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoJobProvider.kt @@ -34,7 +34,7 @@ import java.time.LocalDateTime class MongoJobProvider( private val jobRepository: JobRepository, private val logRepository: LogRepository, - private val mongoTemplate: MongoTemplate + private val mongoTemplate: MongoTemplate, ) : JobProvider { override fun addJob(jobInfo: JobInfo): String { return jobInfo.convert().apply { jobRepository.save(this) }.id.orEmpty() @@ -67,14 +67,14 @@ class MongoJobProvider( pageNumber = param.pageNumber, pageSize = param.pageSize, totalRecords = total, - records = records + records = records, ) } - override fun findTodoJobs(time: Long): List { + override fun findTodoJobs(time: Long, limit: Int): List { val criteria = where(TJobInfo::nextTriggerTime).lte(time) .and(TJobInfo::triggerStatus).isEqualTo(TriggerStatusEnum.RUNNING.code()) - val query = Query(criteria) + val query = Query(criteria).limit(limit) return mongoTemplate.find(query, TJobInfo::class.java).map { it.convert() } } @@ -118,7 +118,7 @@ class MongoJobProvider( pageNumber = param.pageNumber, pageSize = param.pageSize, totalRecords = total, - records = records + records = records, ) } @@ -127,11 +127,11 @@ class MongoJobProvider( val criteria = where(TJobLog::alarmStatus).isEqualTo(AlarmStatusEnum.TODO.code()) .orOperator( where(TJobLog::triggerCode).lt(TriggerCodeEnum.INITIALED.code()), - where(TJobLog::executionCode).lt(ExecutionCodeEnum.INITIALED.code()) + where(TJobLog::executionCode).lt(ExecutionCodeEnum.INITIALED.code()), ) val query = Query.query(criteria).with(pageable) query.fields().include(TJobLog::id.name) - return mongoTemplate.find(query, IdEntity::class.java).map { it.id } + return mongoTemplate.find(query, IdEntity::class.java, "job_log").map { it.id } } override fun findLostJobLogIds(triggerTime: LocalDateTime): List { @@ -140,7 +140,7 @@ class MongoJobProvider( .and(TJobLog::triggerTime).lt(triggerTime) val query = Query.query(criteria) query.fields().include(TJobLog::id.name) - return mongoTemplate.find(query, IdEntity::class.java).map { it.id } + return mongoTemplate.find(query, IdEntity::class.java, "job_log").map { it.id } } override fun deleteLogByJobId(jobId: String) { @@ -158,7 +158,7 @@ class MongoJobProvider( logId: String, executionCode: Int, executionMessage: String, - executionTime: LocalDateTime + executionTime: LocalDateTime, ): Int { val criteria = where(TJobLog::id).`is`(logId) val query = Query.query(criteria) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt index da21182..ce72c38 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model-mongodb/src/main/kotlin/com/tencent/devops/schedule/mongo/provider/MongoLockProvider.kt @@ -1,10 +1,10 @@ package com.tencent.devops.schedule.mongo.provider -import com.mongodb.MongoServerException import com.mongodb.client.result.UpdateResult import com.tencent.devops.schedule.mongo.model.TLockInfo import com.tencent.devops.schedule.provider.LockProvider import org.slf4j.LoggerFactory +import org.springframework.dao.DuplicateKeyException import org.springframework.data.mongodb.core.FindAndModifyOptions import org.springframework.data.mongodb.core.MongoTemplate import org.springframework.data.mongodb.core.query.Query @@ -60,13 +60,9 @@ class MongoLockProvider( } else { null } - } catch (e: MongoServerException) { - if (e.code == 11000) { // duplicate key - return null - } else { - throw e - } + } catch (ignore: DuplicateKeyException) { } + return null } override fun release(key: String, token: String): Boolean { diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model/src/main/kotlin/com/tencent/devops/schedule/provider/JobProvider.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model/src/main/kotlin/com/tencent/devops/schedule/provider/JobProvider.kt index d1e816e..1af76ec 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model/src/main/kotlin/com/tencent/devops/schedule/provider/JobProvider.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-model/src/main/kotlin/com/tencent/devops/schedule/provider/JobProvider.kt @@ -41,8 +41,9 @@ interface JobProvider { /** * 根据下次触发时间查询需要执行的任务 * @param time 下次触发时间 + * @param limit 返回限制数量 */ - fun findTodoJobs(time: Long): List + fun findTodoJobs(time: Long, limit: Int): List /** * 根据id查找任务 @@ -108,7 +109,7 @@ interface JobProvider { logId: String, executionCode: Int, executionMessage: String, - executionTime: LocalDateTime + executionTime: LocalDateTime, ): Int /** @@ -122,5 +123,4 @@ interface JobProvider { * @param jobId 任务id */ fun deleteLogByJobId(jobId: String) - } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/build.gradle.kts b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/build.gradle.kts index 2a521c7..c9faf1c 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/build.gradle.kts +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/build.gradle.kts @@ -10,6 +10,7 @@ dependencies { api(project(":devops-boot-project:devops-boot-core:devops-schedule:devops-schedule-common")) api(project(":devops-boot-project:devops-boot-core:devops-schedule:devops-schedule-model")) api("io.jsonwebtoken:jjwt-api") + api("com.google.guava:guava") runtimeOnly("io.jsonwebtoken:jjwt-impl") runtimeOnly("io.jsonwebtoken:jjwt-jackson") compileOnly("org.springframework.cloud:spring-cloud-starter") diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerAutoConfiguration.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerAutoConfiguration.kt index b97f9eb..a15ca6a 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerAutoConfiguration.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerAutoConfiguration.kt @@ -10,15 +10,21 @@ import com.tencent.devops.schedule.provider.LockProvider import com.tencent.devops.schedule.provider.WorkerProvider import com.tencent.devops.schedule.scheduler.DefaultJobScheduler import com.tencent.devops.schedule.scheduler.JobScheduler +import com.tencent.devops.schedule.scheduler.ScheduleServerMetricsListener +import io.micrometer.core.instrument.MeterRegistry import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(ScheduleServerProperties::class) -@Import(ScheduleServerWebConfiguration::class) +@Import( + ScheduleServerWebConfiguration::class, + ScheduleServerMetricsListener::class, +) class ScheduleServerAutoConfiguration { @Bean @@ -28,9 +34,19 @@ class ScheduleServerAutoConfiguration { workerManager: WorkerManager, lockProvider: LockProvider, scheduleServerProperties: ScheduleServerProperties, - workerRpcClient: WorkerRpcClient + workerRpcClient: WorkerRpcClient, + registry: MeterRegistry, + publisher: ApplicationEventPublisher, ): JobScheduler { - return DefaultJobScheduler(jobManager, workerManager, lockProvider, scheduleServerProperties, workerRpcClient) + return DefaultJobScheduler( + jobManager, + workerManager, + lockProvider, + scheduleServerProperties, + workerRpcClient, + registry, + publisher, + ) } @Bean @@ -44,5 +60,4 @@ class ScheduleServerAutoConfiguration { fun workerManager(workerProvider: WorkerProvider): WorkerManager { return DefaultWorkerManager(workerProvider) } - } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerProperties.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerProperties.kt index 614df0c..072eea3 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerProperties.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleServerProperties.kt @@ -24,6 +24,22 @@ data class ScheduleServerProperties( */ var maxTriggerPoolSize: Int = 100, + /** + * 任务触发线程池队列大小 + */ + var maxTriggerQueueSize: Int = 1000, + + /** + * 调度延迟SLO,单位s + * */ + var slo: List = listOf(5), + + /** + * 期望的最大的调度延迟时间 + * 当实际调度延迟大于设置阈值时,则会降低调度吞吐量,以达到期望的调度延迟 + * */ + var maxScheduleLatencyMillis: Long = Long.MAX_VALUE, + /** * UI配置 */ @@ -32,7 +48,7 @@ data class ScheduleServerProperties( /** * 认证配置 */ - var auth: ScheduleServerAuthProperties = ScheduleServerAuthProperties() + var auth: ScheduleServerAuthProperties = ScheduleServerAuthProperties(), ) { companion object { const val PREFIX = "devops.schedule.server" @@ -42,7 +58,7 @@ data class ScheduleServerProperties( /** * 是否开启ui界面 */ - var enabled: Boolean = true + var enabled: Boolean = true, ) class ScheduleServerAuthProperties( diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt index 4ff5837..31743ac 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/DefaultJobManager.kt @@ -49,8 +49,8 @@ open class DefaultJobManager( return jobProvider.listJobPage(param) } - override fun findTodoJobs(time: Long): List { - return jobProvider.findTodoJobs(time) + override fun findTodoJobs(time: Long, limit: Int): List { + return jobProvider.findTodoJobs(time, limit) } override fun listLogPage(param: LogQueryParam): Page { @@ -144,6 +144,14 @@ open class DefaultJobManager( requireNotNull(jobInfo) with(request) { + // 验证调度参数 + scheduleType?.let { + val scheduleTypeEnum = ScheduleTypeEnum.ofCode(it) + requireNotNull(scheduleTypeEnum) + validateScheduleParameter(scheduleConf.orEmpty(), scheduleTypeEnum) + jobInfo.scheduleType = it + jobInfo.scheduleConf = scheduleConf.orEmpty() + } description?.let { jobInfo.description = description } @@ -181,6 +189,9 @@ open class DefaultJobManager( source?.let { jobInfo.source = it } + maxRetryCount?.let { + jobInfo.maxRetryCount = it + } jobInfo.updateTime = LocalDateTime.now() jobProvider.updateJob(jobInfo).also { logger.info("update job[${jobInfo.id}] success") diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt index b29a99f..4a6f597 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/manager/JobManager.kt @@ -73,8 +73,9 @@ interface JobManager { /** * 加载在指定时间前触发的任务列表 * @param time 指定时间 + * @param limit 返回任务限制数量 */ - fun findTodoJobs(time: Long): List + fun findTodoJobs(time: Long, limit: Int): List /** * 添加任务日志 diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt index aba06a8..7e2771a 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/DefaultJobScheduler.kt @@ -1,5 +1,8 @@ package com.tencent.devops.schedule.scheduler +import com.google.common.cache.CacheBuilder +import com.google.common.cache.CacheLoader +import com.google.common.cache.LoadingCache import com.tencent.devops.schedule.api.WorkerRpcClient import com.tencent.devops.schedule.config.ScheduleServerProperties import com.tencent.devops.schedule.enums.BlockStrategyEnum @@ -15,14 +18,19 @@ import com.tencent.devops.schedule.pojo.trigger.TriggerParam import com.tencent.devops.schedule.pojo.worker.WorkerGroup import com.tencent.devops.schedule.provider.LockProvider import com.tencent.devops.schedule.router.Routes +import com.tencent.devops.schedule.scheduler.event.JobTriggerEvent import com.tencent.devops.schedule.scheduler.monitor.JobRetryMonitor import com.tencent.devops.schedule.scheduler.monitor.JobTodoMonitor import com.tencent.devops.schedule.scheduler.monitor.WorkerStatusMonitor +import io.micrometer.core.instrument.Gauge +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics import org.slf4j.LoggerFactory import org.springframework.beans.factory.DisposableBean import org.springframework.beans.factory.InitializingBean +import org.springframework.context.ApplicationEventPublisher import java.time.LocalDateTime -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit @@ -35,6 +43,8 @@ class DefaultJobScheduler( private val lockProvider: LockProvider, private val scheduleServerProperties: ScheduleServerProperties, private val workerRpcClient: WorkerRpcClient, + private val registry: MeterRegistry, + private val publisher: ApplicationEventPublisher, ) : JobScheduler, InitializingBean, DisposableBean { private lateinit var triggerThreadPool: ThreadPoolExecutor @@ -43,17 +53,25 @@ class DefaultJobScheduler( // private lateinit var jobLostMonitor: JobLostMonitor private lateinit var workerStatusMonitor: WorkerStatusMonitor + private var cc = JobCongestionControl(scheduleServerProperties.maxScheduleLatencyMillis) + + private var workGroupCache: LoadingCache = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterWrite(3, TimeUnit.SECONDS) + .build(CacheLoader.from { groupId -> groupId?.let { workerManager.findGroupById(it) } }) override fun getJobManager() = jobManager override fun getWorkerManager() = workerManager override fun start() { triggerThreadPool = createThreadPool() - jobTodoMonitor = JobTodoMonitor(this, lockProvider).apply { start() } + ExecutorServiceMetrics(triggerThreadPool, "triggerThreadPool", emptyList()).bindTo(registry) + jobTodoMonitor = JobTodoMonitor(this, lockProvider, publisher, this::determineLoadCount).apply { start() } jobRetryMonitor = JobRetryMonitor(this).apply { start() } // TODO 初版实现不去主动监控任务状态丢失的任务,触发成功则表示执行成功,执行结果由worker上报 // jobLostMonitor = JobLostMonitor(this).apply { start() } workerStatusMonitor = WorkerStatusMonitor(this).apply { start() } + monitor() logger.info("start upjob scheduler success") } @@ -80,42 +98,66 @@ class DefaultJobScheduler( * */ override fun trigger( - jobId: String, + triggerContext: JobTriggerContext, triggerType: TriggerTypeEnum, retryCount: Int?, jobParam: String?, shardingParam: String?, ) { - logger.debug("prepare trigger job[$jobId]") + val jobId = triggerContext.job.id.orEmpty() + logger.debug("prepare trigger job[{}]", triggerContext) triggerThreadPool.execute { + val startTime = System.currentTimeMillis() + val triggerTime = triggerContext.scheduledFireTime ?: triggerContext.fireTime try { - val job = jobManager.findJobById(jobId) ?: run { - logger.warn("trigger job[$jobId] failed: job not exists.") - return@execute - } + val job = triggerContext.job jobParam?.let { job.jobParam = it } val finalRetryCount = retryCount ?: job.maxRetryCount - val group = workerManager.findGroupById(job.groupId) ?: run { - logger.warn("trigger job[$jobId] failed: group[${job.groupId}] not exists.") - return@execute + val group = workGroupCache.get(job.groupId) ?: run { + throw IllegalArgumentException("group[${job.groupId}] not exists.") } val pair = resolveShardingParam(shardingParam) if (pair == null && isShardingBroadcastJob(job, group)) { repeat(group.registryList.size) { - processTrigger(job, group, triggerType, finalRetryCount, it, group.registryList.size) + processTrigger(triggerContext, group, triggerType, finalRetryCount, it, group.registryList.size) } } else { val index = pair?.first ?: 0 val total = pair?.second ?: 1 - processTrigger(job, group, triggerType, finalRetryCount, index, total) + processTrigger(triggerContext, group, triggerType, finalRetryCount, index, total) } } catch (e: Exception) { logger.error("trigger job[$jobId] error: ${e.message}", e) + } finally { + val triggerEvent = JobTriggerEvent.create( + jobId, + startTime, + System.currentTimeMillis(), + triggerTime, + ) + publisher.publishEvent(triggerEvent) + cc.updateLatency(System.currentTimeMillis() - triggerTime) } } } + override fun trigger( + jobId: String, + triggerType: TriggerTypeEnum, + retryCount: Int?, + jobParam: String?, + shardingParam: String?, + ) { + val job = jobManager.findJobById(jobId) ?: throw IllegalArgumentException("No job $jobId") + val triggerContext = JobTriggerContext( + job = job, + fireTime = System.currentTimeMillis(), + prevFireTime = job.lastTriggerTime, + ) + trigger(triggerContext, triggerType, retryCount, jobParam, shardingParam) + } + private fun isShardingBroadcastJob(job: JobInfo, group: WorkerGroup): Boolean { return job.routeStrategy == RouteStrategyEnum.SHARDING_BROADCAST.code() && group.registryList.isNotEmpty() } @@ -124,13 +166,14 @@ class DefaultJobScheduler( * 处理 */ private fun processTrigger( - job: JobInfo, + triggerContext: JobTriggerContext, group: WorkerGroup, triggerType: TriggerTypeEnum, retryCount: Int, index: Int, total: Int, ) { + val job = triggerContext.job val blockStrategy = BlockStrategyEnum.ofCode(job.blockStrategy) val routeStrategy = RouteStrategyEnum.ofCode(job.routeStrategy) requireNotNull(blockStrategy) @@ -174,8 +217,12 @@ class DefaultJobScheduler( val result = workerRpcClient.runJob(triggerParam) jobLog.triggerCode = result.code jobLog.triggerMsg = result.message - jobLog.executionCode = ExecutionCodeEnum.RUNNING.code() - logger.info("trigger job[${job.id}] success: $result") + if (result.code == TriggerCodeEnum.SUCCESS.code()) { + jobLog.executionCode = ExecutionCodeEnum.RUNNING.code() + logger.info("trigger job[${job.id}] success: $result") + } else { + logger.info("trigger job[${job.id}] failed: $result") + } } catch (e: Exception) { logger.error("trigger job[${jobLog.jobId}] error: ${e.message}", e) jobLog.triggerCode = TriggerCodeEnum.FAILED.code() @@ -194,12 +241,13 @@ class DefaultJobScheduler( * 创建任务trigger调度线程池 */ private fun createThreadPool(): ThreadPoolExecutor { + // 队列不能太长,如果下游延迟加大,会导致队列中的所有任务调度延迟都加大 return ThreadPoolExecutor( - 10, + Runtime.getRuntime().availableProcessors() * 2, scheduleServerProperties.maxTriggerPoolSize, 60L, TimeUnit.SECONDS, - LinkedBlockingQueue(1000), + LinkedBlockingDeque(scheduleServerProperties.maxTriggerQueueSize), ) { runnable -> Thread(runnable, "job-trigger-${runnable.hashCode()}") } @@ -221,6 +269,23 @@ class DefaultJobScheduler( } } + private fun monitor() { + Gauge.builder("devops.schedule.server.trigger.load", this::determineLoadCount) + .description("加载任务数") + .baseUnit("TASK") + .register(registry) + } + + private fun determineLoadCount(): Int { + val availableThreads = if (triggerThreadPool.poolSize == 0) { + // 线程池还未提交任务 + triggerThreadPool.corePoolSize + } else { + triggerThreadPool.poolSize - triggerThreadPool.activeCount + } + return maxOf(availableThreads, cc.getCwnd()) + } + companion object { private val logger = LoggerFactory.getLogger(DefaultJobScheduler::class.java) } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobCongestionControl.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobCongestionControl.kt new file mode 100644 index 0000000..3843f34 --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobCongestionControl.kt @@ -0,0 +1,57 @@ +package com.tencent.devops.schedule.scheduler + +import org.slf4j.LoggerFactory + +/** + * 任务拥塞控制 + * 采用TCP Reno算法,开始时进入慢速启动阶段,窗口数指数上升, + * 当延迟超过阈值时,将窗口数减少至一半,然后进入拥塞避免阶段,窗口数线性增加 + * */ +class JobCongestionControl(private val maxLatency: Long) { + private var cwnd: Int = 1 // 拥塞窗口大小 + private var ssthresh: Int = 256 // 拥塞阈值 + private var state: State = State.SLOW_START + + fun updateLatency(latency: Long) { + if (latency < maxLatency) { + onGood() + } else { + onBad() + } + } + + private fun onGood() { + when (state) { + State.SLOW_START -> { + cwnd *= 2 // 每次确认时窗口大小翻倍 + if (cwnd >= ssthresh) { + state = State.CONGESTION_AVOIDANCE + } + } + + State.CONGESTION_AVOIDANCE -> { + cwnd = minOf(MAX_CWND, cwnd + 1) // 每次确认时窗口大小线性增加 + } + } + } + + private fun onBad() { + ssthresh = (cwnd / 2).coerceAtLeast(1) // 更新阈值 + cwnd = ssthresh // 重置窗口大小 + state = State.CONGESTION_AVOIDANCE // 进入拥塞避免 + } + + fun getCwnd(): Int { + return cwnd + } + + enum class State { + SLOW_START, + CONGESTION_AVOIDANCE, + } + + companion object { + private val logger = LoggerFactory.getLogger(JobCongestionControl::class.java) + private const val MAX_CWND = 1000000 + } +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobScheduler.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobScheduler.kt index f71cda4..1b931ba 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobScheduler.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobScheduler.kt @@ -14,12 +14,27 @@ interface JobScheduler { */ fun start() - /** * 停止调度器 */ fun stop() + /** + * 触发任务 + * @param jobId 任务id + * @param triggerType 触发类型 + * @param retryCount 重试次数,可选。不为空时,job参数使用[jobParam] + * @param jobParam 任务参数,可选。null则使用默认job参数 + * @param shardingParam 分片参数,可选 + */ + fun trigger( + triggerContext: JobTriggerContext, + triggerType: TriggerTypeEnum, + retryCount: Int? = null, + jobParam: String? = null, + shardingParam: String? = null, + ) + /** * 触发任务 * @param jobId 任务id @@ -33,7 +48,7 @@ interface JobScheduler { triggerType: TriggerTypeEnum, retryCount: Int? = null, jobParam: String? = null, - shardingParam: String? = null + shardingParam: String? = null, ) /** diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobTriggerContext.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobTriggerContext.kt new file mode 100644 index 0000000..7ebf320 --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/JobTriggerContext.kt @@ -0,0 +1,29 @@ +package com.tencent.devops.schedule.scheduler + +import com.tencent.devops.schedule.pojo.job.JobInfo + +/** + * 任务触发上下文 + * */ +data class JobTriggerContext( + /** + * 任务信息 + * */ + val job: JobInfo, + /** + * 触发时间 + * */ + val fireTime: Long, + /** + * 上次触发时间 + * */ + val prevFireTime: Long? = null, + /** + * 下次触发时间 + * */ + val nextFireTime: Long? = null, + /** + * 调度触发时间 + * */ + val scheduledFireTime: Long? = null, +) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/ScheduleServerMetricsListener.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/ScheduleServerMetricsListener.kt new file mode 100644 index 0000000..7e90d93 --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/ScheduleServerMetricsListener.kt @@ -0,0 +1,72 @@ +package com.tencent.devops.schedule.scheduler + +import com.tencent.devops.schedule.config.ScheduleServerProperties +import com.tencent.devops.schedule.scheduler.event.JobMisfireEvent +import com.tencent.devops.schedule.scheduler.event.JobTriggerEvent +import io.micrometer.core.instrument.Counter +import io.micrometer.core.instrument.MeterRegistry +import io.micrometer.core.instrument.Timer +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import java.time.Duration + +/** + * 调度服务器监控指标监听器 + * */ +class ScheduleServerMetricsListener( + val registry: MeterRegistry, + scheduleServerProperties: ScheduleServerProperties, +) { + private val slo = scheduleServerProperties.slo.map { Duration.ofSeconds(it.toLong()) }.toTypedArray() + + @EventListener(JobMisfireEvent::class) + fun jobMisfire(event: JobMisfireEvent) { + with(event) { + // 任务超时未触发 + logger.info("Job[$jobId] is misfire") + getJobCount(CountType.MISFIRE).increment() + } + } + + @EventListener(JobTriggerEvent::class) + fun jobTrigger(event: JobTriggerEvent) { + with(event) { + logger.info("Trigger job[$jobId], elapsed ${event.duration.toMillis()} ms,job latency ${event.latency.toMillis()} ms.") + getTriggerLatencyTimer().record(event.latency) + getTriggerTimer().record(event.duration) + getJobCount(CountType.TRIGGER).increment() + } + } + + private fun getTriggerLatencyTimer(): Timer { + return Timer.builder("devops.schedule.server.trigger.latency") + .description("任务调度延迟") + .publishPercentileHistogram() + .serviceLevelObjectives(*slo) + .minimumExpectedValue(Duration.ofSeconds(1)) + .maximumExpectedValue(Duration.ofSeconds(10)) + .register(registry) + } + + private fun getTriggerTimer(): Timer { + return Timer.builder("devops.schedule.server.trigger.time") + .description("任务调度耗时") + .register(registry) + } + + private fun getJobCount(type: CountType): Counter { + return Counter.builder("devops.schedule.server.trigger.count") + .description("任务调度数量") + .tag("type", type.name) + .register(registry) + } + + enum class CountType { + MISFIRE, + TRIGGER, + } + + companion object { + private val logger = LoggerFactory.getLogger(ScheduleServerMetricsListener::class.java) + } +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobEvent.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobEvent.kt new file mode 100644 index 0000000..a798ade --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobEvent.kt @@ -0,0 +1,8 @@ +package com.tencent.devops.schedule.scheduler.event + +/** + * 任务事件 + * */ +open class JobEvent( + val jobId: String, +) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobMisfireEvent.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobMisfireEvent.kt new file mode 100644 index 0000000..9876495 --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobMisfireEvent.kt @@ -0,0 +1,6 @@ +package com.tencent.devops.schedule.scheduler.event + +/** + * 任务调度错过事件 + * */ +class JobMisfireEvent(jobId: String) : JobEvent(jobId) diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobTriggerEvent.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobTriggerEvent.kt new file mode 100644 index 0000000..e351045 --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/event/JobTriggerEvent.kt @@ -0,0 +1,28 @@ +package com.tencent.devops.schedule.scheduler.event + +import java.time.Duration + +/** + * 任务触发事件 + * */ +class JobTriggerEvent( + /** + * 触发耗时 + * */ + val duration: Duration, + /** + * 触发的任务延迟 + * */ + val latency: Duration, + jobId: String, +) : JobEvent(jobId) { + companion object { + fun create(jobId: String, triggerStartTime: Long, triggerEndTime: Long, triggerTime: Long): JobTriggerEvent { + return JobTriggerEvent( + Duration.ofMillis(triggerEndTime - triggerStartTime), + Duration.ofMillis(triggerEndTime - triggerTime), + jobId, + ) + } + } +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/monitor/JobTodoMonitor.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/monitor/JobTodoMonitor.kt index c116846..c5dc7e6 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/monitor/JobTodoMonitor.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-server/src/main/kotlin/com/tencent/devops/schedule/scheduler/monitor/JobTodoMonitor.kt @@ -4,53 +4,59 @@ import com.tencent.devops.schedule.constants.JOB_LOAD_LOCK_KEY import com.tencent.devops.schedule.constants.PRE_LOAD_TIME import com.tencent.devops.schedule.enums.MisfireStrategyEnum import com.tencent.devops.schedule.enums.TriggerStatusEnum -import com.tencent.devops.schedule.enums.TriggerTypeEnum +import com.tencent.devops.schedule.enums.TriggerTypeEnum.CRON +import com.tencent.devops.schedule.enums.TriggerTypeEnum.MISFIRE import com.tencent.devops.schedule.pojo.job.JobInfo import com.tencent.devops.schedule.provider.LockProvider import com.tencent.devops.schedule.scheduler.JobScheduler +import com.tencent.devops.schedule.scheduler.JobTriggerContext +import com.tencent.devops.schedule.scheduler.event.JobMisfireEvent import com.tencent.devops.schedule.utils.alignTime import com.tencent.devops.schedule.utils.computeNextTriggerTime import com.tencent.devops.schedule.utils.sleep import com.tencent.devops.schedule.utils.terminate import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import java.time.Instant import java.time.LocalDateTime import java.time.ZoneId import java.util.concurrent.ConcurrentHashMap import kotlin.concurrent.thread -import kotlin.system.measureTimeMillis /** * 任务加载器 */ open class JobTodoMonitor( private val jobScheduler: JobScheduler, - private val lockProvider: LockProvider + private val lockProvider: LockProvider, + private val publisher: ApplicationEventPublisher, + private val loadCountFunction: () -> Int, ) { private val jobManager = jobScheduler.getJobManager() - private val timeRingMap: MutableMap> = ConcurrentHashMap() + private val timeRingMap: MutableMap> = ConcurrentHashMap() private lateinit var loaderThread: Thread private lateinit var timeRingThread: Thread private var loaderThreadRunning = true private var timeRingThreadRunning = true - fun start() { loaderThread = thread( isDaemon = true, - name = "job-todo-loader-thread" + name = "job-todo-loader-thread", ) { alignTime(PRE_LOAD_TIME) while (loaderThreadRunning) { - val elapsed = measureTimeMillis { loadJobs() } + val start = System.currentTimeMillis() + val loads = loadJobs() + val elapsed = System.currentTimeMillis() - start if (elapsed < 1000) { - alignTime(PRE_LOAD_TIME) + alignTime(if (loads > 0) 1000 else PRE_LOAD_TIME) } } } timeRingThread = thread( isDaemon = true, - name = "job-time-ring-thread" + name = "job-time-ring-thread", ) { while (timeRingThreadRunning) { alignTime(1000) @@ -68,7 +74,7 @@ open class JobTodoMonitor( var hasRingData = false if (timeRingMap.isNotEmpty()) { for (second in timeRingMap.keys) { - if(timeRingMap[second]?.isNotEmpty() == true) { + if (timeRingMap[second]?.isNotEmpty() == true) { hasRingData = true break } @@ -90,24 +96,27 @@ open class JobTodoMonitor( // 加锁 lockToken = lockProvider.acquire(JOB_LOAD_LOCK_KEY, PRE_LOAD_TIME) if (lockToken == null) { - return count + return 0 } - val now = System.currentTimeMillis() - val jobs = jobManager.findTodoJobs(now + PRE_LOAD_TIME) + val jobs = jobManager.findTodoJobs(System.currentTimeMillis() + PRE_LOAD_TIME, loadCountFunction()) count = jobs.size for (job in jobs) { + // 每次都获取新的当前时间戳,可以提高任务精度 + val now = System.currentTimeMillis() val nextTriggerTime = job.nextTriggerTime if (nextTriggerTime < now - PRE_LOAD_TIME) { // 超出执行时间,任务过期 handleMisfireJob(job) } else if (nextTriggerTime <= now) { // 未超出执行时间,立即触发,计算延迟触发时长 - triggerJob(job, now) + triggerJob(job) } else { // 还未到执行时间,time-ring 触发 pushTimeRingJob(job) } - jobManager.updateJobSchedule(job) + } + jobs.forEach { + jobManager.updateJobSchedule(it) } } catch (e: Exception) { logger.error("load jobs failed: $e", e) @@ -122,15 +131,22 @@ open class JobTodoMonitor( private fun triggerTimeRingJobs() { try { + val start = System.currentTimeMillis() val second = LocalDateTime.now().second - // 向前处理2秒 - val jobs = mutableListOf() - repeat(3) { + logger.trace("Second $second") + // 避免调度时间过长,导致任务丢失,所以这里向前跨度处理2秒 + val jobs = mutableListOf() + repeat(TICK_RANGE) { jobs.addAll(timeRingMap.remove((second + 60 - it) % 60).orEmpty()) } jobs.forEach { - jobScheduler.trigger(it, TriggerTypeEnum.CRON) + jobScheduler.trigger(it, CRON) } + val elapsed = System.currentTimeMillis() - start + if (elapsed > TICK_RANGE * 1000) { + logger.warn("Too much tasks at slot $second") + } + logger.trace("Second $second completed,trigger ${jobs.size} jobs, elapsed $elapsed ms.") jobs.clear() } catch (e: Exception) { logger.error("trigger time ring jobs failed: $e", e) @@ -141,27 +157,28 @@ open class JobTodoMonitor( * 处理过期任务 */ private fun handleMisfireJob(job: JobInfo) { + val triggerContext = triggerFired(job) when (MisfireStrategyEnum.ofCode(job.misfireStrategy)) { MisfireStrategyEnum.RETRY -> { - logger.warn("job[${job.id}] is misfire, retry") - jobScheduler.trigger(job.id.orEmpty(), TriggerTypeEnum.MISFIRE) + logger.warn("${job.id} is misfire, retry") + jobScheduler.trigger(triggerContext, MISFIRE) } + MisfireStrategyEnum.IGNORE -> { - logger.warn("job[${job.id}] is misfire, ignore") + logger.warn("${job.id} is misfire, ignore") // do nothing } } - // fresh next - generateNextTriggerTime(job) + publisher.publishEvent(JobMisfireEvent(job.id.orEmpty())) } /** * 触发任务 */ - private fun triggerJob(job: JobInfo, now: Long) { - jobScheduler.trigger(job.id.orEmpty(), TriggerTypeEnum.CRON) - generateNextTriggerTime(job) - if (job.triggerStatus == TriggerStatusEnum.RUNNING.code() && job.nextTriggerTime <= now + PRE_LOAD_TIME) { + private fun triggerJob(job: JobInfo) { + val triggerContext = triggerFired(job) + jobScheduler.trigger(triggerContext, CRON) + if (job.triggerStatus == TriggerStatusEnum.RUNNING.code() && job.nextTriggerTime <= System.currentTimeMillis() + PRE_LOAD_TIME) { pushTimeRingJob(job) } } @@ -171,10 +188,10 @@ open class JobTodoMonitor( */ private fun pushTimeRingJob(job: JobInfo) { val ringSecond = (job.nextTriggerTime / 1000 % 60).toInt() - val timeRingItem = timeRingMap.computeIfAbsent(ringSecond) { mutableListOf() } - timeRingItem.add(job.id.orEmpty()) val from = Instant.ofEpochMilli(job.nextTriggerTime).atZone(ZoneId.systemDefault()).toLocalDateTime() - generateNextTriggerTime(job, from) + val triggerContext = triggerFired(job, from) + val timeRingItem = timeRingMap.computeIfAbsent(ringSecond) { mutableListOf() } + timeRingItem.add(triggerContext) } private fun generateNextTriggerTime(job: JobInfo, from: LocalDateTime = LocalDateTime.now()) { @@ -185,7 +202,7 @@ open class JobTodoMonitor( job.nextTriggerTime = 0 logger.warn( "failed to generate next trigger time for job[${job.id}]: scheduleType={${job.scheduleType}}, " + - "scheduleConf={${job.scheduleConf}}" + "scheduleConf={${job.scheduleConf}}", ) return } @@ -193,7 +210,20 @@ open class JobTodoMonitor( job.nextTriggerTime = nextTriggerTime } + private fun triggerFired(job: JobInfo, from: LocalDateTime = LocalDateTime.now()): JobTriggerContext { + val prevFireTime = job.lastTriggerTime + generateNextTriggerTime(job, from) + return JobTriggerContext( + job = job, + fireTime = System.currentTimeMillis(), + prevFireTime = prevFireTime, + nextFireTime = job.nextTriggerTime, + scheduledFireTime = job.lastTriggerTime, + ) + } + companion object { private val logger = LoggerFactory.getLogger(JobTodoMonitor::class.java) + private const val TICK_RANGE = 3 // 时间轮处理任务范围,这里的3表示,每次可以处理3个刻度,即过去2s到当前这一秒 } } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt index 9788184..4c04c1d 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/config/ScheduleWorkerProperties.kt @@ -41,21 +41,10 @@ data class ScheduleWorkerProperties( } class ScheduleWorkerExecutorProperties( - - /** - * 最大线程数 - */ - var maximumPoolSize: Int = 100, - /** * 核心线程数 */ - var corePoolSize: Int = 1, - - /** - * 线程存活时间,单位秒 - */ - var keepAliveTime: Long = 60, + var threads: Int = Runtime.getRuntime().availableProcessors() * 2, ) class ScheduleWorkerServerProperties( diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt index 998dff7..ac1cf0d 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/executor/DefaultJobExecutor.kt @@ -12,10 +12,13 @@ import com.tencent.devops.schedule.handler.ShellHandler import com.tencent.devops.schedule.k8s.K8sHelper import com.tencent.devops.schedule.pojo.trigger.TriggerParam import com.tencent.devops.schedule.thread.JobThread +import com.tencent.devops.schedule.thread.JobThreadGroup +import com.tencent.devops.schedule.thread.TriggerTask import com.tencent.devops.web.util.SpringContextHolder import org.slf4j.LoggerFactory import org.springframework.beans.BeansException import org.springframework.beans.factory.DisposableBean +import java.lang.IllegalStateException import java.util.concurrent.ConcurrentHashMap /** @@ -26,12 +29,10 @@ class DefaultJobExecutor( private val serverRpcClient: ServerRpcClient, ) : JobExecutor, DisposableBean { - init { - Companion.serverRpcClient = serverRpcClient - } - private val k8sShellHandler: K8sShellHandler by lazy { createK8sHandler() } private val shellHandler: ShellHandler = ShellHandler(workerProperties.sourcePath) + private val threadGroup = JobThreadGroup(workerProperties.executor.threads, serverRpcClient) + private val jobThreadRepository = ConcurrentHashMap() override fun execute(param: TriggerParam) { val jobId = param.jobId @@ -66,16 +67,14 @@ class DefaultJobExecutor( val blockStrategy = BlockStrategyEnum.ofCode(param.blockStrategy) when (blockStrategy) { BlockStrategyEnum.DISCARD_LATER -> { - if (jobThread.running.get()) { - logger.warn("discard task $logId") - return + if (jobThread.hasRunningJobs(jobId)) { + throw IllegalStateException("discard task $logId by block strategy[DISCARD_LATER]") } } BlockStrategyEnum.COVER_EARLY -> { - if (jobThread.running.get()) { - logger.warn("cover early $logId") - jobThread = null + if (jobThread.hasRunningJobs(jobId)) { + jobThread.removeEarlyJob(jobId) } } @@ -83,28 +82,26 @@ class DefaultJobExecutor( // 入队 } } + } else { + jobThread = registerJobThread(jobId) } - if (jobThread == null) { - jobThread = registerJobThread(jobId, handler) - } - jobThread.pushTriggerQueue(param) + val task = TriggerTask(jobId, handler, param) + jobThread.pushTriggerQueue(task) } override fun destroy() { - jobThreadRepository.values.forEach { - logger.info("Destroying job thread ${it.name}") - val oldJobThread = removeJobThread(it.jobId) - if (oldJobThread != null) { - try { - oldJobThread.join() - } catch (e: Exception) { - logger.error("JobThread destroy(join) error, jobId:{}", oldJobThread.jobId) - } - } - } + threadGroup.close() jobThreadRepository.clear() } + private fun registerJobThread(jobId: String): JobThread { + return jobThreadRepository.computeIfAbsent(jobId) { threadGroup.next() } + } + + private fun loadJobThread(jobId: String): JobThread? { + return jobThreadRepository[jobId] + } + private fun createK8sHandler(): K8sShellHandler { val k8sProperties = workerProperties.k8s val k8sClient = K8sHelper.createClient(k8sProperties) @@ -113,26 +110,5 @@ class DefaultJobExecutor( companion object { private val logger = LoggerFactory.getLogger(DefaultJobExecutor::class.java) - private val jobThreadRepository = ConcurrentHashMap() - private lateinit var serverRpcClient: ServerRpcClient - fun registerJobThread(jobId: String, handler: JobHandler): JobThread { - val newJobThread = JobThread(jobId, handler, serverRpcClient) - newJobThread.start() - jobThreadRepository.putIfAbsent(jobId, newJobThread)?.toStop() - return newJobThread - } - - fun removeJobThread(jobId: String): JobThread? { - val oldJobThread = jobThreadRepository.remove(jobId) - if (oldJobThread != null) { - oldJobThread.toStop() - return oldJobThread - } - return null - } - - fun loadJobThread(jobId: String): JobThread? { - return jobThreadRepository[jobId] - } } } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/JobThread.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/JobThread.kt index 2b16490..76ab167 100644 --- a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/JobThread.kt +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/JobThread.kt @@ -1,53 +1,55 @@ package com.tencent.devops.schedule.thread import com.tencent.devops.schedule.api.ServerRpcClient -import com.tencent.devops.schedule.executor.DefaultJobExecutor import com.tencent.devops.schedule.executor.JobContext -import com.tencent.devops.schedule.executor.JobHandler import com.tencent.devops.schedule.pojo.job.JobExecutionResult import com.tencent.devops.schedule.pojo.trigger.TriggerParam import com.tencent.devops.utils.jackson.readJsonString import org.slf4j.LoggerFactory import java.util.Base64 -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong /** * 任务线程 * */ -class JobThread( - val jobId: String, - private val jobHandler: JobHandler, - private val serverRpcClient: ServerRpcClient, -) : Thread() { +class JobThread(private val serverRpcClient: ServerRpcClient) : Thread() { private val triggerLogIdSet = mutableSetOf() - private val triggerQueue = LinkedBlockingQueue() + private val triggerQueue = LinkedBlockingDeque() private val stop = AtomicBoolean(false) - private var idleTimes = 0 private var base64Decoder = Base64.getDecoder() - var running = AtomicBoolean(false) + + /** + * 待执行任务计数 + * */ + private val pendingTaskCount = ConcurrentHashMap() + + /** + * 取消任务,key为需要取消的任务id,value为设置时队尾的logId + * + * 实现在队列中快速的取消任务,如果遍历取消指定job的任务效率会很差,这里通过设置相关标志位,来决定任务是否应该执行 + * */ + private val cancelJobs = ConcurrentHashMap() init { - name = "JobThread-$jobId" + name = "JobThread-${threadId.incrementAndGet()}" } override fun run() { - logger.info("$name started") while (!stop.get()) { - idleTimes++ - running.set(false) try { - val triggerParam = triggerQueue.poll(3, TimeUnit.SECONDS) - if (triggerParam != null) { - running.set(true) - idleTimes = 0 + val task = triggerQueue.poll(3, TimeUnit.SECONDS) + if (task != null && shouldRun(task)) { + val triggerParam = task.param val logId = triggerParam.logId - triggerLogIdSet.remove(logId) val context = buildJobContext(triggerParam) // 执行任务逻辑,获取结果 val result = try { - jobHandler.execute(context) + task.jobHandler.execute(context) } catch (e: Throwable) { logger.error("execute job log[$logId] error: ${e.message}", e) JobExecutionResult.failed(e.message.orEmpty()) @@ -55,16 +57,7 @@ class JobThread( result.logId = logId logger.info("complete job log[$logId]: $result") // 上报任务结果 - try { - serverRpcClient.submitResult(result) - logger.info("submit job log[$logId] result success") - } catch (e: Exception) { - logger.error("submit job log[$logId] result error: ${e.message}", e) - } - } else if (idleTimes > 30 && triggerQueue.size == 0) { - // 超过最大空闲事件 - logger.info("executor idle times over limit") - DefaultJobExecutor.removeJobThread(jobId) + submitResult(task.jobId, result) } } catch (e: Exception) { if (!stop.get()) { @@ -72,29 +65,52 @@ class JobThread( } } } - logger.info("$name stopped") + while (triggerQueue.size > 0) { + val task = triggerQueue.poll() + val result = JobExecutionResult.failed("job not executed, because thread is stopped.") + result.logId = task.param.logId + submitResult(task.jobId, result) + } } - fun pushTriggerQueue(triggerParam: TriggerParam): Boolean { - if (triggerLogIdSet.contains(triggerParam.logId)) { + fun pushTriggerQueue(task: TriggerTask): Boolean { + if (triggerLogIdSet.contains(task.param.logId)) { return false } - triggerQueue.add(triggerParam) - triggerLogIdSet.add(triggerParam.logId) + triggerQueue.add(task) + triggerLogIdSet.add(task.param.logId) + pendingTaskCount.getOrPut(task.jobId) { AtomicInteger() }.incrementAndGet() return true } fun toStop() { logger.info("Stopping $name") stop.set(true) - if (running.get()) { - logger.info("$name is running now,waiting...") - while (running.get()) { - // empty - } + } + + fun removeEarlyJob(jobId: String) { + triggerQueue.peekLast()?.let { + cancelJobs[jobId] = it.param.logId } } + fun hasRunningJobs(jobId: String): Boolean { + val count = pendingTaskCount[jobId]?.get() ?: 0 + return count > 0 + } + + private fun shouldRun(task: TriggerTask): Boolean { + val stopLogId = cancelJobs[task.jobId] ?: return true + val logId = task.param.logId + if (logId == stopLogId) { + cancelJobs.remove(task.jobId) + } + val result = JobExecutionResult.failed("cancelled by block strategy[COVER_EARLY]") + result.logId = logId + submitResult(task.jobId, result) + return false + } + private fun buildJobContext(param: TriggerParam): JobContext { with(param) { return JobContext( @@ -111,7 +127,20 @@ class JobThread( } } + private fun submitResult(jobId: String, result: JobExecutionResult) { + val logId = result.logId + try { + triggerLogIdSet.remove(logId) + pendingTaskCount[jobId]?.decrementAndGet() + serverRpcClient.submitResult(result) + logger.info("submit job log[$logId] result success") + } catch (e: Exception) { + logger.error("submit job log[$logId] result error: ${e.message}", e) + } + } + companion object { private val logger = LoggerFactory.getLogger(JobThread::class.java) + private val threadId = AtomicLong() } } diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/JobThreadGroup.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/JobThreadGroup.kt new file mode 100644 index 0000000..dc42a9c --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/JobThreadGroup.kt @@ -0,0 +1,32 @@ +package com.tencent.devops.schedule.thread + +import com.tencent.devops.schedule.api.ServerRpcClient +import java.util.concurrent.atomic.AtomicLong +import kotlin.math.abs + +/** + * 任务线程组 + * */ +class JobThreadGroup(nThreads: Int, serverRpcClient: ServerRpcClient) : AutoCloseable { + private val threads: MutableList = mutableListOf() + private val idx = AtomicLong() + + init { + for (i in 0 until nThreads) { + val jobThread = JobThread(serverRpcClient) + jobThread.start() + threads.add(jobThread) + } + } + + fun next(): JobThread { + return threads[abs((idx.getAndIncrement() % threads.size).toDouble()).toInt()] + } + + override fun close() { + threads.forEach { + it.toStop() + it.join() + } + } +} diff --git a/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/TriggerTask.kt b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/TriggerTask.kt new file mode 100644 index 0000000..1dc2cd3 --- /dev/null +++ b/devops-boot-project/devops-boot-core/devops-schedule/devops-schedule-worker/src/main/kotlin/com/tencent/devops/schedule/thread/TriggerTask.kt @@ -0,0 +1,10 @@ +package com.tencent.devops.schedule.thread + +import com.tencent.devops.schedule.executor.JobHandler +import com.tencent.devops.schedule.pojo.trigger.TriggerParam + +data class TriggerTask( + val jobId: String, + val jobHandler: JobHandler, + val param: TriggerParam, +) diff --git a/devops-boot-sample/boot-schedule-server-sample/build.gradle.kts b/devops-boot-sample/boot-schedule-server-sample/build.gradle.kts index d01f7f7..d2feab4 100644 --- a/devops-boot-sample/boot-schedule-server-sample/build.gradle.kts +++ b/devops-boot-sample/boot-schedule-server-sample/build.gradle.kts @@ -1,4 +1,5 @@ dependencies { api("com.tencent.devops:devops-boot-starter-service") api("com.tencent.devops:devops-boot-starter-schedule-server") + api("io.micrometer:micrometer-registry-prometheus") } diff --git a/devops-boot-sample/boot-schedule-worker-cloud-sample/build.gradle.kts b/devops-boot-sample/boot-schedule-worker-cloud-sample/build.gradle.kts index 1ee048d..99e1226 100644 --- a/devops-boot-sample/boot-schedule-worker-cloud-sample/build.gradle.kts +++ b/devops-boot-sample/boot-schedule-worker-cloud-sample/build.gradle.kts @@ -1,4 +1,5 @@ dependencies { api("com.tencent.devops:devops-boot-starter-service") api("com.tencent.devops:devops-boot-starter-schedule-worker") + api("io.micrometer:micrometer-registry-prometheus") }