From 5879d8bbe51276ad2fc2004b3b99ee9339667ea3 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Wed, 15 Oct 2025 14:05:09 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E8=A7=A3=E9=99=A4=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../engine/alarm/DefaultAlarmHandler.java | 74 +++++++++++-------- pom.xml | 2 +- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmHandler.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmHandler.java index 19fb97ed..7125bc91 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmHandler.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmHandler.java @@ -111,6 +111,7 @@ public class DefaultAlarmHandler implements AlarmHandler { AlarmRecordEntity record = ofRecord(cache, alarmInfo); AlarmResult result = ofRecordCache(cache); AlarmHistoryInfo historyInfo = createHistory(record, alarmInfo); + result.setRecordId(recordId); //时间滞后的告警数据,仅记录日志. if (!cache.isEffectiveTrigger(alarmInfo.getAlarmTime())) { return historyService @@ -169,6 +170,9 @@ public class DefaultAlarmHandler implements AlarmHandler { @Override public Mono relieveAlarm(RelieveInfo relieveInfo) { + String recordId = createRecordId(relieveInfo); + ReactiveLock lock = ReactiveLockHolder + .getLock("triggerAlarm:" + recordId); return this .getRecordCache(createRecordId(relieveInfo)) .flatMap(cache -> { @@ -179,27 +183,28 @@ public class DefaultAlarmHandler implements AlarmHandler { AlarmRecordEntity record = this.ofRecord(cache, relieveInfo); AlarmHistoryInfo historyInfo = this.createHistory(record, relieveInfo); RelieveResult relieveResult = createRelieveResult(record, relieveInfo); + relieveResult.setRecordId(record.getId()); //无效解除告警,更新解除告警时间 if (!cache.isEffectiveRelieve(relieveInfo.getRelieveTime())) { return this.updateRecordCache( - record.getId(), - relieveTime, - _cache -> _cache.withRelievedTime(relieveTime), - false, true) - .thenReturn(relieveResult); + record.getId(), + relieveTime, + _cache -> _cache.withRelievedTime(relieveTime), + false, true) + .thenReturn(relieveResult); } return Mono .zip(alarmRecordService.changeRecordState( - StringUtils.hasText(relieveInfo.getAlarmRelieveType()) - ? relieveInfo.getAlarmRelieveType() - : AlarmHandleType.system.getValue(), - AlarmRecordState.normal, - record.getId()), - this.updateRecordCache(record.getId(), - relieveTime, - _cache -> _cache.relieved(relieveTime), - false, true), - (total, ignore) -> total) + StringUtils.hasText(relieveInfo.getAlarmRelieveType()) + ? relieveInfo.getAlarmRelieveType() + : AlarmHandleType.system.getValue(), + AlarmRecordState.normal, + record.getId()), + this.updateRecordCache(record.getId(), + relieveTime, + _cache -> _cache.relieved(relieveTime), + false, true), + (total, ignore) -> total) .flatMap(total -> { //如果有数据被更新说明是正在告警中 if (total > 0) { @@ -212,7 +217,8 @@ public class DefaultAlarmHandler implements AlarmHandler { } return Mono.empty(); }); - }); + }) + .as(lock::lock); } public Mono publishAlarmHandleHistory(AlarmInfo alarmInfo, AlarmHandleInfo info) { @@ -339,9 +345,9 @@ public class DefaultAlarmHandler implements AlarmHandler { private Mono publishAlarmLog(AlarmHistoryInfo historyInfo, AlarmInfo alarmInfo) { String topic = Topics.alarmLog(historyInfo.getTargetType(), - historyInfo.getTargetId(), - historyInfo.getAlarmConfigId(), - historyInfo.getAlarmRecordId()); + historyInfo.getTargetId(), + historyInfo.getAlarmConfigId(), + historyInfo.getAlarmRecordId()); eventPublisher.publishEvent(historyInfo); return doPublishAlarmInfo(topic, historyInfo, alarmInfo); @@ -351,9 +357,9 @@ public class DefaultAlarmHandler implements AlarmHandler { AlarmRecordEntity record) { return this .updateRecordCache(record.getId(), - result.getAlarmTime(), - cache -> cache.with(result), - true, false) + result.getAlarmTime(), + cache -> cache.with(result), + true, false) .thenReturn(result); } @@ -361,13 +367,13 @@ public class DefaultAlarmHandler implements AlarmHandler { private AlarmHandleInfo createRelieveAlarmHandleInfo(RelieveInfo relieveInfo, AlarmRecordEntity record) { AlarmHandleInfo alarmHandleInfo = createAlarmHandleInfo(record); alarmHandleInfo.setHandleTime(relieveInfo.getRelieveTime() == null - ? System.currentTimeMillis() : relieveInfo.getRelieveTime()); + ? System.currentTimeMillis() : relieveInfo.getRelieveTime()); alarmHandleInfo.setState(AlarmRecordState.normal); alarmHandleInfo.setType(relieveInfo.getAlarmRelieveType() == null ? - AlarmHandleType.system.getValue() : relieveInfo.getAlarmRelieveType()); + AlarmHandleType.system.getValue() : relieveInfo.getAlarmRelieveType()); alarmHandleInfo.setDescribe(StringUtils.hasText(relieveInfo.getDescribe()) - ? relieveInfo.getDescribe() - : getLocaleDescribe()); + ? relieveInfo.getDescribe() + : getLocaleDescribe()); alarmHandleInfo.setHandleState(AlarmHandleState.processed); return alarmHandleInfo; } @@ -385,11 +391,14 @@ public class DefaultAlarmHandler implements AlarmHandler { return LocaleUtils.resolveMessage("message.scene_triggered_relieve_alarm", "场景触发解除告警"); } - private String createRecordId(AlarmInfo alarmInfo) { return AlarmRecordEntity.generateId(alarmInfo.getTargetId(), alarmInfo.getTargetType(), alarmInfo.getAlarmConfigId()); } + private String createRecordId(RelieveInfo alarmInfo) { + return AlarmRecordEntity.generateId(alarmInfo.getTargetId(), alarmInfo.getTargetType(), alarmInfo.getAlarmConfigId()); + } + private Mono getRecordCache(String recordId) { return storageManager .getStorage(CACHE_ID) @@ -473,17 +482,17 @@ public class DefaultAlarmHandler implements AlarmHandler { //告警在解除最近一次告警之后才算有效 public boolean isEffectiveTrigger(long timestamp) { //只获取时间不可行或者告警仍在运行也需要解除 - return timestamp > relieve.reliveTime || !trigger.isAlarming(); + return timestamp > relieve.reliveTime || !trigger.isAlarming() || trigger.isCacheMiss(); } //解除告警在最近一次告警时间之后 并且 当前告警正在告警中才算有效 public boolean isEffectiveRelieve(long timestamp) { - return timestamp > trigger.alarmTime && trigger.isAlarming(); + return (timestamp > trigger.alarmTime && trigger.isAlarming()) || trigger.isCacheMiss(); } //有效的触发时间 public boolean isEffectiveTime(long timestamp) { - return timestamp > relieve.reliveTime && timestamp > trigger.alarmTime; + return (timestamp > relieve.reliveTime && timestamp > trigger.alarmTime) || trigger.isCacheMiss(); } } @@ -499,6 +508,10 @@ public class DefaultAlarmHandler implements AlarmHandler { long lastAlarmTime; + public boolean isCacheMiss() { + return state == 0x00 && alarmTime == 0L && lastAlarmTime == 0L; + } + public boolean isAlarming() { return state == stateAlarming; } @@ -548,7 +561,6 @@ public class DefaultAlarmHandler implements AlarmHandler { } } - @Getter @Setter public static class RelieveCache implements Externalizable { diff --git a/pom.xml b/pom.xml index eadcb601..cb0cba9a 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 7.17.26 3.12.0 1.2.83 - 2024.0.1 + 2024.0.8 4.5.11 2.24.3 2.0.16