Skip to content

Commit

Permalink
Merge pull request #3327 from wangyu096/issue_3324
Browse files Browse the repository at this point in the history
feat: 优化分库分表迁移过程中,task_instance_id 动态查询条件构造逻辑 #3324
  • Loading branch information
wangyu096 authored Dec 12, 2024
2 parents d43a523 + 83828eb commit f893700
Show file tree
Hide file tree
Showing 15 changed files with 415 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ public void assertRequiredInitParam(String paramName) {
public boolean checkRequiredContextParam(ToggleEvaluateContext context, String paramName) {
boolean checkResult = true;
if (context.getParam(paramName) == null) {
String msg = MessageFormatter.format(
"Context param {} is required for evaluate", paramName).getMessage();
log.warn(msg);
log.info("Context param {} is required for evaluate", paramName);
checkResult = false;
}
return checkResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public FilterRegistrationBean repeatableRSRRFilterRegister() {
FilterRegistrationBean<RepeatableReadWriteServletRequestResponseFilter> registration =
new FilterRegistrationBean<>();
registration.setFilter(repeatableRRRFilter());
registration.addUrlPatterns("/esb/api/*");
registration.addUrlPatterns("/esb/api/*", "/service/*", "/web/*");
registration.setName("repeatableReadRequestResponseFilter");
registration.setOrder(0);
return registration;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public TaskContext getTaskContext(String username, TaskContextQuery contextQuery
InternalResponse<ServiceStepInstanceDTO> resp = serviceStepInstanceResource.getStepInstance(
username,
contextQuery.getAppId(),
contextQuery.getTaskInstanceId(),
contextQuery.getStepInstanceId()
);
if (resp.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
@SmartFeignClient(value = "job-execute", contextId = "stepInstanceResource")
@InternalAPI
public interface ServiceStepInstanceResource {
@GetMapping("/service/stepInstance/appIds/{appId}/stepInstanceIds/{stepInstanceId}")
@GetMapping("/service/app/{appId}/taskInstance/{taskInstanceId}/stepInstance/{stepInstanceId}")
InternalResponse<ServiceStepInstanceDTO> getStepInstance(
@RequestHeader("username")
String username,
@ApiParam(value = "作业平台业务ID", required = true)
@PathVariable(value = "appId")
Long appId,
@ApiParam(value = "作业实例ID", name = "taskInstanceId", required = true)
@PathVariable("taskInstanceId")
Long taskInstanceId,
@ApiParam(value = "步骤实例ID", name = "stepInstanceId", required = true)
@PathVariable("stepInstanceId")
Long stepInstanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ public ServiceStepInstanceResourceImpl(TaskInstanceAccessProcessor taskInstanceA
}

@Override
public InternalResponse<ServiceStepInstanceDTO> getStepInstance(String username, Long appId, Long stepInstanceId) {
public InternalResponse<ServiceStepInstanceDTO> getStepInstance(String username,
Long appId,
Long taskInstanceId,
Long stepInstanceId) {
try {
StepInstanceDTO stepInstance = stepInstanceService.getStepInstanceDetail(appId, stepInstanceId);
StepInstanceDTO stepInstance = stepInstanceService.getStepInstanceDetail(appId,
taskInstanceId, stepInstanceId);
taskInstanceAccessProcessor.processBeforeAccess(username, appId, stepInstance.getTaskInstanceId());
return InternalResponse.buildSuccessResp(stepInstance.toServiceStepInstanceDTO());
} catch (NotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ public Response<StepExecutionDetailV2VO> getStepExecutionResult(String username,
String orderField,
Integer order) {
StepExecutionResultQuery query = StepExecutionResultQuery.builder()
.taskInstanceId(taskInstanceId)
.stepInstanceId(stepInstanceId)
.executeCount(executeCount)
.batch(batch == null ? null : (batch == 0 ? null : batch))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,26 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.tencent.bk.job.common.WatchableThreadPoolExecutor;
import com.tencent.bk.job.execute.util.ContextExecutorService;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Configuration(value = "jobExecuteExecutorConfig")
public class ExecutorConfiguration {

@Bean("logExportExecutor")
public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) {
public ExecutorService logExportExecutor(MeterRegistry meterRegistry) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("log-export-thread-%d").build();
return new WatchableThreadPoolExecutor(
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"logExportExecutor",
10,
Expand All @@ -53,25 +54,25 @@ public ThreadPoolExecutor logExportExecutor(MeterRegistry meterRegistry) {
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactory
);
));
}

@Bean("getHostsByTopoExecutor")
public ThreadPoolExecutor getHostsByTopoExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService getHostsByTopoExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"getHostsByTopoExecutor",
50,
100,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
));
}

@Bean("getHostTopoPathExecutor")
public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService getHostTopoPathExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"getHostTopoPathExecutor",
5,
Expand All @@ -87,14 +88,14 @@ public ThreadPoolExecutor getHostTopoPathExecutor(MeterRegistry meterRegistry) {
Thread.currentThread().getName());
r.run();
}
);
));
}

@Bean("localFileDownloadExecutor")
public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute,
MeterRegistry meterRegistry) {
public ExecutorService localFileDownloadExecutor(LocalFileConfigForExecute localFileConfigForExecute,
MeterRegistry meterRegistry) {
int concurrency = localFileConfigForExecute.getDownloadConcurrency();
return new WatchableThreadPoolExecutor(
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"localFileDownloadExecutor",
concurrency,
Expand All @@ -110,12 +111,12 @@ public ThreadPoolExecutor localFileDownloadExecutor(LocalFileConfigForExecute lo
Thread.currentThread().getName());
r.run();
}
);
));
}

@Bean("localFileWatchExecutor")
public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService localFileWatchExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"localFileWatchExecutor",
0,
Expand All @@ -131,19 +132,19 @@ public ThreadPoolExecutor localFileWatchExecutor(MeterRegistry meterRegistry) {
Thread.currentThread().getName());
r.run();
}
);
));
}

@Bean("shutdownExecutor")
public ThreadPoolExecutor shutdownExecutor(MeterRegistry meterRegistry) {
return new WatchableThreadPoolExecutor(
public ExecutorService shutdownExecutor(MeterRegistry meterRegistry) {
return ContextExecutorService.wrap(new WatchableThreadPoolExecutor(
meterRegistry,
"shutdownExecutor",
10,
20,
120,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.tencent.bk.job.execute.dao.impl;

import com.tencent.bk.job.common.model.dto.ResourceScope;
import com.tencent.bk.job.common.util.JobContextUtil;
import com.tencent.bk.job.common.util.toggle.ToggleEvaluateContext;
import com.tencent.bk.job.common.util.toggle.ToggleStrategyContextParams;
import com.tencent.bk.job.common.util.toggle.feature.FeatureIdConstants;
Expand All @@ -51,33 +50,37 @@ public class TaskInstanceIdDynamicCondition {

public static Condition build(Long taskInstanceId,
Function<Long, Condition> taskInstanceIdConditionBuilder) {
ToggleEvaluateContext toggleEvaluateContext;
JobExecuteContext jobExecuteContext = JobExecuteContextThreadLocalRepo.get();
if (jobExecuteContext == null) {
log.info("TaskInstanceIdDynamicCondition : Empty JobExecuteContext!");
log.info("TaskInstanceIdDynamicCondition : EmptyJobExecuteContext!");
// JobExecuteContext 正常应该不会为 null 。为了不影响请求正常处理,忽略错误,直接返回 TRUE Condition
// (不会影响 DAO 查询,task_instance_id 仅作为分片功能实用,实际业务数据关系并不强依赖 task_instance_id)
return DSL.trueCondition();
}
ResourceScope resourceScope = jobExecuteContext.getResourceScope();
if (resourceScope == null) {
log.info("TaskInstanceIdDynamicCondition : Empty resource scope!");
// 无法根据业务决定是否使用 task_instance_id 作为查询条件。为了不影响请求正常处理,直接返回 TRUE Condition
// (不会影响 DAO 查询,task_instance_id 仅作为分片功能,实际业务数据关系并不强依赖 task_instance_id)
return DSL.trueCondition();
toggleEvaluateContext = ToggleEvaluateContext.EMPTY;
} else {
ResourceScope resourceScope = jobExecuteContext.getResourceScope();
if (resourceScope != null) {
toggleEvaluateContext = ToggleEvaluateContext.builder()
.addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE, resourceScope);
} else {
log.info("TaskInstanceIdDynamicCondition : EmptyResourceScope!");
toggleEvaluateContext = ToggleEvaluateContext.EMPTY;
}
}
if (FeatureToggle.checkFeature(
FeatureIdConstants.DAO_ADD_TASK_INSTANCE_ID,
ToggleEvaluateContext.builder()
.addContextParam(ToggleStrategyContextParams.CTX_PARAM_RESOURCE_SCOPE,
JobContextUtil.getAppResourceScope()))) {

if (FeatureToggle.checkFeature(FeatureIdConstants.DAO_ADD_TASK_INSTANCE_ID, toggleEvaluateContext)) {
if (taskInstanceId == null || taskInstanceId <= 0L) {
log.info("TaskInstanceIdDynamicCondition : Invalid taskInstanceId {}", taskInstanceId);
log.info("TaskInstanceIdDynamicCondition : InvalidTaskInstanceId : {}", taskInstanceId);
// 为了不影响兼容性,忽略错误
return DSL.trueCondition();
} else {
// 为了便于观察和排查,暂时设定为 INFO 级别,等后续正式交付再改成 DEBUG
log.info("TaskInstanceIdDynamicCondition: UseTaskInstanceIdCondition");
return taskInstanceIdConditionBuilder.apply(taskInstanceId);
}
} else {
// 为了便于观察和排查,暂时设定为 INFO 级别,等后续正式交付再改成 DEBUG
log.info("TaskInstanceIdDynamicCondition: IgnoreTaskInstanceIdCondition");
return DSL.trueCondition();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,15 @@ public final void onEvent(Message<? extends JobMessage> message) {
}

private void beforeHandleMessage(Message<? extends JobMessage> message) {
log.info("beforeHandleMessage");
MessageHeaders headers = message.getHeaders();
String jobExecuteContextJson = (String) headers.get(JobExecuteContext.KEY);
if (StringUtils.isNotEmpty(jobExecuteContextJson)) {
log.info("setJobExecuteContextThreadLocalRepo");
JobExecuteContextThreadLocalRepo.set(JsonUtils.fromJson(jobExecuteContextJson,
JobExecuteContext.class));
}
}

private void afterHandle(Message<? extends JobMessage> message) {
log.info("afterHandleMessage");
JobExecuteContextThreadLocalRepo.unset();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -52,8 +53,8 @@ public class ArtifactoryLocalFilePrepareTask implements JobTaskContext {
private final String artifactoryRepo;
private final String jobStorageRootPath;
private final List<Future<Boolean>> futureList = new ArrayList<>();
private final ThreadPoolExecutor localFileDownloadExecutor;
private final ThreadPoolExecutor localFileWatchExecutor;
private final ExecutorService localFileDownloadExecutor;
private final ExecutorService localFileWatchExecutor;
public static Future<?> localFileWatchFuture = null;

public ArtifactoryLocalFilePrepareTask(
Expand All @@ -65,8 +66,8 @@ public ArtifactoryLocalFilePrepareTask(
String artifactoryProject,
String artifactoryRepo,
String jobStorageRootPath,
ThreadPoolExecutor localFileDownloadExecutor,
ThreadPoolExecutor localFileWatchExecutor
ExecutorService localFileDownloadExecutor,
ExecutorService localFileWatchExecutor
) {
this.stepInstance = stepInstance;
this.isForRetry = isForRetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
Expand All @@ -60,8 +61,8 @@ public class LocalFilePrepareService {
private final StepInstanceService stepInstanceService;
private final ArtifactoryClient artifactoryClient;
private final Map<String, ArtifactoryLocalFilePrepareTask> taskMap = new ConcurrentHashMap<>();
private final ThreadPoolExecutor localFileDownloadExecutor;
private final ThreadPoolExecutor localFileWatchExecutor;
private final ExecutorService localFileDownloadExecutor;
private final ExecutorService localFileWatchExecutor;

@Autowired
public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig,
Expand All @@ -70,8 +71,8 @@ public LocalFilePrepareService(FileDistributeConfig fileDistributeConfig,
AgentService agentService,
StepInstanceService stepInstanceService,
@Qualifier("jobArtifactoryClient") ArtifactoryClient artifactoryClient,
@Qualifier("localFileDownloadExecutor") ThreadPoolExecutor localFileDownloadExecutor,
@Qualifier("localFileWatchExecutor") ThreadPoolExecutor localFileWatchExecutor) {
@Qualifier("localFileDownloadExecutor") ExecutorService localFileDownloadExecutor,
@Qualifier("localFileWatchExecutor") ExecutorService localFileWatchExecutor) {
this.fileDistributeConfig = fileDistributeConfig;
this.artifactoryConfig = artifactoryConfig;
this.localFileConfigForExecute = localFileConfigForExecute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,9 @@ private static final class StopTask implements Runnable {
@Override
public void run() {
Span span = tracer1.nextSpan(task.getTraceContext()).name("stop-task");

try (Tracer.SpanInScope ignored = tracer1.withSpan(span.start())) {
JobExecuteContextThreadLocalRepo.set(task.getJobExecuteContext());
log.info("Begin to stop task, task: {}", task.getResultHandleTask());
task.getResultHandleTask().stop();
log.info("Stop task successfully, task: {}", task.getResultHandleTask());
Expand All @@ -406,6 +408,7 @@ public void run() {
if (span != null) {
span.end();
}
JobExecuteContextThreadLocalRepo.unset();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.tencent.bk.job.execute.engine.result.ha.ResultHandleLimiter;
import com.tencent.bk.job.execute.engine.result.ha.ResultHandleTaskKeepaliveManager;
import com.tencent.bk.job.execute.monitor.ExecuteMetricNames;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class ScheduledContinuousResultHandleTask extends DelayedTask {
/**
* 作业执行上下文信息
*/
@Getter
private final JobExecuteContext jobExecuteContext;

/**
Expand Down
Loading

0 comments on commit f893700

Please sign in to comment.