diff --git a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/domain/entity/JobConfigDO.java b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/domain/entity/JobConfigDO.java index ccff15a..d269354 100644 --- a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/domain/entity/JobConfigDO.java +++ b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/domain/entity/JobConfigDO.java @@ -81,4 +81,9 @@ public class JobConfigDO implements Serializable { * 上次执行时间 */ private LocalDateTime lastRunningTime; + + /** + * 版本 + */ + private Integer version; } diff --git a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/IQuartzConfigManage.java b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/IQuartzConfigManage.java index d2a5c12..d08c39e 100644 --- a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/IQuartzConfigManage.java +++ b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/IQuartzConfigManage.java @@ -14,6 +14,7 @@ public interface IQuartzConfigManage extends IService { List selectByAppName(String name); + List selectByAppNameAndVersion(String name, Integer version); List loadEnabledJobs(); diff --git a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/QuartzConfigManageImpl.java b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/QuartzConfigManageImpl.java index e3489e5..bbd0d3c 100644 --- a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/QuartzConfigManageImpl.java +++ b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/manage/QuartzConfigManageImpl.java @@ -25,6 +25,15 @@ public class QuartzConfigManageImpl extends ServiceImpl selectByAppNameAndVersion(String name, Integer version) { + LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); + lqw.eq(JobConfigDO::getApplicationName, name); + lqw.eq(JobConfigDO::getDelFlag, 0); + lqw.gt(JobConfigDO::getVersion, version); + return baseMapper.selectList(lqw); + } + @Override public List loadEnabledJobs() { LambdaQueryWrapper lqw = Wrappers.lambdaQuery(); diff --git a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/server/XxzJobFetchController.java b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/server/XxzJobFetchController.java index b0ac82a..29d888d 100644 --- a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/server/XxzJobFetchController.java +++ b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/server/XxzJobFetchController.java @@ -26,4 +26,9 @@ public class XxzJobFetchController { List jobsByAppName = taskConfigService.getJobsByAppName(appName); return Result.success(jobsByAppName); } + + @GetMapping("/listByVersion") + public Result listByVersion(@RequestParam("appName") String appName, @RequestParam("currVersion") Integer currVersion) { + return Result.success(taskConfigService.getJobsByAppNameAndVersion(appName, currVersion)); + } } diff --git a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/ITaskConfigService.java b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/ITaskConfigService.java index 7c307b0..c90588c 100644 --- a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/ITaskConfigService.java +++ b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/ITaskConfigService.java @@ -25,5 +25,18 @@ public interface ITaskConfigService { */ boolean registerTasks(List request); + /** + * 根据应用获取任务 + * @param appName + * @return + */ List getJobsByAppName(String appName); + + /** + * 根据应用名称和版本获取任务 + * @param appName + * @param version + * @return + */ + List getJobsByAppNameAndVersion(String appName, Integer version); } diff --git a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/impl/TaskConfigServiceImpl.java b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/impl/TaskConfigServiceImpl.java index e7c2a6f..c55bf5e 100644 --- a/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/impl/TaskConfigServiceImpl.java +++ b/xservice-quartz-admin/src/main/java/com/xiang/app/quartz/admin/service/impl/TaskConfigServiceImpl.java @@ -72,6 +72,29 @@ public class TaskConfigServiceImpl implements ITaskConfigService { return quartzConfigManage.selectByAppName(appName); } + @Override + public List getJobsByAppNameAndVersion(String appName, Integer version) { + List jobConfigDOS = quartzConfigManage.selectByAppName(appName); + Map> map = Maps.newHashMap(); + if (CollectionUtils.isNotEmpty(jobConfigDOS)) { + map.putAll(jobConfigDOS.stream().collect(Collectors.groupingBy(JobConfigDO::getBeanName))); + } + Map result = Maps.newHashMap(); + map.forEach((k, v) -> { + v.forEach(item -> { + if (result.containsKey(k)) { + JobConfigDO jobConfigDO = result.get(k); + if (item.getVersion() > jobConfigDO.getVersion()) { + result.put(k, item); + } + } else { + result.put(k, item); + } + }); + }); + return result.values().stream().toList(); + } + private static void putDataIntoDo(TaskRegisterRequest item, JobConfigDO jobConfigDO, String applicationName) { jobConfigDO.setApplicationName(applicationName); jobConfigDO.setApplicationAddress(item.getApplicationAddress()); diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/AdminJobClient.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/AdminJobClient.java index 7a533ac..16add99 100644 --- a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/AdminJobClient.java +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/AdminJobClient.java @@ -11,4 +11,5 @@ import java.util.List; public interface AdminJobClient { List fetchJobByAppName(String appName); + List fetchJobByAppNameAndVersion(String appName, Integer version); } diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/HttpAdminJobClientImpl.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/HttpAdminJobClientImpl.java index ece15c0..0222d4e 100644 --- a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/HttpAdminJobClientImpl.java +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/api/HttpAdminJobClientImpl.java @@ -42,4 +42,25 @@ public class HttpAdminJobClientImpl implements AdminJobClient{ } return Lists.newArrayList(); } + + @Override + public List fetchJobByAppNameAndVersion(String appName, Integer version) { + String address = xxzJobProperties.getAdminAddress(); + address = address + "/open/quartz/fetch/listByVersion"; + Map params = Maps.newHashMap(); + params.put("appName", appName); + params.put("currVersion", String.valueOf(version)); + String resp = HttpHelper.doGet(address, null, params); + + if (StringUtils.isNotBlank(resp)) { + JSONObject jsonObject = JSON.parseObject(resp); + if (Objects.nonNull(jsonObject)) { + String data = JSON.toJSONString(jsonObject.get("data")); + if (StringUtils.isNotBlank(data)) { + return JSON.parseArray(data, JobConfigDO.class); + } + } + } + return Lists.newArrayList(); + } } diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/config/XxzJobAutoConfiguration.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/config/XxzJobAutoConfiguration.java index e54977f..642c65e 100644 --- a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/config/XxzJobAutoConfiguration.java +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/config/XxzJobAutoConfiguration.java @@ -2,11 +2,15 @@ package com.xiang.core.quartz.config; import com.xiang.core.quartz.executor.JobExecutor; import com.xiang.core.quartz.model.XxzJobProperties; +import com.xiang.core.quartz.trigger.JobTrigger; +import com.xiang.core.quartz.trigger.TriggerRoute; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; +import java.util.List; + /** * @Author: xiang * @Date: 2026-01-06 09:29 @@ -20,4 +24,9 @@ public class XxzJobAutoConfiguration { public JobExecutor jobExecutor() { return new JobExecutor(); } + + @Bean + public TriggerRoute triggerRoute(List triggers) { + return new TriggerRoute(triggers); + } } diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/executor/JobExecutor.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/executor/JobExecutor.java index 87841fc..1347e49 100644 --- a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/executor/JobExecutor.java +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/executor/JobExecutor.java @@ -15,16 +15,15 @@ import org.springframework.stereotype.Component; public class JobExecutor { private static final Logger log = LoggerFactory.getLogger(JobExecutor.class); - /** * v1版本 本地反射调用 - * @param jobDefinition + * @param job */ - public void executor(JobDefinition jobDefinition) { + public void executor(JobDefinition job) { try { - jobDefinition.getMethod().invoke(jobDefinition.getBean()); + job.getMethod().invoke(job.getBean()); } catch (Exception e) { - log.error("xxz-job execute error, job={}", jobDefinition.getName(), e); + log.error("xxz-job execute error, job={}", job.getName(), e); } } } diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/holder/JobDefinitionHolder.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/holder/JobDefinitionHolder.java index dd57373..3303b0e 100644 --- a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/holder/JobDefinitionHolder.java +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/holder/JobDefinitionHolder.java @@ -30,13 +30,6 @@ public class JobDefinitionHolder { public static JobDefinition getOne(String name) { return MAP.get(name); } - - public static void updateOne(JobDefinition jobDefinition) { - if (MAP.containsKey(jobDefinition.getName())) { - MAP.put(jobDefinition.getName(), jobDefinition); - } - } - public static void delOne(JobDefinition jobDefinition) { MAP.remove(jobDefinition.getName()); } diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/schedule/JobConfigRefreshTask.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/schedule/JobConfigRefreshTask.java new file mode 100644 index 0000000..fda250a --- /dev/null +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/schedule/JobConfigRefreshTask.java @@ -0,0 +1,38 @@ +package com.xiang.core.quartz.schedule; + +import com.xiang.core.quartz.api.AdminJobClient; +import com.xiang.core.quartz.holder.JobDefinitionHolder; +import com.xiang.core.quartz.model.JobConfigDO; +import com.xiang.core.quartz.model.JobDefinition; +import com.xiang.core.quartz.model.XxzJobProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @Author: xiang + * @Date: 2026-01-06 15:59 + */ +@Component +public class JobConfigRefreshTask { + + @Autowired + private AdminJobClient adminJobClient; + @Autowired + private XxzJobProperties xxzJobProperties; + @Autowired + private JobScheduler jobScheduler; + @Scheduled(fixedDelay = 5000) + public void refresh() { + List jobConfigDOS = adminJobClient.fetchJobByAppName(xxzJobProperties.getAppName()); + + for (JobConfigDO jobConfigDO : jobConfigDOS) { + JobDefinition jobDefinition = JobDefinitionHolder.getOne(jobConfigDO.getBeanName()); + jobScheduler.refresh(jobDefinition, jobConfigDO); + } + } + + +} diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/schedule/JobScheduler.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/schedule/JobScheduler.java index 334b9c9..d7bcdc2 100644 --- a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/schedule/JobScheduler.java +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/schedule/JobScheduler.java @@ -1,12 +1,19 @@ package com.xiang.core.quartz.schedule; -import com.xiang.core.quartz.executor.JobExecutor; +import com.google.common.collect.Maps; +import com.xiang.core.quartz.model.JobConfigDO; import com.xiang.core.quartz.model.JobDefinition; +import com.xiang.core.quartz.trigger.TriggerRoute; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; + /** * @Author: xiang * @Date: 2026-01-06 09:47 @@ -15,12 +22,49 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class JobScheduler { private final TaskScheduler taskScheduler; - private final JobExecutor jobExecutor; + private final TriggerRoute triggerRoute; + private final Map> scheduledTasks = Maps.newConcurrentMap(); - public void schedule(JobDefinition job) { - taskScheduler.schedule( - () -> jobExecutor.executor(job), + public ScheduledFuture schedule(JobDefinition job) { + return taskScheduler.schedule( + () -> triggerRoute.route(job).trigger(job), new CronTrigger(job.getCron()) ); } + + public synchronized void refresh(JobDefinition job, JobConfigDO cfg) { + String jobName = job.getName(); + + // 1️⃣ 未配置 or 未启用 → 停止 + if (Objects.isNull(cfg) || Objects.equals(cfg.getJobSwitch(), 0)) { + job.setCron(null); + cancel(jobName); + return; + } + + String newCron = cfg.getCron(); + String oldCron = job.getCron(); + + // 2️⃣ cron 未变 & 已存在 → 不动 + if (scheduledTasks.containsKey(jobName) && StringUtils.equals(newCron, oldCron)) { + return; + } + + // 3️⃣ 先取消旧任务 + cancel(jobName); + // 4️⃣ 创建新任务 + job.setCron(newCron); + ScheduledFuture future = this.schedule(job); + scheduledTasks.put(jobName, future); + + } + + private void cancel(String jobName) { + ScheduledFuture future = scheduledTasks.remove(jobName); + if (future != null) { + // false 不中断当前任务,等待任务完成 + future.cancel(true); + + } + } } diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/DistributeJobTrigger.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/DistributeJobTrigger.java new file mode 100644 index 0000000..ac1b17b --- /dev/null +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/DistributeJobTrigger.java @@ -0,0 +1,61 @@ +package com.xiang.core.quartz.trigger; + +import com.xiang.core.quartz.executor.JobExecutor; +import com.xiang.core.quartz.model.JobDefinition; +import com.xiang.core.quartz.model.XxzJobProperties; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.stereotype.Component; + +/** + * @Author: xiang + * @Date: 2026-01-07 09:33 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class DistributeJobTrigger implements JobTrigger { + + private final RedissonClient redissonClient; + private final JobExecutor jobExecutor; + private final XxzJobProperties xxzJobProperties; + + @Override + public void trigger(JobDefinition jobDefinition) { + if (!jobDefinition.isEnabled() || StringUtils.isEmpty(jobDefinition.getCron())) { + log.info("任务:{}已关闭,无需执行!", jobDefinition.getName()); + return; + } + // 校验是否支持分布式,这里暂时忽略,先支持单机进行 + String key = "xxzJob:" + xxzJobProperties.getAppName() + jobDefinition.getName(); + RLock lock = redissonClient.getLock(key); + boolean flag = lock.tryLock(); + try { + if (flag) { + jobExecutor.executor(jobDefinition); + } else { + log.info("其他实例正在执行任务!"); + } + } catch (Exception e) { + log.info("xxz-Job:{}获取锁失败", jobDefinition.getName()); + } finally { + // 释放锁 + if (flag && lock.isHeldByCurrentThread()) { + lock.unlock(); + } + } + } + + @Override + public void cancel(String jobName) { + + } + + @Override + public TriggerType type() { + return TriggerType.DISTRIBUTE; + } +} diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/JobTrigger.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/JobTrigger.java new file mode 100644 index 0000000..d789da1 --- /dev/null +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/JobTrigger.java @@ -0,0 +1,16 @@ +package com.xiang.core.quartz.trigger; + +import com.xiang.core.quartz.model.JobDefinition; + +/** + * @Author: xiang + * @Date: 2026-01-07 09:27 + */ +public interface JobTrigger { + + void trigger(JobDefinition jobDefinition); + + void cancel(String jobName); + + TriggerType type(); +} diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/LocalJobTrigger.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/LocalJobTrigger.java new file mode 100644 index 0000000..3cfc868 --- /dev/null +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/LocalJobTrigger.java @@ -0,0 +1,57 @@ +package com.xiang.core.quartz.trigger; + +import com.xiang.core.quartz.executor.JobExecutor; +import com.xiang.core.quartz.model.JobDefinition; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @Author: xiang + * @Date: 2026-01-07 09:28 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class LocalJobTrigger implements JobTrigger { + + private final JobExecutor jobExecutor; + private final ConcurrentHashMap running = new ConcurrentHashMap<>(); + @Override + public void trigger(JobDefinition job) { + if (!job.isEnabled()) { + log.info("job {} disabled, skip execution", job.getName()); + return; + } + + AtomicBoolean flag = running.computeIfAbsent( + job.getName(), k -> new AtomicBoolean(false) + ); + + if (!flag.compareAndSet(false, true)) { + log.warn("job {} is already running, skip", job.getName()); + return; + } + + try { + jobExecutor.executor(job); + } catch (Exception e) { + log.error("xxz-job execute error, job={}", job.getName(), e); + } finally { + flag.set(false); + } + } + + @Override + public void cancel(String jobName) { + running.remove(jobName); + } + + @Override + public TriggerType type() { + return TriggerType.LOCAL; + } +} diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/TriggerRoute.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/TriggerRoute.java new file mode 100644 index 0000000..7d6e3c5 --- /dev/null +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/TriggerRoute.java @@ -0,0 +1,29 @@ +package com.xiang.core.quartz.trigger; + +import com.xiang.core.quartz.model.JobDefinition; + +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * @Author: xiang + * @Date: 2026-01-07 09:48 + */ +public class TriggerRoute { + + private final Map triggerMap; + + public TriggerRoute(List triggers) { + triggerMap = new EnumMap<>(TriggerType.class); + for (JobTrigger trigger : triggers) { + triggerMap.put(trigger.type(), trigger); + } + } + + public JobTrigger route(JobDefinition job) { + return job.isDistributed() + ? triggerMap.get(TriggerType.DISTRIBUTE) + : triggerMap.get(TriggerType.LOCAL); + } +} diff --git a/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/TriggerType.java b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/TriggerType.java new file mode 100644 index 0000000..d7a3251 --- /dev/null +++ b/xservice-quartz-core/src/main/java/com/xiang/core/quartz/trigger/TriggerType.java @@ -0,0 +1,19 @@ +package com.xiang.core.quartz.trigger; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * @Author: xiang + * @Date: 2026-01-07 09:46 + */ +@Getter +@RequiredArgsConstructor +public enum TriggerType { + + LOCAL("单机"), + DISTRIBUTE("分布式"), + + ; + private final String desc; +}