Преглед на файлове

fix(personnel-sync): 添加 QPS 限速 + 重试退避 + 定时任务并发保护

- PersonnelSyncConf: 新增 ddApiQps(50)/yidaApiQps(30) 配置, 可在 yml 覆盖
- PersonnelSyncServiceImpl: Guava RateLimiter 分别限速钉钉 user/get 和宜搭写接口;
  重试改为指数退避 (1s/2s/4s 上限 8s), 解决即时重试加剧 QPS 超限问题
- PersonnelSyncTimer: AtomicBoolean 防止上一轮未完成时下一轮并发触发 (会使 QPS 翻倍)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
malk преди 1 месец
родител
ревизия
c46dc37dcb

+ 4 - 0
mjava-akdsbeisen/src/main/java/com/malk/server/personnel/PersonnelSyncConf.java

@@ -56,4 +56,8 @@ public class PersonnelSyncConf {
     // 并发与重试
     private int concurrency = 10;
     private int maxRetry = 2;
+
+    // QPS 管控: 钉钉官方限 60/s, 宜搭写接口留余量
+    private double ddApiQps = 50.0;    // enrichManagers user/get 限速 (官方 60, 留 10 余量)
+    private double yidaApiQps = 30.0;  // 宜搭写接口限速 (10 并发下单线 QPS 峰值需控制)
 }

+ 28 - 0
mjava-akdsbeisen/src/main/java/com/malk/service/personnel/impl/PersonnelSyncServiceImpl.java

@@ -2,6 +2,7 @@ package com.malk.service.personnel.impl;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.google.common.util.concurrent.RateLimiter;
 import com.malk.server.aliwork.YDConf;
 import com.malk.server.aliwork.YDParam;
 import com.malk.server.dingtalk.DDR_New;
@@ -14,6 +15,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,6 +47,17 @@ public class PersonnelSyncServiceImpl implements PersonnelSyncService {
     @Autowired
     private PersonnelSyncConf conf;
 
+    /** 钉钉 user/get 限速器 (官方 60 QPS, 默认留 10 余量) */
+    private RateLimiter ddRateLimiter;
+    /** 宜搭写接口限速器 (防止并发线程合计超限) */
+    private RateLimiter yidaRateLimiter;
+
+    @PostConstruct
+    private void initRateLimiters() {
+        ddRateLimiter = RateLimiter.create(conf.getDdApiQps());
+        yidaRateLimiter = RateLimiter.create(conf.getYidaApiQps());
+    }
+
     private static final String ACTION_CREATE = "CREATE";
     private static final String ACTION_UPDATE = "UPDATE";
     private static final String ACTION_MARK_OFF = "MARK_OFF";
@@ -515,6 +529,18 @@ public class PersonnelSyncServiceImpl implements PersonnelSyncService {
     private void executeAction(Action a, WriteStats stats) {
         int attempt = 0;
         while (true) {
+            // 指数退避: 第 1 次不等, 之后 1s/2s/4s... 最长 8s
+            if (attempt > 0) {
+                long backoffMs = Math.min(1000L * (1L << (attempt - 1)), 8000L);
+                try {
+                    Thread.sleep(backoffMs);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    stats.failed.incrementAndGet();
+                    return;
+                }
+            }
+            yidaRateLimiter.acquire();
             try {
                 if (ACTION_CREATE.equals(a.type)) {
                     ydClient.operateData(YDParam.builder()
@@ -548,6 +574,7 @@ public class PersonnelSyncServiceImpl implements PersonnelSyncService {
                     stats.failed.incrementAndGet();
                     return;
                 }
+                log.info("[PersonnelSync] 写入重试 userid={} action={} attempt={} err={}", a.userid, a.type, attempt, ex.getMessage());
             }
         }
     }
@@ -610,6 +637,7 @@ public class PersonnelSyncServiceImpl implements PersonnelSyncService {
             Object uid = u.get("userid");
             if (uid == null) continue;
             try {
+                ddRateLimiter.acquire();
                 Map detail = ddClient_contacts.getUserInfoById(token, String.valueOf(uid));
                 Object mgr = detail == null ? null : detail.get("manager_userid");
                 if (mgr != null && notBlank(String.valueOf(mgr))) {

+ 11 - 0
mjava-akdsbeisen/src/main/java/com/malk/timer/PersonnelSyncTimer.java

@@ -7,6 +7,8 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * 钉钉 -> 宜搭 人员档案 定时全量增量同步
  * 每天 03:00 与 13:00 各跑一次 fullSync (limit 取配置 personnel-sync.limitFirstN, 生产为 0 即真·全量)
@@ -19,14 +21,23 @@ public class PersonnelSyncTimer {
     @Autowired
     private PersonnelSyncService personnelSyncService;
 
+    // fixme: 防止上一轮未完成时下一轮重入 (两轮并发会使 QPS 翻倍)
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
     @Scheduled(cron = "0 0 3,13 * * ?")
     public void dailyFullSync() {
+        if (!running.compareAndSet(false, true)) {
+            log.warn("[PersonnelSync] 上次定时同步尚未结束,跳过本次触发");
+            return;
+        }
         log.info("[PersonnelSync] 定时同步任务开始");
         try {
             java.util.Map<String, Object> stats = personnelSyncService.fullSync(null);
             log.info("[PersonnelSync] 定时同步任务完成 {}", stats);
         } catch (Exception e) {
             log.error("[PersonnelSync] 定时同步任务失败", e);
+        } finally {
+            running.set(false);
         }
     }
 }