|
|
@@ -1,7 +1,15 @@
|
|
|
package com.malk.tonglibo.utils;
|
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import com.malk.server.aliwork.YDConf;
|
|
|
+import com.malk.server.aliwork.YDParam;
|
|
|
+import com.malk.server.aliwork.YDSearch;
|
|
|
+import com.malk.service.aliwork.YDClient;
|
|
|
import com.malk.tonglibo.Mapper.MachineDataMapper;
|
|
|
+import com.malk.tonglibo.Mapper.MachineDetailMapper;
|
|
|
import com.malk.tonglibo.Service.IMachineDataService;
|
|
|
+import com.malk.tonglibo.entity.MachineDetail;
|
|
|
import com.malk.tonglibo.entity.RawDeviceData;
|
|
|
import com.malk.tonglibo.entity.MachineData;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
@@ -26,12 +34,16 @@ public class DeviceDataPersistTask {
|
|
|
private MachineDataMapper machineDataMapper;
|
|
|
@Autowired
|
|
|
private IMachineDataService machineDataService;
|
|
|
+ @Autowired
|
|
|
+ private MachineDetailMapper machineDetailMapper;
|
|
|
|
|
|
@Autowired
|
|
|
private DataBuffer dataBuffer;
|
|
|
|
|
|
@Autowired
|
|
|
private ChangeDetector changeDetector;
|
|
|
+ @Autowired
|
|
|
+ private YDClient ydClient;
|
|
|
|
|
|
|
|
|
@Scheduled(fixedDelay = 1000) // 每秒执行一次
|
|
|
@@ -39,29 +51,48 @@ public class DeviceDataPersistTask {
|
|
|
List<RawDeviceData> dataList = dataBuffer.drain(500);
|
|
|
if (dataList.isEmpty()) return;
|
|
|
|
|
|
- List<MachineData> toInsert = new ArrayList<>();
|
|
|
- int unchangedCount = 0;
|
|
|
+ // "machineNo + 分钟"
|
|
|
+ Map<String, MachineData> latestPerMinute = new HashMap<>();
|
|
|
|
|
|
for (RawDeviceData item : dataList) {
|
|
|
MachineData data = convertToMachineData(item);
|
|
|
- if (data == null) continue;
|
|
|
+ if (data == null) {
|
|
|
+ log.warn("设备数据转换失败,跳过: {}", item);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ //生成 key:machineNo + 时间分钟
|
|
|
+ String minuteKey = data.getMachineNo() + "_" +
|
|
|
+ data.getTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"));
|
|
|
+ // 保留最新的数据
|
|
|
+ MachineData existing = latestPerMinute.get(minuteKey);
|
|
|
+ if (existing == null || data.getTime().isAfter(existing.getTime())) {
|
|
|
+ latestPerMinute.put(minuteKey, data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //对每分钟最新数据进行 MD5 变化检测
|
|
|
+ List<MachineData> toInsert = new ArrayList<>();
|
|
|
+ int unchangedCount = 0;
|
|
|
|
|
|
- //使用 MD5 判断是否变化
|
|
|
+ for (MachineData data : latestPerMinute.values()) {
|
|
|
if (changeDetector.isChanged(data)) {
|
|
|
- String fingerprint = MD5Util.md5(changeDetector.concatFields(data)); // 或在 convert 时生成
|
|
|
+ String fingerprint = MD5Util.md5(changeDetector.concatFields(data));
|
|
|
data.setDataFingerprint(fingerprint);
|
|
|
toInsert.add(data);
|
|
|
- changeDetector.recordAsCurrent(data.getParamId(), fingerprint);
|
|
|
+ changeDetector.recordAsCurrent(data.getMachineNo(), fingerprint); // 更新为当前指纹
|
|
|
} else {
|
|
|
unchangedCount++;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // 批量入库
|
|
|
if (!toInsert.isEmpty()) {
|
|
|
machineDataService.saveBatch(toInsert, 100);
|
|
|
- log.info("入库 {} 条(变化数据),跳过 {} 条未变数据", toInsert.size(), unchangedCount);
|
|
|
+ log.info("入库 {} 条(每设备每分钟最新且变化数据),跳过 {} 条未变数据,原始上报 {} 条",
|
|
|
+ toInsert.size(), unchangedCount, dataList.size());
|
|
|
} else if (unchangedCount > 0) {
|
|
|
- log.debug("跳过 {} 条未变化数据", unchangedCount);
|
|
|
+ log.debug("跳过 {} 条未变化数据(已按分钟合并)", unchangedCount);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -81,30 +112,44 @@ public class DeviceDataPersistTask {
|
|
|
|
|
|
// 填充字段(根据索引取值)
|
|
|
machineData.setJqyxsh(safeGet(data, 0));
|
|
|
- machineData.setZcn(safeGet(data, 2));
|
|
|
- machineData.setZqsj(safeGet(data, 4));
|
|
|
- machineData.setSgdyl(safeGet(data, 6));
|
|
|
- machineData.setXgdyl(safeGet(data, 8));
|
|
|
- machineData.setYskqyl1(safeGet(data, 10));
|
|
|
- machineData.setYskqyl2(safeGet(data, 12));
|
|
|
- machineData.setZymwd1(safeGet(data, 14));
|
|
|
- machineData.setZymwd2(safeGet(data, 16));
|
|
|
- machineData.setZymwd3(safeGet(data, 18));
|
|
|
- machineData.setZymwd4(safeGet(data, 20));
|
|
|
- machineData.setSmwd1(safeGet(data, 22));
|
|
|
- machineData.setSmwd2(safeGet(data, 24));
|
|
|
- machineData.setSmwd3(safeGet(data, 26));
|
|
|
- machineData.setSmwd4(safeGet(data, 28));
|
|
|
- machineData.setXmwd1(safeGet(data, 30));
|
|
|
- machineData.setXmwd2(safeGet(data, 32));
|
|
|
- machineData.setXmwd3(safeGet(data, 34));
|
|
|
- machineData.setXmwd4(safeGet(data, 36));
|
|
|
- machineData.setXjsj(safeGet(data, 38));
|
|
|
- machineData.setTssj(safeGet(data, 40));
|
|
|
- machineData.setZysj(safeGet(data, 42));
|
|
|
+ machineData.setZcn(safeGet(data, 1));
|
|
|
+ machineData.setDn(safeGet(data, 2));
|
|
|
+ machineData.setZqsj(safeGet(data, 3));
|
|
|
+ machineData.setSgdyl(safeGet(data, 4));
|
|
|
+ machineData.setXgdyl(safeGet(data, 5));
|
|
|
+ machineData.setYskqyl1(safeGet(data, 6));
|
|
|
+ machineData.setYskqyl2(safeGet(data, 7));
|
|
|
+ machineData.setZymwd1(safeGet(data, 8));
|
|
|
+ machineData.setZymwd2(safeGet(data, 9));
|
|
|
+ machineData.setZymwd3(safeGet(data, 10));
|
|
|
+ machineData.setZymwd4(safeGet(data, 11));
|
|
|
+ machineData.setSmwd1(safeGet(data, 12));
|
|
|
+ machineData.setSmwd2(safeGet(data, 13));
|
|
|
+ machineData.setSmwd3(safeGet(data, 14));
|
|
|
+ machineData.setSmwd4(safeGet(data, 15));
|
|
|
+ machineData.setXmwd1(safeGet(data, 16));
|
|
|
+ machineData.setXmwd2(safeGet(data, 17));
|
|
|
+ machineData.setXmwd3(safeGet(data, 18));
|
|
|
+ machineData.setXmwd4(safeGet(data, 19));
|
|
|
+ machineData.setXjsj(safeGet(data, 20));
|
|
|
+ machineData.setTssj(safeGet(data, 21));
|
|
|
+ machineData.setZysj(safeGet(data, 22));
|
|
|
+ List<MachineDetail> machineList = machineDetailMapper.selectList(new QueryWrapper<MachineDetail>().eq("machine_no", key));
|
|
|
+ if (machineList != null && !machineList.isEmpty()){
|
|
|
+ MachineDetail yidaParam = machineList.get(0);
|
|
|
+ String machineName = yidaParam.getMachineName();
|
|
|
+ String machineCode = yidaParam.getMachineCode();
|
|
|
+ String machineFlag = yidaParam.getMachineFlag();
|
|
|
+ if (machineName != null) {
|
|
|
+ machineData.setMachineName(machineName);
|
|
|
+ machineData.setMachineCode(machineCode);
|
|
|
+ machineData.setMachineFlag(machineFlag);
|
|
|
+ } else {
|
|
|
+ log.warn("未在宜搭中找到设备编号 {} 对应的名称,使用默认值", key);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
return machineData;
|
|
|
-
|
|
|
} catch (Exception e) {
|
|
|
log.error("转换数据失败: {}", item, e);
|
|
|
return null;
|
|
|
@@ -114,7 +159,12 @@ public class DeviceDataPersistTask {
|
|
|
// 安全获取索引值
|
|
|
private String safeGet(List<String> list, int index) {
|
|
|
if (index >= 0 && index < list.size()) {
|
|
|
- return list.get(index);
|
|
|
+ String value = list.get(index);
|
|
|
+ // 切割掉 . 后面的数据
|
|
|
+ if (value != null) {
|
|
|
+ String[] parts = value.split("\\.");
|
|
|
+ return parts[0]; // 返回点号前面的部分
|
|
|
+ }
|
|
|
}
|
|
|
return "";
|
|
|
}
|