Skip to content

Commit

Permalink
Merge pull request #225 from felixncheng/master
Browse files Browse the repository at this point in the history
feat: 优化schedule #221
  • Loading branch information
felixncheng authored Sep 24, 2024
2 parents 57f2194 + 44bdd73 commit 05f05f6
Show file tree
Hide file tree
Showing 29 changed files with 648 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,14 @@ data class JobUpdateRequest(
* 镜像地址,容器任务需要
* */
val image: String? = null,

/**
* 调度配置
* */
val scheduleConf: String? = null,

/**
* 调度类型
*/
val scheduleType: Int? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.tencent.devops.schedule.pojo.log

import com.fasterxml.jackson.annotation.JsonFormat
import com.tencent.devops.schedule.constants.DATE_TIME_PATTERN
import com.tencent.devops.schedule.enums.AlarmStatusEnum
import com.tencent.devops.schedule.enums.ExecutionCodeEnum
import java.time.LocalDateTime

Expand Down Expand Up @@ -36,7 +37,7 @@ data class JobLog(
*/
@JsonFormat(pattern = DATE_TIME_PATTERN)
var triggerTime: LocalDateTime,
var triggerCode : Int = ExecutionCodeEnum.INITIALED.code(),
var triggerCode: Int = ExecutionCodeEnum.INITIALED.code(),
var triggerMsg: String? = null,
var triggerType: Int,

Expand All @@ -51,5 +52,5 @@ data class JobLog(
/**
* alarm信息
*/
var alarmStatus: Int? = null
var alarmStatus: Int = AlarmStatusEnum.TODO.code(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import com.tencent.devops.schedule.provider.LockProvider
import com.tencent.devops.schedule.provider.WorkerProvider
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.event.ContextRefreshedEvent
import org.springframework.context.event.EventListener
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.index.IndexDefinition
import org.springframework.data.mongodb.core.index.MongoPersistentEntityIndexResolver
import org.springframework.data.mongodb.core.mapping.Document
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories

@Configuration(proxyBeanMethods = false)
Expand All @@ -22,15 +27,15 @@ class MongoModelAutoConfiguration(
private val logRepository: LogRepository,
private val workerGroupRepository: WorkerGroupRepository,
private val workerRepository: WorkerRepository,
private val mongoTemplate: MongoTemplate
private val mongoTemplate: MongoTemplate,
) {

@Bean
fun jobProvider(): JobProvider {
return MongoJobProvider(
jobRepository = jobRepository,
logRepository = logRepository,
mongoTemplate = mongoTemplate
mongoTemplate = mongoTemplate,
)
}

Expand All @@ -39,12 +44,27 @@ class MongoModelAutoConfiguration(
return MongoWorkerProvider(
groupRepository = workerGroupRepository,
workerRepository = workerRepository,
mongoTemplate = mongoTemplate
mongoTemplate = mongoTemplate,
)
}

@Bean
fun lockProvider(): LockProvider {
return MongoLockProvider(mongoTemplate)
}

@EventListener(ContextRefreshedEvent::class)
fun initIndicesAfterStartup() {
val mappingContext = mongoTemplate.converter.mappingContext
val resolver = MongoPersistentEntityIndexResolver(mappingContext)
mappingContext.persistentEntities
.stream()
.filter { it.isAnnotationPresent(Document::class.java) }
.forEach {
val indexOps = mongoTemplate.indexOps(it.type)
resolver.resolveIndexFor(it.type).forEach { indexDefinition: IndexDefinition ->
indexOps.ensureIndex(indexDefinition)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.tencent.devops.schedule.mongo.model

import com.tencent.devops.schedule.mongo.model.TJobInfo.Companion.NEXT_TRIGGER_TIME_IDX
import com.tencent.devops.schedule.mongo.model.TJobInfo.Companion.NEXT_TRIGGER_TIME_IDX_DEF
import org.springframework.data.mongodb.core.index.CompoundIndex
import org.springframework.data.mongodb.core.mapping.Document
import java.time.LocalDateTime

/**
* 任务信息模型类
*/
@Document("job_info")
@CompoundIndex(name = NEXT_TRIGGER_TIME_IDX, def = NEXT_TRIGGER_TIME_IDX_DEF, background = true)
data class TJobInfo(

var id: String? = null,
Expand Down Expand Up @@ -108,4 +112,9 @@ data class TJobInfo(
* 镜像地址
* */
var image: String? = null,
)
) {
companion object {
const val NEXT_TRIGGER_TIME_IDX = "nextTriggerTime_idx"
const val NEXT_TRIGGER_TIME_IDX_DEF = "{'nextTriggerTime': 1}"
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
package com.tencent.devops.schedule.mongo.model

import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.ALARM_STATUS_IDX
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.ALARM_STATUS_IDX_DEF
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.EXECUTION_CODE_IDX
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.EXECUTION_CODE_IDX_DEF
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.JOB_ID_IDX
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.JOB_ID_IDX_DEF
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_CODE_IDX
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_CODE_IDX_DEF
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_TIME_IDX
import com.tencent.devops.schedule.mongo.model.TJobLog.Companion.TRIGGER_TIME_IDX_DEF
import org.springframework.data.mongodb.core.index.CompoundIndex
import org.springframework.data.mongodb.core.index.CompoundIndexes
import org.springframework.data.mongodb.core.mapping.Document
import java.time.LocalDateTime

/**
* 任务信息模型类
*/
@Document("job_log")
@CompoundIndexes(
CompoundIndex(name = JOB_ID_IDX, def = JOB_ID_IDX_DEF, background = true),
CompoundIndex(name = TRIGGER_TIME_IDX, def = TRIGGER_TIME_IDX_DEF, background = true),
CompoundIndex(name = TRIGGER_CODE_IDX, def = TRIGGER_CODE_IDX_DEF, background = true),
CompoundIndex(name = EXECUTION_CODE_IDX, def = EXECUTION_CODE_IDX_DEF, background = true),
CompoundIndex(name = ALARM_STATUS_IDX, def = ALARM_STATUS_IDX_DEF, background = true),
)
data class TJobLog(

var id: String? = null,
Expand Down Expand Up @@ -38,7 +57,7 @@ data class TJobLog(
* trigger信息
*/
var triggerTime: LocalDateTime,
var triggerCode : Int,
var triggerCode: Int,
var triggerMsg: String? = null,
var triggerType: Int,

Expand All @@ -52,5 +71,18 @@ data class TJobLog(
/**
* alarm信息
*/
var alarmStatus: Int? = null
)
var alarmStatus: Int,
) {
companion object {
const val JOB_ID_IDX = "jobId_idx"
const val JOB_ID_IDX_DEF = "{'jobId': 1 , 'triggerTime': 1}"
const val TRIGGER_TIME_IDX = "triggerTime_idx"
const val TRIGGER_TIME_IDX_DEF = "{'triggerTime': 1}"
const val TRIGGER_CODE_IDX = "triggerCode_idx"
const val TRIGGER_CODE_IDX_DEF = "{'triggerCode': 1}"
const val EXECUTION_CODE_IDX = "executionCode_idx"
const val EXECUTION_CODE_IDX_DEF = "{'executionCode': 1}"
const val ALARM_STATUS_IDX = "alarmStatus_idx"
const val ALARM_STATUS_IDX_DEF = "{'alarmStatus': 1}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import java.time.LocalDateTime
class MongoJobProvider(
private val jobRepository: JobRepository,
private val logRepository: LogRepository,
private val mongoTemplate: MongoTemplate
private val mongoTemplate: MongoTemplate,
) : JobProvider {
override fun addJob(jobInfo: JobInfo): String {
return jobInfo.convert().apply { jobRepository.save(this) }.id.orEmpty()
Expand Down Expand Up @@ -67,14 +67,14 @@ class MongoJobProvider(
pageNumber = param.pageNumber,
pageSize = param.pageSize,
totalRecords = total,
records = records
records = records,
)
}

override fun findTodoJobs(time: Long): List<JobInfo> {
override fun findTodoJobs(time: Long, limit: Int): List<JobInfo> {
val criteria = where(TJobInfo::nextTriggerTime).lte(time)
.and(TJobInfo::triggerStatus).isEqualTo(TriggerStatusEnum.RUNNING.code())
val query = Query(criteria)
val query = Query(criteria).limit(limit)
return mongoTemplate.find(query, TJobInfo::class.java).map { it.convert() }
}

Expand Down Expand Up @@ -118,7 +118,7 @@ class MongoJobProvider(
pageNumber = param.pageNumber,
pageSize = param.pageSize,
totalRecords = total,
records = records
records = records,
)
}

Expand All @@ -127,11 +127,11 @@ class MongoJobProvider(
val criteria = where(TJobLog::alarmStatus).isEqualTo(AlarmStatusEnum.TODO.code())
.orOperator(
where(TJobLog::triggerCode).lt(TriggerCodeEnum.INITIALED.code()),
where(TJobLog::executionCode).lt(ExecutionCodeEnum.INITIALED.code())
where(TJobLog::executionCode).lt(ExecutionCodeEnum.INITIALED.code()),
)
val query = Query.query(criteria).with(pageable)
query.fields().include(TJobLog::id.name)
return mongoTemplate.find(query, IdEntity::class.java).map { it.id }
return mongoTemplate.find(query, IdEntity::class.java, "job_log").map { it.id }
}

override fun findLostJobLogIds(triggerTime: LocalDateTime): List<String> {
Expand All @@ -140,7 +140,7 @@ class MongoJobProvider(
.and(TJobLog::triggerTime).lt(triggerTime)
val query = Query.query(criteria)
query.fields().include(TJobLog::id.name)
return mongoTemplate.find(query, IdEntity::class.java).map { it.id }
return mongoTemplate.find(query, IdEntity::class.java, "job_log").map { it.id }
}

override fun deleteLogByJobId(jobId: String) {
Expand All @@ -158,7 +158,7 @@ class MongoJobProvider(
logId: String,
executionCode: Int,
executionMessage: String,
executionTime: LocalDateTime
executionTime: LocalDateTime,
): Int {
val criteria = where(TJobLog::id).`is`(logId)
val query = Query.query(criteria)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.tencent.devops.schedule.mongo.provider

import com.mongodb.MongoServerException
import com.mongodb.client.result.UpdateResult
import com.tencent.devops.schedule.mongo.model.TLockInfo
import com.tencent.devops.schedule.provider.LockProvider
import org.slf4j.LoggerFactory
import org.springframework.dao.DuplicateKeyException
import org.springframework.data.mongodb.core.FindAndModifyOptions
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Query
Expand Down Expand Up @@ -60,13 +60,9 @@ class MongoLockProvider(
} else {
null
}
} catch (e: MongoServerException) {
if (e.code == 11000) { // duplicate key
return null
} else {
throw e
}
} catch (ignore: DuplicateKeyException) {
}
return null
}

override fun release(key: String, token: String): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ interface JobProvider {
/**
* 根据下次触发时间查询需要执行的任务
* @param time 下次触发时间
* @param limit 返回限制数量
*/
fun findTodoJobs(time: Long): List<JobInfo>
fun findTodoJobs(time: Long, limit: Int): List<JobInfo>

/**
* 根据id查找任务
Expand Down Expand Up @@ -108,7 +109,7 @@ interface JobProvider {
logId: String,
executionCode: Int,
executionMessage: String,
executionTime: LocalDateTime
executionTime: LocalDateTime,
): Int

/**
Expand All @@ -122,5 +123,4 @@ interface JobProvider {
* @param jobId 任务id
*/
fun deleteLogByJobId(jobId: String)

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {
api(project(":devops-boot-project:devops-boot-core:devops-schedule:devops-schedule-common"))
api(project(":devops-boot-project:devops-boot-core:devops-schedule:devops-schedule-model"))
api("io.jsonwebtoken:jjwt-api")
api("com.google.guava:guava")
runtimeOnly("io.jsonwebtoken:jjwt-impl")
runtimeOnly("io.jsonwebtoken:jjwt-jackson")
compileOnly("org.springframework.cloud:spring-cloud-starter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ import com.tencent.devops.schedule.provider.LockProvider
import com.tencent.devops.schedule.provider.WorkerProvider
import com.tencent.devops.schedule.scheduler.DefaultJobScheduler
import com.tencent.devops.schedule.scheduler.JobScheduler
import com.tencent.devops.schedule.scheduler.ScheduleServerMetricsListener
import io.micrometer.core.instrument.MeterRegistry
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(ScheduleServerProperties::class)
@Import(ScheduleServerWebConfiguration::class)
@Import(
ScheduleServerWebConfiguration::class,
ScheduleServerMetricsListener::class,
)
class ScheduleServerAutoConfiguration {

@Bean
Expand All @@ -28,9 +34,19 @@ class ScheduleServerAutoConfiguration {
workerManager: WorkerManager,
lockProvider: LockProvider,
scheduleServerProperties: ScheduleServerProperties,
workerRpcClient: WorkerRpcClient
workerRpcClient: WorkerRpcClient,
registry: MeterRegistry,
publisher: ApplicationEventPublisher,
): JobScheduler {
return DefaultJobScheduler(jobManager, workerManager, lockProvider, scheduleServerProperties, workerRpcClient)
return DefaultJobScheduler(
jobManager,
workerManager,
lockProvider,
scheduleServerProperties,
workerRpcClient,
registry,
publisher,
)
}

@Bean
Expand All @@ -44,5 +60,4 @@ class ScheduleServerAutoConfiguration {
fun workerManager(workerProvider: WorkerProvider): WorkerManager {
return DefaultWorkerManager(workerProvider)
}

}
Loading

0 comments on commit 05f05f6

Please sign in to comment.