Browse Source

fix(workhours): 月度写入加限流退避防丢数据 + 外部员工关联项目经理

问题一:月度全量/增量写入按人散落丢数据(吕加冕缺 6-16/6-25 等)
- 根因:concurrentUpsert 10 线程裸冲宜搭写接口触发「请求过于频繁」,
  紧贴重试 3 次全部命中同一限流窗口 → 单条记录被永久丢弃;增量同步只
  补近 2 天改档案的人,丢的不会自愈。同文件 backfillCfEmployee 早已
  修过同一问题,但月度主写入一直漏补。
- 修复:concurrentUpsert 加 RateLimiter.create(20.0) + 重试间
  Thread.sleep(1000*(retry+1)) 退避(与 cf 回填同款处方)。

问题二:外部员工 Manager 字段空缺(之前仅内部员工取钉钉直属主管)
- 新增 queryProjectPmMap:扫项目档案全量 + 遍历成员子表,按 userId
  匹配 PM;仅命中「排除已下线(成员子表 dateField_mo6s11tc < 目标月
  1 号)后唯一在线项目 + PM 非空」才写,多项目/无项目/PM 空均留空,
  不兜底 Raymond、不做离职校验(与前端 TimeCard 刻意不同:后端只是
  记数据,非审批找活人)。
- queryManagerMap 改双参:内部走钉钉主管、外部走 PM 匹配;
  syncOneEmployeeOneDay 外部分支也同步走 PM 匹配。
- 子表全量取数:内联 <50 直接用、==50 走 ydService.queryDetails
  递归取全。**NPE 兜底**:基座 _queryDetails 对宜搭返回 data=null 的
  子表查询触发 ArrayList.addAll(null) NPE(ApprovalWriteBack 用同款
  模式从未撞上——审批子表总有数据;但项目档案部分项目成员==50 触发了
  此边界)。NPE 时退回内联 50 行(可能漏第 51+ 行成员,业务可接受
  + 日志告警便于后续排查)。

配置:
- WHConf 加 formUuidProject 字段;dev yml 与 prod yml workhours
  块均加同字段(顺手补 prod yml 历史缺失的 workhours 完整块——本
  次新加 formUuidProject 若仅落 dev yml 在 prod profile 下加载不到)。

真机验证(已通过):
- /workhours/sync?month=2026-06 → fail=0 / success=16380 /
  Manager 落地 519(内部 431 + 外部 88) / 耗时 862s ≈ 20 QPS
- 外部员工 PM 匹配统计:275 待匹配 / 88 命中 / 0 多项目跳过 /
  0 PM 空 / 187 未匹配项目

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
malk 2 days ago
parent
commit
a3dadf0187

+ 3 - 0
mjava-akdsbeisen/src/main/java/com/malk/server/workhours/WHConf.java

@@ -17,6 +17,9 @@ public class WHConf {
 
     private String formUuidRequiredHours;
 
+    // prd 外部员工关联项目经理:项目档案表(含成员子表),用于按 userId 匹配 PM 写入 Manager 字段
+    private String formUuidProject;
+
     // prd 审批回写:工时汇总表 + 两类审批单
     private String formUuidWorkHoursSummary;
 

+ 188 - 22
mjava-akdsbeisen/src/main/java/com/malk/service/workhours/WorkHoursCalcService.java

@@ -8,6 +8,7 @@ import com.malk.server.aliwork.YDParam;
 import com.malk.server.dingtalk.DDR_New;
 import com.malk.server.workhours.WHConf;
 import com.malk.service.aliwork.YDClient;
+import com.malk.service.aliwork.YDService;
 import com.malk.service.dingtalk.DDClient;
 import com.malk.service.dingtalk.DDClient_Contacts;
 import lombok.extern.slf4j.Slf4j;
@@ -30,6 +31,9 @@ public class WorkHoursCalcService {
     @Autowired
     private YDClient ydClient;
 
+    @Autowired
+    private YDService ydService;
+
     @Autowired
     private WHConf whConf;
 
@@ -43,6 +47,12 @@ public class WorkHoursCalcService {
     private static final int THREAD_POOL_SIZE = 10;
     private static final int MAX_RETRY = 2;
 
+    // prd 外部员工关联项目经理:项目档案成员子表字段
+    private static final String PROJECT_SUB_TABLE = "tableField_mkowyn6d";      // 成员子表(人员明细)
+    private static final String PROJECT_SUB_MEMBER = "employeeField_mmbfe0ij";  // 子表-员工(成员)
+    private static final String PROJECT_SUB_PM = "employeeField_mkoxpswf";      // 子表-项目经理
+    private static final String PROJECT_SUB_OFFLINE = "dateField_mo6s11tc";     // 子表-下线时间
+
     /**
      * 主入口:计算指定月份每个工作日的应填报工时并写入宜搭(按天维度,每条记录8h)
      *
@@ -70,7 +80,7 @@ public class WorkHoursCalcService {
         }
 
         // 2. 查询直属主管(仅内部员工调用钉钉 API)
-        Map<String, String> managerMap = queryManagerMap(personnelMap);
+        Map<String, String> managerMap = queryManagerMap(personnelMap, targetMonth);
         log.info("获取到{}名员工的直属主管", managerMap.size());
         stats.put("managerCount", managerMap.size());
 
@@ -117,7 +127,7 @@ public class WorkHoursCalcService {
         }
 
         // 2. 查询直属主管(仅内部员工)
-        Map<String, String> managerMap = queryManagerMap(personnelMap);
+        Map<String, String> managerMap = queryManagerMap(personnelMap, today);
 
         // 3~4 查询节假日和工作日
         Map<LocalDate, String> holidayRules = queryHolidayRules(String.valueOf(year));
@@ -160,7 +170,7 @@ public class WorkHoursCalcService {
         }
         result.put("personnelInfo", info);
 
-        // 2. 内部员工查主管(外部员工跳过)
+        // 2. Manager 取值:内部=钉钉直属主管, 外部=项目档案唯一在线项目 PM
         String managerId = null;
         if ("内部".equals(String.valueOf(info.get("radioField_mkow4ydo")))) {
             try {
@@ -175,6 +185,10 @@ public class WorkHoursCalcService {
             } catch (Exception e) {
                 log.warn("获取员工{}直属主管失败: {}", userId, e.getMessage());
             }
+        } else {
+            // prd 外部员工:仅排除已下线后唯一在线项目+PM非空才写,否则留空
+            Map<String, String> pmMap = queryProjectPmMap(Collections.singleton(userId), workDay);
+            managerId = pmMap.get(userId);
         }
         result.put("managerId", managerId);
 
@@ -238,7 +252,7 @@ public class WorkHoursCalcService {
         }
 
         // 3. 主管
-        Map<String, String> managerMap = queryManagerMap(subset);
+        Map<String, String> managerMap = queryManagerMap(subset, targetMonth);
         stats.put("managerCount", managerMap.size());
 
         // 4. 节假日 + 工作日
@@ -501,6 +515,9 @@ public class WorkHoursCalcService {
                                    List<LocalDate> workingDays) {
         AtomicInteger successCount = new AtomicInteger(0);
         AtomicInteger failCount = new AtomicInteger(0);
+        // fixme: 宜搭写接口有突发 QPS 上限,10 线程裸跑会零星触发「请求过于频繁」导致单条记录被丢(按人散落缺日,
+        //        如吕加冕缺 6-16/6-25)。与 backfillCfEmployee 同款处方:20 QPS 全局限流 + 重试退避,补齐丢失记录。
+        RateLimiter yidaLimiter = RateLimiter.create(20.0);
         ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
         try {
             List<Future<?>> futures = new ArrayList<>();
@@ -515,12 +532,19 @@ public class WorkHoursCalcService {
                         boolean written = false;
                         for (int retry = 0; retry <= MAX_RETRY; retry++) {
                             try {
+                                yidaLimiter.acquire();
                                 upsertDailyHours(empId, mgrId, workDay, info);
                                 written = true;
                                 break;
                             } catch (Exception e) {
                                 if (retry < MAX_RETRY) {
                                     log.warn("员工{} {} 写入失败(第{}次重试)", empId, workDay, retry + 1);
+                                    // fixme: 「请求过于频繁」是瞬时突发限流,退避后再试才能补齐,紧贴重试会命中同一限流窗口
+                                    try {
+                                        Thread.sleep(1000L * (retry + 1));
+                                    } catch (InterruptedException ie) {
+                                        Thread.currentThread().interrupt();
+                                    }
                                 } else {
                                     log.error("员工{} {} 写入失败(已重试{}次)", empId, workDay, MAX_RETRY, e);
                                 }
@@ -724,41 +748,183 @@ public class WorkHoursCalcService {
     }
 
     /**
-     * 批量查询直属主管(仅内部员工调用钉钉 API,外部员工跳过避免无效查询)
+     * 批量查询应填报工时的 Manager 字段值:
+     * - 内部员工:钉钉 API 取 manager_userid(直属主管)
+     * - 外部员工:项目档案成员子表匹配 PM,仅「排除已下线后唯一在线项目 + PM 非空」才写,否则留空
+     * ppExt: 与前端 TimeCard 刻意不同 —— 不做 PM 离职探活、不兜底 Raymond(后端只记数据,非审批找活人)
      *
+     * @param personnelMap 全量人员档案
+     * @param targetMonth  目标月份(用于判定项目下线时间:下线时间 &lt; 目标月 1 号视为本月已下线)
      * @return Map<employeeId, managerUserId>
      */
     @SuppressWarnings("unchecked")
-    private Map<String, String> queryManagerMap(Map<String, Map<String, Object>> personnelMap) {
-        String accessToken = ddClient.getAccessToken();
+    private Map<String, String> queryManagerMap(Map<String, Map<String, Object>> personnelMap, LocalDate targetMonth) {
         Map<String, String> managerMap = new HashMap<>();
-        int skipCount = 0;
+        Set<String> externalIds = new HashSet<>();
+        String accessToken = ddClient.getAccessToken();
+        int internalCount = 0;
 
+        // 内部员工:钉钉直属主管;外部员工:先收集起来批量匹配项目档案 PM
         for (Map.Entry<String, Map<String, Object>> entry : personnelMap.entrySet()) {
             String empId = entry.getKey();
             Object attr = entry.getValue().get("radioField_mkow4ydo");
 
-            // 仅内部员工查询直属主管,外部员工跳过
-            if (!"内部".equals(String.valueOf(attr))) {
-                skipCount++;
-                continue;
+            if ("内部".equals(String.valueOf(attr))) {
+                internalCount++;
+                try {
+                    Map userInfo = ddClient_contacts.getUserInfoById(accessToken, empId);
+                    if (userInfo != null && userInfo.get("manager_userid") != null) {
+                        String mgrId = String.valueOf(userInfo.get("manager_userid"));
+                        if (!mgrId.isEmpty() && !"null".equals(mgrId)) {
+                            managerMap.put(empId, mgrId);
+                        }
+                    }
+                } catch (Exception e) {
+                    log.warn("获取员工{}直属主管失败: {}", empId, e.getMessage());
+                }
+            } else {
+                externalIds.add(empId);
             }
+        }
 
-            try {
-                Map userInfo = ddClient_contacts.getUserInfoById(accessToken, empId);
-                if (userInfo != null && userInfo.get("manager_userid") != null) {
-                    String mgrId = String.valueOf(userInfo.get("manager_userid"));
-                    if (!mgrId.isEmpty() && !"null".equals(mgrId)) {
-                        managerMap.put(empId, mgrId);
+        // 外部员工批量 PM 匹配(targetMonth 为 null 则跳过 PM 匹配,Manager 留空)
+        if (!externalIds.isEmpty() && targetMonth != null) {
+            Map<String, String> pmMap = queryProjectPmMap(externalIds, targetMonth);
+            managerMap.putAll(pmMap);
+        }
+
+        log.info("Manager 匹配完成: 内部{}人取直属主管, 外部{}人匹配PM, 总落地{}条",
+                internalCount, externalIds.size(), managerMap.size());
+        return managerMap;
+    }
+
+    /**
+     * 外部员工项目经理批量匹配:扫项目档案全量 + 遍历成员子表,仅命中「排除已下线后唯一在线项目 + PM 非空」的写入
+     * ppExt: 子表 &lt;50 行用内联、==50 走 ydService.queryDetails 取全(与 ApprovalWriteBackService.resolveDetailRows 同款)
+     *
+     * @param externalIds 外部员工 userId 集合
+     * @param targetMonth 目标月份(下线时间 &lt; 目标月 1 号 → 该项目本月已下线,排除)
+     * @return Map<外部员工 userId, 项目 PM userId>,仅含命中条件的外部员工
+     */
+    @SuppressWarnings("unchecked")
+    private Map<String, String> queryProjectPmMap(Set<String> externalIds, LocalDate targetMonth) {
+        Map<String, String> result = new HashMap<>();
+        if (externalIds == null || externalIds.isEmpty()) {
+            return result;
+        }
+
+        long monthStartMs = targetMonth.withDayOfMonth(1).atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
+
+        // 每个外部员工 → 在线项目集合(projectInstId → pmUserId,pmUserId 可能 null)
+        Map<String, Map<String, String>> userProjects = new HashMap<>();
+
+        String appType = whConf.getYidaAppType();
+        String systemToken = whConf.getYidaSystemToken();
+        String formUuidProject = whConf.getFormUuidProject();
+        int currentPage = 1;
+        int pageSize = YDConf.PAGE_SIZE_LIMIT;
+        long totalCount;
+
+        do {
+            DDR_New page = ydClient.queryData(YDParam.builder()
+                    .appType(appType)
+                    .systemToken(systemToken)
+                    .formUuid(formUuidProject)
+                    .currentPage(currentPage)
+                    .pageSize(pageSize)
+                    .build(), YDConf.FORM_QUERY.retrieve_list_all);
+
+            totalCount = page.getTotalCount();
+            List<Map> dataList = (List<Map>) page.getData();
+            if (dataList == null || dataList.isEmpty()) break;
+
+            for (Map item : dataList) {
+                Object instIdObj = item.get("formInstanceId");
+                if (instIdObj == null) continue;
+                String projectInstId = String.valueOf(instIdObj);
+
+                Map<String, Object> formData = (Map<String, Object>) item.get("formData");
+                if (formData == null) continue;
+
+                List<Map> subRows = resolveProjectMembers(projectInstId, (List<Map>) formData.get(PROJECT_SUB_TABLE));
+                for (Map row : subRows) {
+                    String memberId = extractEmployeeId(row, PROJECT_SUB_MEMBER);
+                    if (memberId == null || !externalIds.contains(memberId)) continue;
+
+                    // 下线时间判定:解析得到时间戳且 < 目标月首日 → 已下线,排除;解析不到/空 → 不过滤
+                    Object offlineObj = row.get(PROJECT_SUB_OFFLINE);
+                    if (offlineObj != null) {
+                        String s = String.valueOf(offlineObj).trim();
+                        if (!s.isEmpty()) {
+                            try {
+                                long offlineMs = Long.parseLong(s);
+                                if (offlineMs < monthStartMs) continue;
+                            } catch (NumberFormatException ignore) {
+                                // 解析失败视为无下线时间,不过滤
+                            }
+                        }
                     }
+
+                    String pmId = extractEmployeeId(row, PROJECT_SUB_PM);
+                    userProjects.computeIfAbsent(memberId, k -> new HashMap<>())
+                            .put(projectInstId, pmId);
                 }
-            } catch (Exception e) {
-                log.warn("获取员工{}直属主管失败: {}", empId, e.getMessage());
             }
+            currentPage++;
+        } while ((long) (currentPage - 1) * pageSize < totalCount);
+
+        // 命中条件:去重项目数 == 1 且 PM 非空
+        int multiCount = 0, noPmCount = 0, missCount = 0;
+        for (String uid : externalIds) {
+            Map<String, String> projects = userProjects.get(uid);
+            if (projects == null || projects.isEmpty()) {
+                missCount++;
+                continue;
+            }
+            if (projects.size() > 1) {
+                multiCount++;
+                continue;
+            }
+            String pmId = projects.values().iterator().next();
+            if (pmId == null || pmId.isEmpty()) {
+                noPmCount++;
+                continue;
+            }
+            result.put(uid, pmId);
         }
 
-        log.info("直属主管查询完成: 内部员工{}人, 外部跳过{}人", managerMap.size(), skipCount);
-        return managerMap;
+        log.info("外部员工PM匹配: 待匹配{}人, 命中{}人, 多项目跳过{}人, PM为空跳过{}人, 未匹配项目{}人",
+                externalIds.size(), result.size(), multiCount, noPmCount, missCount);
+        return result;
+    }
+
+    /**
+     * 项目档案成员子表取全:内联 &lt;50 直接用、==50 走 queryDetails 取全、null/NPE 兜底
+     * ppExt: 与 ApprovalWriteBackService.resolveDetailRows 规则同根,但本场景需多一层 NPE 兜底。
+     * fixme: 基座 _queryDetails 内 `details.addAll((List) ddr.getData())` 对宜搭返回 data=null 的子表查询会 NPE
+     *        (ApprovalWriteBack 用相同模式从未撞上——审批子表总有数据;但项目档案部分项目成员==50 触发了此边界)。
+     *        NPE 时退回内联的 50 行(可能漏第 51+ 行成员,业务可接受 + 日志告警便于后续排查)。
+     */
+    private List<Map> resolveProjectMembers(String projectInstId, List<Map> inlineRows) {
+        if (inlineRows == null) {
+            return Collections.emptyList();
+        }
+        if (inlineRows.size() < 50) {
+            return inlineRows;
+        }
+        try {
+            List<Map> full = ydService.queryDetails(YDParam.builder()
+                    .appType(whConf.getYidaAppType())
+                    .systemToken(whConf.getYidaSystemToken())
+                    .formInstanceId(projectInstId)
+                    .tableFieldId(PROJECT_SUB_TABLE)
+                    .pageNumber(1)
+                    .build());
+            return full != null ? full : Collections.emptyList();
+        } catch (NullPointerException npe) {
+            log.warn("项目档案{} 子表 queryDetails 触发基座 NPE(data=null), 退回内联 {} 行", projectInstId, inlineRows.size());
+            return inlineRows;
+        }
     }
 
     // ==================== 数据写入 ====================

+ 2 - 0
mjava-akdsbeisen/src/main/resources/application-dev.yml

@@ -70,6 +70,8 @@ workhours:
   formUuidHoliday: "FORM-BMG66GA16LC4CNCZLUUTKAX0G27U3YTMC27NM1"
   formUuidPersonnel: "FORM-CCEEE5D461694CBAB5999A8C2926D0C1RXQP"
   formUuidRequiredHours: "FORM-D9144092C2C24C93A1B02A8EEED0663FFD8G"
+  # 外部员工关联项目经理:项目档案表(含成员子表 tableField_mkowyn6d)
+  formUuidProject: "FORM-A4A04FCDE2024B23B386504954760F55HIB9"
   # 审批回写:工时汇总表 + 两类审批单(当前指向测试环境,正式见注释)
   formUuidWorkHoursSummary: "FORM-87E51B67EBC1461C8247ED7D5D3E3DD829KC"
   formUuidApproval: "FORM-230BB68C42304492A242CA278EF4A853LYIY"        # 正式 FORM-DDA1683645954101890AF6BA723705F9M0XV

+ 21 - 0
mjava-akdsbeisen/src/main/resources/application-prod.yml

@@ -49,3 +49,24 @@ beisen:
   formUuidLeave: "FORM-D15D90EA85234A77B94564110F5D24424ERL"
   yidaAppType: "APP_ZQ3I7XO2RSHDJ4QDEVNB"
   yidaSystemToken: "FOD66381NOS25MERLN2UK92FY96Y21UMHD7LM36S"
+
+workhours:
+  formUuidHoliday: "FORM-BMG66GA16LC4CNCZLUUTKAX0G27U3YTMC27NM1"
+  formUuidPersonnel: "FORM-CCEEE5D461694CBAB5999A8C2926D0C1RXQP"
+  formUuidRequiredHours: "FORM-D9144092C2C24C93A1B02A8EEED0663FFD8G"
+  # 外部员工关联项目经理:项目档案表(含成员子表 tableField_mkowyn6d)
+  formUuidProject: "FORM-A4A04FCDE2024B23B386504954760F55HIB9"
+  # 审批回写:工时汇总表 + 两类审批单(当前指向测试环境,正式见注释)
+  formUuidWorkHoursSummary: "FORM-87E51B67EBC1461C8247ED7D5D3E3DD829KC"
+  formUuidApproval: "FORM-230BB68C42304492A242CA278EF4A853LYIY"        # 正式 FORM-DDA1683645954101890AF6BA723705F9M0XV
+  formUuidOtherApproval: "FORM-4828E0E40CD34038825E8C6E25417B2718NF"   # 正式 FORM-BA14F6322DBD470F8CF84CCE131DEA31TIJS
+  yidaAppType: "APP_ZQ3I7XO2RSHDJ4QDEVNB"
+  yidaSystemToken: "FOD66381NOS25MERLN2UK92FY96Y21UMHD7LM36S"
+  # 审批回写同步状态字段:两类审批表 fieldId 一致(业务方建字段时同名同 ID)
+  # 同步状态 SelectField 选项必须为「全部成功 / 部分失败 / 全部失败」,与 ApprovalWriteBackService.syncStatus 计算口径对齐
+  approvalSyncStatusField: "selectField_mq58cd5p"
+  approvalSyncTotalField: "numberField_mq58cd5q"
+  approvalSyncFailField: "numberField_mq58cd5r"
+  otherApprovalSyncStatusField: "selectField_mq58cd5p"
+  otherApprovalSyncTotalField: "numberField_mq58cd5q"
+  otherApprovalSyncFailField: "numberField_mq58cd5r"