Bläddra i källkod

feat(workhours): 增量定时改每天3次 + 新增是否cf员工字段及存量回填

- 定时增量同步由每日1次(凌晨3点)改为每日3次(07:00/12:15/18:30),
  用 @Scheduled 可重复注解挂3条cron(单条cron时/分独立无法表达离散时点);月度全量不变
- 新增「是否cf员工」字段:人员档案 textField_mow9w7d8 -> 应填报工时 textField_mpp7a2k7,
  查询/写入逻辑同步,小批量回显样本加该字段
- 新增 backfillCfEmployee(dryRun) + GET /workhours/backfill-cf 回填存量记录cf字段:
  按应填报日期逐月分区查询(绕宜搭search 3万条上限),update 用 useLatestVersion=true
  刷新到含新字段的最新表单版本 + ignoreEmpty=false,20 QPS限流 + 失败重试退避,幂等可重跑
- 已执行: 扫描42829/更新26054/失败0,dryRun校验 toUpdate=0

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
malk 2 veckor sedan
förälder
incheckning
31e1ea03b5

+ 24 - 0
mjava-akdsbeisen/src/main/java/com/malk/controller/WorkHoursController.java

@@ -99,6 +99,30 @@ public class WorkHoursController {
         return result;
     }
 
+    /**
+     * 回填存量应填报工时的「是否cf员工」字段(textField_mpp7a2k7),值取自人员档案
+     * GET /workhours/backfill-cf            (实际更新)
+     * GET /workhours/backfill-cf?dryRun=true(仅扫描预览,不更新)
+     */
+    @GetMapping("/backfill-cf")
+    public Map<String, Object> backfillCf(@RequestParam(defaultValue = "false") boolean dryRun) {
+        Map<String, Object> result = new LinkedHashMap<>();
+        try {
+            long start = System.currentTimeMillis();
+            Map<String, Object> stats = workHoursCalcService.backfillCfEmployee(dryRun);
+            long cost = System.currentTimeMillis() - start;
+            result.put("success", true);
+            result.put("message", dryRun ? "cf字段回填预览完成(未更新)" : "cf字段回填完成");
+            result.put("stats", stats);
+            result.put("costMs", cost);
+        } catch (Exception e) {
+            log.error("cf字段回填失败", e);
+            result.put("success", false);
+            result.put("message", e.getMessage());
+        }
+        return result;
+    }
+
     /**
      * 全量同步当月应填报工时
      * GET /workhours/sync?month=2026-04

+ 176 - 0
mjava-akdsbeisen/src/main/java/com/malk/service/workhours/WorkHoursCalcService.java

@@ -2,6 +2,7 @@ package com.malk.service.workhours;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.google.common.util.concurrent.RateLimiter;
 import com.malk.server.aliwork.YDConf;
 import com.malk.server.aliwork.YDParam;
 import com.malk.server.dingtalk.DDR_New;
@@ -225,6 +226,7 @@ public class WorkHoursCalcService {
             s.put("userId", entry.getKey());
             s.put("textField_mh8xhqc1", info.get("textField_mh8xhqc1"));
             s.put("textField_mmekrcji", info.get("textField_mmekrcji"));
+            s.put("textField_mpp7a2k7", info.get("textField_mpp7a2k7"));
             sample.add(s);
             if (subset.size() >= limit) break;
         }
@@ -319,6 +321,174 @@ public class WorkHoursCalcService {
         log.info("应填报工时数据清空完成, 共删除{}条", totalDeleted);
     }
 
+    /**
+     * 回填存量应填报工时记录的「是否cf员工」字段(textField_mpp7a2k7)
+     * ppExt: 从人员档案预加载 userId→cf值,先只读全量扫描收集待更新实例(避免边查边改导致默认排序变动而漏读/重读),
+     *        再按 formInstanceId 逐条 update;update 用 useLatestVersion=true 刷新到含新字段的最新表单版本、
+     *        ignoreEmpty=false 确保值写入。已是目标值的记录跳过,可重复执行。
+     *
+     * @param dryRun true 仅扫描预览不实际更新
+     * @return Map{total, toUpdate, updated, skippedNoCf, skippedSame, fail}
+     */
+    @SuppressWarnings("unchecked")
+    public Map<String, Object> backfillCfEmployee(boolean dryRun) {
+        Map<String, Object> stats = new LinkedHashMap<>();
+        String appType = whConf.getYidaAppType();
+        String systemToken = whConf.getYidaSystemToken();
+
+        // 1. 预加载人员档案 userId → cf值(queryAllPersonnelDetails 已把源 textField_mow9w7d8 存到 textField_mpp7a2k7)
+        Map<String, Map<String, Object>> personnelMap = queryAllPersonnelDetails();
+        log.info("回填cf: 人员档案共{}条", personnelMap.size());
+        if (personnelMap.isEmpty()) {
+            log.warn("回填cf: 人员档案为空,跳过");
+            stats.put("total", 0);
+            return stats;
+        }
+
+        // 2. 只读扫描应填报工时记录,收集待更新 [实例ID, cf值]
+        //    fixme: 宜搭 search 仅支持返回前 30000 条,本表跨多月已超限,故按【应填报日期】逐月分区查询(月内 < 30000)
+        List<String[]> toUpdate = new ArrayList<>();
+        int total = 0, skippedNoCf = 0, skippedSame = 0;
+        int pageSize = YDConf.PAGE_SIZE_LIMIT;
+        ZoneId zone = ZoneId.systemDefault();
+        LocalDate monthCursor = LocalDate.of(2026, 4, 1);                    // 数据起点:应填报工时自 2026-04 起有数据
+        LocalDate scanEnd = LocalDate.now().withDayOfMonth(1).plusMonths(1); // 含当前月(当前 4/5/6 共 3 个月)
+        while (monthCursor.isBefore(scanEnd)) {
+            LocalDate nextMonth = monthCursor.plusMonths(1);
+            long startMs = monthCursor.atStartOfDay(zone).toInstant().toEpochMilli();
+            long endMs = nextMonth.atStartOfDay(zone).toInstant().toEpochMilli() - 1;
+            Map<String, Object> dateRange = new HashMap<>();
+            dateRange.put("dateField_mmd8onl5", Arrays.asList(startMs, endMs));
+            String searchFieldJson = JSON.toJSONString(dateRange);
+
+            int currentPage = 1;
+            long totalCount;
+            do {
+                DDR_New result = ydClient.queryData(YDParam.builder()
+                        .appType(appType)
+                        .systemToken(systemToken)
+                        .formUuid(whConf.getFormUuidRequiredHours())
+                        .searchFieldJson(searchFieldJson)
+                        .currentPage(currentPage)
+                        .pageSize(pageSize)
+                        .build(), YDConf.FORM_QUERY.retrieve_search_form);
+
+                totalCount = result.getTotalCount();
+                List<Map> dataList = (List<Map>) result.getData();
+                if (dataList == null || dataList.isEmpty()) break;
+
+                for (Map item : dataList) {
+                    total++;
+                    Object instId = item.get("formInstanceId");
+                    Map<String, Object> formData = (Map<String, Object>) item.get("formData");
+                    if (instId == null || formData == null) {
+                        skippedNoCf++;
+                        continue;
+                    }
+
+                    String empId = extractEmployeeId(formData, "employeeField_mmd8onl4");
+                    Map<String, Object> info = empId == null ? null : personnelMap.get(empId);
+                    Object cfValue = info == null ? null : info.get("textField_mpp7a2k7");
+                    if (cfValue == null || String.valueOf(cfValue).trim().isEmpty()) {
+                        skippedNoCf++;
+                        continue;
+                    }
+                    // 已是目标值则跳过,保证可重复执行
+                    Object cur = formData.get("textField_mpp7a2k7");
+                    if (cur != null && String.valueOf(cur).equals(String.valueOf(cfValue))) {
+                        skippedSame++;
+                        continue;
+                    }
+                    toUpdate.add(new String[]{String.valueOf(instId), String.valueOf(cfValue)});
+                }
+                currentPage++;
+            } while ((long) (currentPage - 1) * pageSize < totalCount);
+
+            log.info("回填cf扫描[{}]: 累计扫描{}, 待更新{}", monthCursor, total, toUpdate.size());
+            monthCursor = nextMonth;
+        }
+
+        log.info("回填cf扫描完成: 共{}条, 待更新{}, 无cf跳过{}, 已是目标值跳过{}",
+                total, toUpdate.size(), skippedNoCf, skippedSame);
+
+        stats.put("total", total);
+        stats.put("toUpdate", toUpdate.size());
+        stats.put("skippedNoCf", skippedNoCf);
+        stats.put("skippedSame", skippedSame);
+        if (dryRun) {
+            log.info("回填cf: dryRun 模式,仅预览不更新");
+            stats.put("updated", 0);
+            stats.put("fail", 0);
+            stats.put("dryRun", true);
+            return stats;
+        }
+
+        // 3. 并发更新(10线程,按实例ID维度分任务)
+        // fixme: 宜搭写接口有 QPS 上限,10 线程裸跑会触发「请求过于频繁」,用 30 QPS 限流(与 PersonnelSync 一致)
+        AtomicInteger updated = new AtomicInteger(0);
+        AtomicInteger fail = new AtomicInteger(0);
+        RateLimiter yidaLimiter = RateLimiter.create(20.0);
+        ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
+        try {
+            List<Future<?>> futures = new ArrayList<>();
+            for (String[] pair : toUpdate) {
+                futures.add(executor.submit(() -> {
+                    JSONObject upd = new JSONObject();
+                    upd.put("textField_mpp7a2k7", pair[1]);
+                    boolean ok = false;
+                    // fixme: 宜搭「请求过于频繁」是瞬时突发限流,重试+退避可补齐
+                    for (int retry = 0; retry <= MAX_RETRY && !ok; retry++) {
+                        try {
+                            yidaLimiter.acquire();
+                            ydClient.operateData(YDParam.builder()
+                                    .appType(appType)
+                                    .systemToken(systemToken)
+                                    .formUuid(whConf.getFormUuidRequiredHours())
+                                    .formInstanceId(pair[0])
+                                    .updateFormDataJson(upd.toJSONString())
+                                    .ignoreEmpty(false)
+                                    .useLatestVersion(true)
+                                    .build(), YDConf.FORM_OPERATION.update);
+                            ok = true;
+                        } catch (Exception e) {
+                            if (retry < MAX_RETRY) {
+                                try {
+                                    Thread.sleep(1000L * (retry + 1));
+                                } catch (InterruptedException ie) {
+                                    Thread.currentThread().interrupt();
+                                }
+                            } else {
+                                fail.incrementAndGet();
+                                log.error("回填cf失败: instId={}", pair[0], e);
+                            }
+                        }
+                    }
+                    if (ok) {
+                        int n = updated.incrementAndGet();
+                        if (n % 500 == 0) {
+                            log.info("回填cf进度: 已更新{}/{}", n, toUpdate.size());
+                        }
+                    }
+                }));
+            }
+            for (Future<?> f : futures) {
+                try {
+                    f.get();
+                } catch (Exception e) {
+                    log.error("回填cf线程执行异常", e);
+                }
+            }
+        } finally {
+            executor.shutdown();
+        }
+
+        stats.put("updated", updated.get());
+        stats.put("fail", fail.get());
+        log.info("回填cf完成: 扫描{}, 更新{}, 失败{}, 无cf跳过{}, 已是目标值跳过{}",
+                total, updated.get(), fail.get(), skippedNoCf, skippedSame);
+        return stats;
+    }
+
     // ==================== 多线程并发写入 ====================
 
     /**
@@ -485,6 +655,8 @@ public class WorkHoursCalcService {
                     info.put("departmentSelectField_mkow4ydr", formData.get("departmentSelectField_mkow4ydr_id"));
                     // 归属公司:人员档案是 SelectField 下拉,直接取字符串;目标表单写入仍用 textField_mmekrcji
                     info.put("textField_mmekrcji", formData.get("selectField_mh8xhqc4"));
+                    // prd: 是否cf员工,源 textField_mow9w7d8(人员档案)→ 目标 textField_mpp7a2k7(应填报工时)
+                    info.put("textField_mpp7a2k7", formData.get("textField_mow9w7d8"));
                     personnelMap.put(empId, info);
                 }
             }
@@ -540,6 +712,8 @@ public class WorkHoursCalcService {
                     info.put("departmentSelectField_mkow4ydr", formData.get("departmentSelectField_mkow4ydr_id"));
                     // 归属公司:人员档案是 SelectField 下拉,直接取字符串;目标表单写入仍用 textField_mmekrcji
                     info.put("textField_mmekrcji", formData.get("selectField_mh8xhqc4"));
+                    // prd: 是否cf员工,源 textField_mow9w7d8(人员档案)→ 目标 textField_mpp7a2k7(应填报工时)
+                    info.put("textField_mpp7a2k7", formData.get("textField_mow9w7d8"));
                     personnelMap.put(empId, info);
                 }
             }
@@ -626,6 +800,8 @@ public class WorkHoursCalcService {
                 }
             }
             putIfNotNull(formData, "textField_mmekrcji", personnelInfo.get("textField_mmekrcji"));
+            // prd: 是否cf员工(源 textField_mow9w7d8 → 目标 textField_mpp7a2k7)
+            putIfNotNull(formData, "textField_mpp7a2k7", personnelInfo.get("textField_mpp7a2k7"));
         }
 
         // fixme: 日期组件在 searchCondition 中必须使用数组格式 [start, end]

+ 6 - 2
mjava-akdsbeisen/src/main/java/com/malk/timer/WorkHoursCalcTimer.java

@@ -27,8 +27,12 @@ public class WorkHoursCalcTimer {
         }
     }
 
-    // 每月2号~31号,每天凌晨3点:增量同步(查询人员档案最近2天修改的)
-    @Scheduled(cron = "0 0 3 2-31 * ?")
+    // prd: 增量同步改为每天 3 次(07:00 / 12:15 / 18:30),查询人员档案最近2天修改的
+    // fixme: 单条 cron 的时/分字段相互独立,无法表达「7:00 与 12:15 与 18:30」三个离散时点
+    //        (会变成时×分笛卡尔积),故用 @Scheduled 可重复注解挂 3 条独立 cron
+    @Scheduled(cron = "0 0 7 * * ?")
+    @Scheduled(cron = "0 15 12 * * ?")
+    @Scheduled(cron = "0 30 18 * * ?")
     public void calcDailyIncrementalSync() {
         log.info("开始执行应填报工时【增量】同步任务");
         try {