|
@@ -1,16 +1,18 @@
|
|
|
package com.nexo.job.service;
|
|
package com.nexo.job.service;
|
|
|
|
|
|
|
|
-import com.alibaba.fastjson2.JSONObject;
|
|
|
|
|
-import com.nexo.module.api.douyin.RemoteDouYinUserInfoService;
|
|
|
|
|
|
|
+import com.nexo.job.mapper.NexoDouyinUserInfoJobMapper;
|
|
|
import com.nexo.module.api.douyin.domain.NexoDouyinUserInfo;
|
|
import com.nexo.module.api.douyin.domain.NexoDouyinUserInfo;
|
|
|
import com.nexo.module.api.douyin.utils.DouYinUtils;
|
|
import com.nexo.module.api.douyin.utils.DouYinUtils;
|
|
|
-import com.xxl.job.core.context.XxlJobContext;
|
|
|
|
|
|
|
+import com.xxl.job.core.context.XxlJobHelper;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.dubbo.config.annotation.DubboReference;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
+import java.util.Date;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -28,26 +30,128 @@ import java.util.stream.Collectors;
|
|
|
@Service
|
|
@Service
|
|
|
public class SampleService {
|
|
public class SampleService {
|
|
|
|
|
|
|
|
- @DubboReference
|
|
|
|
|
- private RemoteDouYinUserInfoService remoteDouYinUserInfoService;
|
|
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private NexoDouyinUserInfoJobMapper nexoDouyinUserInfoMapper;
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 抖音直播监控任务专用线程池
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 线程池配置说明:
|
|
|
|
|
+ * - 核心线程数:5,保持常驻线程数量
|
|
|
|
|
+ * - 最大线程数:10,高峰期可扩展的最大线程数
|
|
|
|
|
+ * - 空闲线程存活时间:60秒,超过核心线程数的空闲线程将在60秒后回收
|
|
|
|
|
+ * - 工作队列:有界阻塞队列,容量1000,用于缓存待处理任务
|
|
|
|
|
+ * - 线程工厂:自定义线程命名,便于日志追踪和问题定位
|
|
|
|
|
+ * - 拒绝策略:CallerRunsPolicy,当队列满且线程达最大值时,由调用线程执行任务,避免任务丢失
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 适用场景:
|
|
|
|
|
+ * - 批量处理抖音用户数据更新
|
|
|
|
|
+ * - 并发执行网络请求和数据库操作
|
|
|
|
|
+ * - 提高定时任务执行效率
|
|
|
|
|
+ */
|
|
|
|
|
+ private static final ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactory() {
|
|
|
|
|
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
|
|
+ Thread t = new Thread(r, "douyin-live-monitor-" + threadNumber.getAndIncrement());
|
|
|
|
|
+ t.setDaemon(false);
|
|
|
|
|
+ return t;
|
|
|
|
|
+ }
|
|
|
|
|
+ }, new ThreadPoolExecutor.CallerRunsPolicy());
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 抖音直播监控
|
|
|
|
|
|
|
+ * 抖音直播监控定时任务
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 功能说明:
|
|
|
|
|
+ * 1. 获取所有需要更新的抖音用户信息列表
|
|
|
|
|
+ * 2. 使用线程池并发处理每个用户的数据更新
|
|
|
|
|
+ * 3. 调用抖音API获取最新用户信息
|
|
|
|
|
+ * 4. 批量更新数据库中对应的用户记录
|
|
|
|
|
+ * 5. 统计成功和失败数量,记录执行日志
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 并发策略:
|
|
|
|
|
+ * - 使用预配置的线程池(核心30线程,最大50线程)并发执行
|
|
|
|
|
+ * - 通过CountDownLatch等待所有任务完成
|
|
|
|
|
+ * - 使用AtomicInteger保证计数的线程安全
|
|
|
|
|
+ * <p>
|
|
|
|
|
+ * 异常处理:
|
|
|
|
|
+ * - 单个用户处理失败不影响其他用户
|
|
|
|
|
+ * - 详细记录失败原因便于问题排查
|
|
|
|
|
+ * - 捕获中断异常并恢复中断状态
|
|
|
|
|
+ *
|
|
|
|
|
+ * @throws Exception 任务执行异常
|
|
|
*/
|
|
*/
|
|
|
@XxlJob("douyinLiveMonitor")
|
|
@XxlJob("douyinLiveMonitor")
|
|
|
public void douyinLiveMonitor() throws Exception {
|
|
public void douyinLiveMonitor() throws Exception {
|
|
|
- List<NexoDouyinUserInfo> updateInfoList = remoteDouYinUserInfoService.getUpdateInfoList(null);
|
|
|
|
|
|
|
+ long startTime = new Date().getTime();
|
|
|
|
|
+ List<NexoDouyinUserInfo> updateInfoList = nexoDouyinUserInfoMapper.getUpdateInfoList(null);
|
|
|
|
|
+ XxlJobHelper.log("更新用户数:{}", updateInfoList.size());
|
|
|
|
|
+
|
|
|
|
|
+ if (updateInfoList.isEmpty()) {
|
|
|
|
|
+ XxlJobHelper.log("没有需要更新的用户数据");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ AtomicInteger successCount = new AtomicInteger(0);
|
|
|
|
|
+ AtomicInteger failCount = new AtomicInteger(0);
|
|
|
|
|
+ CountDownLatch latch = new CountDownLatch(updateInfoList.size());
|
|
|
|
|
+
|
|
|
for (int i = 0; i < updateInfoList.size(); i++) {
|
|
for (int i = 0; i < updateInfoList.size(); i++) {
|
|
|
- NexoDouyinUserInfo info = updateInfoList.get(i);
|
|
|
|
|
- List<String> strings = remoteDouYinUserInfoService.getUpdateInfoList(info.getSecUid()).stream().map(item -> item.getSecUid()).collect(Collectors.toList());
|
|
|
|
|
|
|
+ final NexoDouyinUserInfo info = updateInfoList.get(i);
|
|
|
|
|
+ final int index = i + 1;
|
|
|
|
|
|
|
|
|
|
+ executor.submit(() -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ long time1 = new Date().getTime();
|
|
|
|
|
+ XxlJobHelper.log("开始处理第{}条数据:{} - {} - {}", index, info.getUserId(), info.getNickname(), info.getSecUid());
|
|
|
|
|
+
|
|
|
|
|
+ NexoDouyinUserInfo userInfo = DouYinUtils.getUserInfo(info.getSecUid());
|
|
|
|
|
+ List<Long> longs = nexoDouyinUserInfoMapper.getUpdateInfoList(info.getSecUid()).stream().map(item -> item.getId()).collect(Collectors.toList());
|
|
|
|
|
+
|
|
|
|
|
+ XxlJobHelper.log("user_id {} 昵称 {} 需要更新 {} 条用户记录", info.getUserId(), info.getNickname(), longs.size());
|
|
|
|
|
+
|
|
|
|
|
+ for (Long id : longs) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ userInfo.setId(id);
|
|
|
|
|
+ userInfo.setJobInfoStatus(null);
|
|
|
|
|
+ userInfo.setJobLiveStatus(null);
|
|
|
|
|
+ userInfo.setJobVideoStatus(null);
|
|
|
|
|
+ userInfo.setCreateBy(null);
|
|
|
|
|
+ userInfo.setCreateTime(null);
|
|
|
|
|
+ userInfo.setDeptId(null);
|
|
|
|
|
+ userInfo.setUpdateTime(new Date());
|
|
|
|
|
+
|
|
|
|
|
+ nexoDouyinUserInfoMapper.updateById(userInfo);
|
|
|
|
|
+ successCount.incrementAndGet();
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ XxlJobHelper.log("更新用户记录失败,ID: {}, 错误: {}", id, e.getMessage());
|
|
|
|
|
+ log.error("更新用户记录失败,ID: {}", id, e);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ long costTime = (new Date().getTime() - time1) / 1000;
|
|
|
|
|
+ XxlJobHelper.log("第{}条数据处理完成,耗时 {} 秒", index, costTime);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ failCount.incrementAndGet();
|
|
|
|
|
+ XxlJobHelper.log("处理第{}条数据失败:{}, 错误: {}", index, info.getSecUid(), e.getMessage());
|
|
|
|
|
+ log.error("处理第{}条数据失败:{}", index, info.getSecUid(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ latch.countDown();
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ try {
|
|
|
|
|
+ latch.await();
|
|
|
|
|
+ long totalTime = (new Date().getTime() - startTime) / 1000;
|
|
|
|
|
+ XxlJobHelper.log("所有数据处理完成!总计: {}, 成功: {}, 失败: {}, 总耗时: {} 秒", updateInfoList.size(), successCount.get(), failCount.get(), totalTime);
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ XxlJobHelper.log("任务被中断:{}", e.getMessage());
|
|
|
|
|
+ log.error("任务被中断", e);
|
|
|
}
|
|
}
|
|
|
- // 通过上下文获取任务参数
|
|
|
|
|
- String param = XxlJobContext.getXxlJobContext().getJobParam();
|
|
|
|
|
- JSONObject parsed = JSONObject.parseObject(param);
|
|
|
|
|
- String sec_uid = parsed.getString("sec_uid");
|
|
|
|
|
- NexoDouyinUserInfo userInfo = DouYinUtils.getUserInfo(sec_uid);
|
|
|
|
|
- remoteDouYinUserInfoService.updateDouYinUserInfo(userInfo);
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+
|
|
|
}
|
|
}
|