feat:动态调整cron和开关

This commit is contained in:
Xiang
2026-01-09 16:43:04 +08:00
parent 51f0803eab
commit afa610ecaa
18 changed files with 360 additions and 17 deletions

View File

@@ -11,4 +11,5 @@ import java.util.List;
public interface AdminJobClient {
List<JobConfigDO> fetchJobByAppName(String appName);
List<JobConfigDO> fetchJobByAppNameAndVersion(String appName, Integer version);
}

View File

@@ -42,4 +42,25 @@ public class HttpAdminJobClientImpl implements AdminJobClient{
}
return Lists.newArrayList();
}
@Override
public List<JobConfigDO> fetchJobByAppNameAndVersion(String appName, Integer version) {
String address = xxzJobProperties.getAdminAddress();
address = address + "/open/quartz/fetch/listByVersion";
Map<String, String> params = Maps.newHashMap();
params.put("appName", appName);
params.put("currVersion", String.valueOf(version));
String resp = HttpHelper.doGet(address, null, params);
if (StringUtils.isNotBlank(resp)) {
JSONObject jsonObject = JSON.parseObject(resp);
if (Objects.nonNull(jsonObject)) {
String data = JSON.toJSONString(jsonObject.get("data"));
if (StringUtils.isNotBlank(data)) {
return JSON.parseArray(data, JobConfigDO.class);
}
}
}
return Lists.newArrayList();
}
}

View File

@@ -2,11 +2,15 @@ package com.xiang.core.quartz.config;
import com.xiang.core.quartz.executor.JobExecutor;
import com.xiang.core.quartz.model.XxzJobProperties;
import com.xiang.core.quartz.trigger.JobTrigger;
import com.xiang.core.quartz.trigger.TriggerRoute;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.List;
/**
* @Author: xiang
* @Date: 2026-01-06 09:29
@@ -20,4 +24,9 @@ public class XxzJobAutoConfiguration {
public JobExecutor jobExecutor() {
return new JobExecutor();
}
@Bean
public TriggerRoute triggerRoute(List<JobTrigger> triggers) {
return new TriggerRoute(triggers);
}
}

View File

@@ -15,16 +15,15 @@ import org.springframework.stereotype.Component;
public class JobExecutor {
private static final Logger log = LoggerFactory.getLogger(JobExecutor.class);
/**
* v1版本 本地反射调用
* @param jobDefinition
* @param job
*/
public void executor(JobDefinition jobDefinition) {
public void executor(JobDefinition job) {
try {
jobDefinition.getMethod().invoke(jobDefinition.getBean());
job.getMethod().invoke(job.getBean());
} catch (Exception e) {
log.error("xxz-job execute error, job={}", jobDefinition.getName(), e);
log.error("xxz-job execute error, job={}", job.getName(), e);
}
}
}

View File

@@ -30,13 +30,6 @@ public class JobDefinitionHolder {
public static JobDefinition getOne(String name) {
return MAP.get(name);
}
public static void updateOne(JobDefinition jobDefinition) {
if (MAP.containsKey(jobDefinition.getName())) {
MAP.put(jobDefinition.getName(), jobDefinition);
}
}
public static void delOne(JobDefinition jobDefinition) {
MAP.remove(jobDefinition.getName());
}

View File

@@ -0,0 +1,38 @@
package com.xiang.core.quartz.schedule;
import com.xiang.core.quartz.api.AdminJobClient;
import com.xiang.core.quartz.holder.JobDefinitionHolder;
import com.xiang.core.quartz.model.JobConfigDO;
import com.xiang.core.quartz.model.JobDefinition;
import com.xiang.core.quartz.model.XxzJobProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Author: xiang
* @Date: 2026-01-06 15:59
*/
@Component
public class JobConfigRefreshTask {
@Autowired
private AdminJobClient adminJobClient;
@Autowired
private XxzJobProperties xxzJobProperties;
@Autowired
private JobScheduler jobScheduler;
@Scheduled(fixedDelay = 5000)
public void refresh() {
List<JobConfigDO> jobConfigDOS = adminJobClient.fetchJobByAppName(xxzJobProperties.getAppName());
for (JobConfigDO jobConfigDO : jobConfigDOS) {
JobDefinition jobDefinition = JobDefinitionHolder.getOne(jobConfigDO.getBeanName());
jobScheduler.refresh(jobDefinition, jobConfigDO);
}
}
}

View File

@@ -1,12 +1,19 @@
package com.xiang.core.quartz.schedule;
import com.xiang.core.quartz.executor.JobExecutor;
import com.google.common.collect.Maps;
import com.xiang.core.quartz.model.JobConfigDO;
import com.xiang.core.quartz.model.JobDefinition;
import com.xiang.core.quartz.trigger.TriggerRoute;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
/**
* @Author: xiang
* @Date: 2026-01-06 09:47
@@ -15,12 +22,49 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class JobScheduler {
private final TaskScheduler taskScheduler;
private final JobExecutor jobExecutor;
private final TriggerRoute triggerRoute;
private final Map<String, ScheduledFuture<?>> scheduledTasks = Maps.newConcurrentMap();
public void schedule(JobDefinition job) {
taskScheduler.schedule(
() -> jobExecutor.executor(job),
public ScheduledFuture<?> schedule(JobDefinition job) {
return taskScheduler.schedule(
() -> triggerRoute.route(job).trigger(job),
new CronTrigger(job.getCron())
);
}
public synchronized void refresh(JobDefinition job, JobConfigDO cfg) {
String jobName = job.getName();
// 1⃣ 未配置 or 未启用 → 停止
if (Objects.isNull(cfg) || Objects.equals(cfg.getJobSwitch(), 0)) {
job.setCron(null);
cancel(jobName);
return;
}
String newCron = cfg.getCron();
String oldCron = job.getCron();
// 2⃣ cron 未变 & 已存在 → 不动
if (scheduledTasks.containsKey(jobName) && StringUtils.equals(newCron, oldCron)) {
return;
}
// 3⃣ 先取消旧任务
cancel(jobName);
// 4⃣ 创建新任务
job.setCron(newCron);
ScheduledFuture<?> future = this.schedule(job);
scheduledTasks.put(jobName, future);
}
private void cancel(String jobName) {
ScheduledFuture<?> future = scheduledTasks.remove(jobName);
if (future != null) {
// false 不中断当前任务,等待任务完成
future.cancel(true);
}
}
}

View File

@@ -0,0 +1,61 @@
package com.xiang.core.quartz.trigger;
import com.xiang.core.quartz.executor.JobExecutor;
import com.xiang.core.quartz.model.JobDefinition;
import com.xiang.core.quartz.model.XxzJobProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
/**
* @Author: xiang
* @Date: 2026-01-07 09:33
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DistributeJobTrigger implements JobTrigger {
private final RedissonClient redissonClient;
private final JobExecutor jobExecutor;
private final XxzJobProperties xxzJobProperties;
@Override
public void trigger(JobDefinition jobDefinition) {
if (!jobDefinition.isEnabled() || StringUtils.isEmpty(jobDefinition.getCron())) {
log.info("任务:{}已关闭,无需执行!", jobDefinition.getName());
return;
}
// 校验是否支持分布式,这里暂时忽略,先支持单机进行
String key = "xxzJob:" + xxzJobProperties.getAppName() + jobDefinition.getName();
RLock lock = redissonClient.getLock(key);
boolean flag = lock.tryLock();
try {
if (flag) {
jobExecutor.executor(jobDefinition);
} else {
log.info("其他实例正在执行任务!");
}
} catch (Exception e) {
log.info("xxz-Job{}获取锁失败", jobDefinition.getName());
} finally {
// 释放锁
if (flag && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
@Override
public void cancel(String jobName) {
}
@Override
public TriggerType type() {
return TriggerType.DISTRIBUTE;
}
}

View File

@@ -0,0 +1,16 @@
package com.xiang.core.quartz.trigger;
import com.xiang.core.quartz.model.JobDefinition;
/**
* @Author: xiang
* @Date: 2026-01-07 09:27
*/
public interface JobTrigger {
void trigger(JobDefinition jobDefinition);
void cancel(String jobName);
TriggerType type();
}

View File

@@ -0,0 +1,57 @@
package com.xiang.core.quartz.trigger;
import com.xiang.core.quartz.executor.JobExecutor;
import com.xiang.core.quartz.model.JobDefinition;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @Author: xiang
* @Date: 2026-01-07 09:28
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class LocalJobTrigger implements JobTrigger {
private final JobExecutor jobExecutor;
private final ConcurrentHashMap<String, AtomicBoolean> running = new ConcurrentHashMap<>();
@Override
public void trigger(JobDefinition job) {
if (!job.isEnabled()) {
log.info("job {} disabled, skip execution", job.getName());
return;
}
AtomicBoolean flag = running.computeIfAbsent(
job.getName(), k -> new AtomicBoolean(false)
);
if (!flag.compareAndSet(false, true)) {
log.warn("job {} is already running, skip", job.getName());
return;
}
try {
jobExecutor.executor(job);
} catch (Exception e) {
log.error("xxz-job execute error, job={}", job.getName(), e);
} finally {
flag.set(false);
}
}
@Override
public void cancel(String jobName) {
running.remove(jobName);
}
@Override
public TriggerType type() {
return TriggerType.LOCAL;
}
}

View File

@@ -0,0 +1,29 @@
package com.xiang.core.quartz.trigger;
import com.xiang.core.quartz.model.JobDefinition;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
/**
* @Author: xiang
* @Date: 2026-01-07 09:48
*/
public class TriggerRoute {
private final Map<TriggerType, JobTrigger> triggerMap;
public TriggerRoute(List<JobTrigger> triggers) {
triggerMap = new EnumMap<>(TriggerType.class);
for (JobTrigger trigger : triggers) {
triggerMap.put(trigger.type(), trigger);
}
}
public JobTrigger route(JobDefinition job) {
return job.isDistributed()
? triggerMap.get(TriggerType.DISTRIBUTE)
: triggerMap.get(TriggerType.LOCAL);
}
}

View File

@@ -0,0 +1,19 @@
package com.xiang.core.quartz.trigger;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @Author: xiang
* @Date: 2026-01-07 09:46
*/
@Getter
@RequiredArgsConstructor
public enum TriggerType {
LOCAL("单机"),
DISTRIBUTE("分布式"),
;
private final String desc;
}