perf:拉取数据+重试

This commit is contained in:
xiang
2025-07-29 21:48:31 +08:00
parent 7408d05eaf
commit 39af53d1de
5 changed files with 57 additions and 240 deletions

View File

@@ -47,6 +47,11 @@
<artifactId>xservice-cache</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xiang</groupId>
<artifactId>xservice-http-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@@ -1,34 +1,18 @@
package com.xiang.xservice.fwd.schedule;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiang.xservice.basic.common.req.BaseRequest;
import com.xiang.xservice.basic.config.MyThreadFactory;
import com.xiang.xservice.basic.utils.DateUtils;
import com.xiang.xservice.fwd.entity.pojo.FAudienceConfig;
import com.xiang.xservice.fwd.entity.pojo.FPerformConfig;
import com.xiang.xservice.fwd.entity.pojo.FPerformProjectInfo;
import com.xiang.xservice.fwd.entity.pojo.FPerformSeatInfo;
import com.xiang.xservice.fwd.entity.pojo.FUserConfig;
import com.xiang.xservice.fwd.entity.pojo.*;
import com.xiang.xservice.fwd.entity.resp.http.audience.AudienceMemberInfo;
import com.xiang.xservice.fwd.entity.resp.http.perform.Perform;
import com.xiang.xservice.fwd.entity.resp.http.perform.PerformDetail;
import com.xiang.xservice.fwd.entity.resp.http.perform.PerformInfo;
import com.xiang.xservice.fwd.entity.resp.http.perform.ProjectInfoResp;
import com.xiang.xservice.fwd.entity.resp.http.perform.ProjectList;
import com.xiang.xservice.fwd.entity.resp.http.perform.ProjectsResp;
import com.xiang.xservice.fwd.entity.resp.http.perform.SeatPlan;
import com.xiang.xservice.fwd.entity.resp.http.perform.SeatPlanStatus;
import com.xiang.xservice.fwd.mapper.FwdAudienceConfigMapper;
import com.xiang.xservice.fwd.mapper.FwdPerformConfigMapper;
import com.xiang.xservice.fwd.mapper.FwdPerformProjectInfoMapper;
import com.xiang.xservice.fwd.mapper.FwdPerformSeatInfoMapper;
import com.xiang.xservice.fwd.mapper.FwdUserConfigMapper;
import com.xiang.xservice.fwd.entity.resp.http.perform.*;
import com.xiang.xservice.fwd.mapper.*;
import com.xiang.xservice.fwd.service.IPerformServiceHttp;
import com.xiang.xservice.http.helper.HttpRequestHelper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -39,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -65,6 +48,7 @@ public class PullDataFromFWDJob {
private final FwdPerformSeatInfoMapper performSeatInfoMapper;
private final FwdUserConfigMapper userConfigMapper;
private final FwdAudienceConfigMapper audienceConfigMapper;
private final HttpRequestHelper httpRequestHelper;
private final ExecutorService es =
new ThreadPoolExecutor(
@@ -76,168 +60,6 @@ public class PullDataFromFWDJob {
new MyThreadFactory("fwd-pull-data-http", Boolean.TRUE),
new ThreadPoolExecutor.AbortPolicy());
/**
* 定时任务 每日1点爬取芬玩岛数据演出列表数据
*/
// @Scheduled(cron = "0 0 1 1/1 * ?")
@PostMapping("/pullProjectsDataJob")
public void pullProjectsDataJob() {
int i = 0;
while (true) {
i++;
BaseRequest request = new BaseRequest();
request.setCurrent(i);
request.setPageSize(50);
try {
ProjectsResp showProjectsFromHttp = performServiceHttp.getShowProjectsFromHttp(request);
if (Objects.isNull(showProjectsFromHttp)) {
return;
}
if (CollectionUtils.isEmpty(showProjectsFromHttp.getList())) {
break;
}
List<ProjectList> projectList = showProjectsFromHttp.getList();
List<CompletableFuture> futureList = Lists.newArrayList();
for (ProjectList project : projectList) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
ProjectInfoResp projectInfoFromHttp = performServiceHttp.getProjectInfoFromHttp(project.getProjectId());
Perform performsByProjectIdFromHttp = performServiceHttp.getPerformsByProjectIdFromHttp(project.getProjectId());
if (Objects.nonNull(projectInfoFromHttp) && Objects.nonNull(performsByProjectIdFromHttp)) {
FPerformProjectInfo projectInfo = performProjectInfoMapper.getProjectByProjectId(project.getProjectId());
if (Objects.nonNull(projectInfo)) {
// 更新projectInfo的信息
buildUpdateProjectInfo(project, projectInfo, projectInfoFromHttp);
performProjectInfoMapper.update(projectInfo);
List<FPerformSeatInfo> fPerformSeatInfoInsertList = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(performsByProjectIdFromHttp.getPerformInfos())) {
for (PerformInfo performInfo : performsByProjectIdFromHttp.getPerformInfos()) {
if (CollectionUtils.isNotEmpty(performInfo.getPerformInfo())) {
for (PerformDetail performDetail : performInfo.getPerformInfo()) {
if (CollectionUtils.isNotEmpty(performDetail.getSeatPlans())) {
List<Long> seatPlanIds = performDetail.getSeatPlans().stream().map(SeatPlan::getSeatPlanId).toList();
Map<Long, Boolean> seatMap = getSeatMap(seatPlanIds);
if (MapUtils.isEmpty(seatMap)) continue;
for (SeatPlan seatPlan : performDetail.getSeatPlans()) {
if (!seatMap.containsKey(seatPlan.getSeatPlanId())) {
continue;
}
FPerformSeatInfo fPerformSeatInfo = new FPerformSeatInfo();
fPerformSeatInfo.setSeatPlanId(seatPlan.getSeatPlanId());
fPerformSeatInfo.setSeatPlanName(seatPlan.getSeatPlanName());
fPerformSeatInfo.setPerformId(seatPlan.getPerformId());
fPerformSeatInfo.setPerformName(seatPlan.getPerformName());
fPerformSeatInfo.setStopSale(seatPlan.getStopSale());
fPerformSeatInfo.setShelfStatus(seatPlan.getShelfStatus());
fPerformSeatInfo.setPrice(seatPlan.getPrice());
fPerformSeatInfo.setDiscountPrice(seatPlan.getDiscountPrice());
fPerformSeatInfo.setSubStatus(seatPlan.getSubStatus());
fPerformSeatInfo.setQuantity(seatPlan.getQuantity());
fPerformSeatInfo.setStatus(seatPlan.getStatus());
fPerformSeatInfo.setMaxSellStock(seatPlan.getMaxSellStock());
fPerformSeatInfo.setSoldStock(seatPlan.getSoldStock());
fPerformSeatInfo.setLeftStock(seatPlan.getLeftStock());
fPerformSeatInfo.setAbleSaleQuantity(seatPlan.getAbleSaleQuantity());
fPerformSeatInfo.setAshShow(seatPlan.getAshShow());
fPerformSeatInfo.setAshShowDesc(seatPlan.getAshShowDesc());
fPerformSeatInfo.setSelectable(seatPlan.getSelectable());
fPerformSeatInfo.setDisplay(seatPlan.getDisplay());
fPerformSeatInfo.setAvailableTicketQuantity(seatPlan.getAvailableTicketQuantity());
fPerformSeatInfo.setAvailableAllTicketQuantity(seatPlan.getAvailableAllTicketQuantity());
if (StringUtils.isNotBlank(seatPlan.getSaleTime())) {
fPerformSeatInfo.setSaleTime(DateUtils.getTimeFromStr(seatPlan.getSaleTime()));
}
fPerformSeatInfo.setProjectId(project.getProjectId());
fPerformSeatInfo.setSoldOut(seatMap.get(seatPlan.getSeatPlanId()) ? 1 : 0);
FPerformSeatInfo seatInfo = performSeatInfoMapper.getPerformSeatInfoBySeatIdAndPerformIdAndProjectId(fPerformSeatInfo.getSeatPlanId(), fPerformSeatInfo.getPerformId(), project.getProjectId());
if (Objects.nonNull(seatInfo)) {
fPerformSeatInfo.setId(seatInfo.getId());
performSeatInfoMapper.update(fPerformSeatInfo);
} else {
fPerformSeatInfoInsertList.add(fPerformSeatInfo);
}
}
}
}
}
}
}
if (CollectionUtils.isNotEmpty(fPerformSeatInfoInsertList)) {
performSeatInfoMapper.batchSave(fPerformSeatInfoInsertList);
}
} else {
// 新增projectInfo的信息
saveProjectInfo(project, projectInfoFromHttp);
List<FPerformSeatInfo> fPerformSeatInfos = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(performsByProjectIdFromHttp.getPerformInfos())) {
for (PerformInfo performInfo : performsByProjectIdFromHttp.getPerformInfos()) {
if (CollectionUtils.isNotEmpty(performInfo.getPerformInfo())) {
for (PerformDetail performDetail : performInfo.getPerformInfo()) {
if (CollectionUtils.isNotEmpty(performDetail.getSeatPlans())) {
List<Long> seatPlanIds = performDetail.getSeatPlans().stream().map(SeatPlan::getSeatPlanId).toList();
Map<Long, Boolean> seatMap = getSeatMap(seatPlanIds);
if (MapUtils.isEmpty(seatMap)) continue;
for (SeatPlan seatPlan : performDetail.getSeatPlans()) {
if (!seatMap.containsKey(seatPlan.getSeatPlanId())) {
continue;
}
FPerformSeatInfo fPerformSeatInfo = new FPerformSeatInfo();
fPerformSeatInfo.setSeatPlanId(seatPlan.getSeatPlanId());
fPerformSeatInfo.setSeatPlanName(seatPlan.getSeatPlanName());
fPerformSeatInfo.setPerformId(seatPlan.getPerformId());
fPerformSeatInfo.setPerformName(seatPlan.getPerformName());
fPerformSeatInfo.setStopSale(seatPlan.getStopSale());
fPerformSeatInfo.setShelfStatus(seatPlan.getShelfStatus());
fPerformSeatInfo.setPrice(seatPlan.getPrice());
fPerformSeatInfo.setDiscountPrice(seatPlan.getDiscountPrice());
fPerformSeatInfo.setSubStatus(seatPlan.getSubStatus());
fPerformSeatInfo.setQuantity(seatPlan.getQuantity());
fPerformSeatInfo.setStatus(seatPlan.getStatus());
fPerformSeatInfo.setMaxSellStock(seatPlan.getMaxSellStock());
fPerformSeatInfo.setSoldStock(seatPlan.getSoldStock());
fPerformSeatInfo.setLeftStock(seatPlan.getLeftStock());
fPerformSeatInfo.setAbleSaleQuantity(seatPlan.getAbleSaleQuantity());
fPerformSeatInfo.setAshShow(seatPlan.getAshShow());
fPerformSeatInfo.setAshShowDesc(seatPlan.getAshShowDesc());
fPerformSeatInfo.setSelectable(seatPlan.getSelectable());
fPerformSeatInfo.setDisplay(seatPlan.getDisplay());
fPerformSeatInfo.setAvailableTicketQuantity(seatPlan.getAvailableTicketQuantity());
fPerformSeatInfo.setAvailableAllTicketQuantity(seatPlan.getAvailableAllTicketQuantity());
if (StringUtils.isNotBlank(seatPlan.getSaleTime())) {
fPerformSeatInfo.setSaleTime(DateUtils.getTimeFromStr(seatPlan.getSaleTime()));
}
fPerformSeatInfo.setProjectId(project.getProjectId());
fPerformSeatInfo.setSoldOut(seatMap.get(seatPlan.getSeatPlanId()) ? 1 : 0);
}
}
}
}
}
}
if (CollectionUtils.isNotEmpty(fPerformSeatInfos)) {
performSeatInfoMapper.batchSave(fPerformSeatInfos);
}
}
}
}, es);
futureList.add(future);
}
CompletableFuture[] futureArr = futureList.toArray(futureList.toArray(new CompletableFuture[0]));
CompletableFuture.allOf(futureArr).join();
Thread.sleep(1000);
} catch (Exception e) {
log.error("拉取数据失败", e);
} finally {
System.gc();
}
}
}
private void saveProjectInfo(ProjectList project, ProjectInfoResp projectInfoFromHttp) {
FPerformProjectInfo fPerformProjectInfo = new FPerformProjectInfo();
fPerformProjectInfo.setNameDisplay(projectInfoFromHttp.getNameDisplay());
@@ -277,7 +99,7 @@ public class PullDataFromFWDJob {
return;
}
for (FPerformConfig performConfig : availablePerform) {
Perform performs = performServiceHttp.getPerformsByProjectIdFromHttp(performConfig.getProjectId());
Perform performs = httpRequestHelper.fetchWithRetry(() -> performServiceHttp.getPerformsByProjectIdFromHttp(performConfig.getProjectId()), "seat-perform-info");
if (Objects.isNull(performs)) {
continue;
}
@@ -295,7 +117,7 @@ public class PullDataFromFWDJob {
continue;
}
Map<Long, SeatPlan> map = seatPlans.stream().collect(Collectors.toMap(SeatPlan::getSeatPlanId, Function.identity(), (a, b) -> a));
List<SeatPlanStatus> planStatusFromHttp = performServiceHttp.getSeatPlanStatusFromHttp(new ArrayList<>(map.keySet()));
List<SeatPlanStatus> planStatusFromHttp = httpRequestHelper.fetchWithRetry(() -> performServiceHttp.getSeatPlanStatusFromHttp(new ArrayList<>(map.keySet())), "seat-seat-status");
List<FPerformSeatInfo> seatInfoAddList = Lists.newArrayList();
for (SeatPlanStatus seatPlanStatus : planStatusFromHttp) {
if (!seatPlanStatus.getSoldOutFlag() && seatPlanStatus.getStandbyStatus() == 10) {
@@ -383,7 +205,7 @@ public class PullDataFromFWDJob {
baseRequest.setCurrent(pageNum);
baseRequest.setPageSize(50);
// 查询所有的演出
ProjectsResp projectsResp = performServiceHttp.getShowProjectsFromHttp(baseRequest);
ProjectsResp projectsResp = httpRequestHelper.fetchWithRetry(() -> performServiceHttp.getShowProjectsFromHttp(baseRequest), "fetch-projects");
if (Objects.isNull(projectsResp)) {
break;
}
@@ -392,14 +214,13 @@ public class PullDataFromFWDJob {
break;
}
for (ProjectList projectList : projectLists) {
List<SeatPlan> seatPlans = Lists.newArrayList();
// 查询演出的详情
ProjectInfoResp projectInfoFromHttp = performServiceHttp.getProjectInfoFromHttp(projectList.getProjectId());
ProjectInfoResp projectInfoFromHttp = httpRequestHelper.fetchWithRetry(() -> performServiceHttp.getProjectInfoFromHttp(projectList.getProjectId()), "fetch-project-info");
if (Objects.isNull(projectInfoFromHttp)) {
continue;
}
// 查询演出信息
Perform performsByProjectIdFromHttp = performServiceHttp.getPerformsByProjectIdFromHttp(projectList.getProjectId());
Perform performsByProjectIdFromHttp = httpRequestHelper.fetchWithRetry(() -> performServiceHttp.getPerformsByProjectIdFromHttp(projectList.getProjectId()), "fetch-perform-by-project-id");
if (Objects.isNull(performsByProjectIdFromHttp)) {
continue;
}
@@ -421,47 +242,42 @@ public class PullDataFromFWDJob {
continue;
}
// 查询演出座位档次信息
List<FPerformSeatInfo> insertList = Lists.newCopyOnWriteArrayList();
List<FPerformSeatInfo> updateList = Lists.newCopyOnWriteArrayList();
for (PerformInfo performInfo : performInfos) {
List<PerformDetail> performs = performInfo.getPerformInfo();
if (CollectionUtils.isEmpty(performs)) {
continue;
}
for (PerformDetail perform : performs) {
seatPlans.addAll(perform.getSeatPlans());
if (CollectionUtils.isEmpty(perform.getSeatPlans())) {
continue;
}
List<Long> seatPlanIds = perform.getSeatPlans().stream().map(SeatPlan::getSeatPlanId).collect(Collectors.toList());
List<SeatPlanStatus> seatPlanStatusList = httpRequestHelper.fetchWithRetry(() -> performServiceHttp.getSeatPlanStatusFromHttp(seatPlanIds), "fetch-seat-plan-status");
if (CollectionUtils.isEmpty(seatPlanStatusList)) {
continue;
}
Map<Long, SeatPlanStatus> map = seatPlanStatusList.stream().collect(Collectors.toMap(SeatPlanStatus::getSeatPlanId, Function.identity(), (a, b) -> a));
Map<Long, FPerformSeatInfo> existSeatMap = performSeatInfoMapper.getPerformSeatInfoBySeatIds(seatPlanIds)
.stream()
.collect(Collectors.toMap(FPerformSeatInfo::getSeatPlanId, Function.identity(), (a, b) -> a));
for (SeatPlan seatPlan : perform.getSeatPlans()) {
if (map.containsKey(seatPlan.getSeatPlanId())) {
SeatPlanStatus value = map.get(seatPlan.getSeatPlanId());
if (existSeatMap.containsKey(seatPlan.getSeatPlanId())) {
// 更新
FPerformSeatInfo performSeatInfo = existSeatMap.get(seatPlan.getSeatPlanId());
performSeatInfo.setSoldStock(value.getSoldOutFlag() ? 1 : 0);
updateList.add(performSeatInfo);
} else {
// 新增
saveSeatInfo(projectList, seatPlan.getSeatPlanId(), value, seatPlan, insertList);
}
}
}
}
}
Map<Long, SeatPlan> seatPlanMap = seatPlans.stream().collect(Collectors.toMap(SeatPlan::getSeatPlanId, Function.identity(), (a, b) -> a));
List<Long> seatPlanIds = new ArrayList<>(seatPlanMap.keySet());
List<SeatPlanStatus> seatPlanStatusFromHttp = performServiceHttp.getSeatPlanStatusFromHttp(seatPlanIds);
if (CollectionUtils.isEmpty(seatPlanStatusFromHttp)) {
continue;
}
Map<Long, SeatPlanStatus> seatMap = seatPlanStatusFromHttp.stream()
.collect(Collectors.toMap(SeatPlanStatus::getSeatPlanId, Function.identity(), (a, b) -> a));
List<FPerformSeatInfo> insertList = Lists.newCopyOnWriteArrayList();
List<FPerformSeatInfo> updateList = Lists.newCopyOnWriteArrayList();
Map<Long, FPerformSeatInfo> existSeatMap = performSeatInfoMapper.getPerformSeatInfoBySeatIds(seatPlanIds)
.stream()
.collect(Collectors.toMap(FPerformSeatInfo::getSeatPlanId, Function.identity(), (a, b) -> a));
seatMap.forEach((k, v) -> {
if (v.getSoldOutFlag()) {
return;
}
if (existSeatMap.containsKey(k)) {
FPerformSeatInfo fPerformSeatInfo = existSeatMap.get(k);
if (v.getStandbyStatus().equals(10)) {
fPerformSeatInfo.setSoldStock(0);
}
updateList.add(fPerformSeatInfo);
} else {
SeatPlan seatPlan = seatPlanMap.get(k);
if (Objects.isNull(seatPlan)) {
return;
}
saveSeatInfo(projectList, k, v, seatPlan, insertList);
}
});
if (!CollectionUtils.isEmpty(insertList)) {
performSeatInfoMapper.batchSave(insertList);
}
@@ -472,11 +288,6 @@ public class PullDataFromFWDJob {
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("" , e);
}
}
private static void saveSeatInfo(ProjectList projectList, Long seatPlanId, SeatPlanStatus seatPlanStatus, SeatPlan seatPlan, List<FPerformSeatInfo> insertList) {
@@ -502,10 +313,14 @@ public class PullDataFromFWDJob {
fPerformSeatInfo.setDisplay(seatPlan.getDisplay());
fPerformSeatInfo.setAvailableTicketQuantity(seatPlan.getAvailableTicketQuantity());
fPerformSeatInfo.setAvailableAllTicketQuantity(seatPlan.getAvailableAllTicketQuantity());
fPerformSeatInfo.setSaleTime(DateUtils.getTimeFromStr(seatPlan.getSaleTime()));
if (StringUtils.isNotBlank(seatPlan.getSaleTime())) {
fPerformSeatInfo.setSaleTime(DateUtils.getTimeFromStr(seatPlan.getSaleTime()));
}
fPerformSeatInfo.setProjectId(projectList.getProjectId());
if (!seatPlanStatus.getSoldOutFlag() && seatPlanStatus.getStandbyStatus().equals(10)) {
fPerformSeatInfo.setSoldStock(0);
fPerformSeatInfo.setSoldOut(0);
} else {
fPerformSeatInfo.setSoldOut(1);
}
insertList.add(fPerformSeatInfo);
}
@@ -536,12 +351,4 @@ public class PullDataFromFWDJob {
projectInfo.setPreSaleTime(DateUtils.getDateTimeFromStr(projectInfoFromHttp.getPreSaleTime()));
}
}
private Map<Long, Boolean> getSeatMap(List<Long> seatPlanIds) {
List<SeatPlanStatus> statusFromHttp = performServiceHttp.getSeatPlanStatusFromHttp(seatPlanIds);
if (CollectionUtils.isEmpty(statusFromHttp)) {
return Maps.newHashMap();
}
return statusFromHttp.stream().collect(Collectors.toMap(SeatPlanStatus::getSeatPlanId, SeatPlanStatus::getSoldOutFlag, (a, b) -> a));
}
}

View File

@@ -146,7 +146,7 @@ public class FundInfoQueryJob {
}
log.info("查询的港股基金涨跌幅数据:{}", JSONObject.toJSONString(result));
result = result.stream().sorted(Comparator.comparing(FundMessage::getChange)).collect(Collectors.toList());
StringBuilder sb = new StringBuilder("今天的股行情:\n");
StringBuilder sb = new StringBuilder("今天的股行情:\n");
for (FundMessage fundMessage : result) {
sb.append("基金名称:")
.append(fundMessage.getName())
@@ -165,7 +165,7 @@ public class FundInfoQueryJob {
/**
* 基金涨跌幅5分钟超过2%重点通知
*/
@Scheduled(cron = "0 0/5 * * * ? ")
@Scheduled(cron = "0 0/5 9,10,11,13,14,15 * * ? ")
public void queryFundEmergencyJob() throws Exception {
// 周六周日过滤
if (Objects.equals(LocalDateTime.now().getDayOfWeek(), DayOfWeek.SATURDAY) ||

View File

@@ -16,6 +16,7 @@ spring:
max-idle: 8
min-idle: 0
max-wait: 1000
aliyun:
dns:
RR:

View File

@@ -23,3 +23,7 @@ server:
spring:
profiles:
active: local
http:
maxAttempts: 10
sleepMs: 200