From b0e383005e226dff43c2adb56fd5364e99a1536a Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Thu, 19 Nov 2020 09:31:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96TopicMessage=E9=87=8A?= =?UTF-8?q?=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../community/gateway/TopicMessage.java | 1 + .../gateway/spring/ProxyMessageListener.java | 40 +++++++++++-------- .../message/DeviceMessageMeasurement.java | 2 + .../message/NotificationsPublishProvider.java | 3 +- .../providers/DeviceAlarmProvider.java | 2 +- 5 files changed, 29 insertions(+), 19 deletions(-) diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java index 7a4e473c..64e6847e 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java @@ -9,6 +9,7 @@ import javax.annotation.Nonnull; import java.nio.charset.StandardCharsets; import java.util.Map; +@Deprecated public interface TopicMessage { /** diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java index fcebd9b2..27660319 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java @@ -83,31 +83,39 @@ class ProxyMessageListener implements MessageListener { log.warn("TopicMessage已弃用,请替换为TopicPayload! {}", method); return TopicMessageWrap.wrap(message); } - Payload payload = message.getPayload(); - Object decodedPayload; - if (payload instanceof NativePayload) { - decodedPayload = ((NativePayload) payload).getNativeObject(); - } else { - if (decoder == null) { - decoder = Codecs.lookup(resolvableType); + try { + Payload payload = message.getPayload(); + Object decodedPayload; + if (payload instanceof NativePayload) { + decodedPayload = ((NativePayload) payload).getNativeObject(); + } else { + if (decoder == null) { + decoder = Codecs.lookup(resolvableType); + } + decodedPayload = decoder.decode(message); } - decodedPayload = decoder.decode(message); + if (paramType.isInstance(decodedPayload)) { + return decodedPayload; + } + return FastBeanCopier.DEFAULT_CONVERT.convert(decodedPayload, paramType, resolvableType.resolveGenerics()); + } finally { + message.release(); } - if (paramType.isInstance(decodedPayload)) { - return decodedPayload; - } - - return FastBeanCopier.DEFAULT_CONVERT.convert(decodedPayload, paramType, resolvableType.resolveGenerics()); } @Override public Mono onMessage(TopicPayload message) { - return Mono.defer(() -> { - Object val = proxy.apply(target, paramType == Void.class ? null : convert(message)); + boolean paramVoid = paramType == Void.class; + try { + Object val = proxy.apply(target, paramVoid ? null : convert(message)); if (val instanceof Publisher) { return Mono.from((Publisher) val).then(); } return Mono.empty(); - }); + } finally { + if (paramVoid) { + message.release(); + } + } } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java index 66fd8277..ca8dbc3e 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java @@ -4,6 +4,7 @@ import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; +import org.jetlinks.core.event.TopicPayload; import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DefaultConfigMetadata; @@ -82,6 +83,7 @@ class DeviceMessageMeasurement extends StaticMeasurement { //通过订阅消息来统计实时数据量 return eventBus .subscribe(Subscription.of("real-time-device-message", "/device/**", Subscription.Feature.local, Subscription.Feature.broker)) + .doOnNext(TopicPayload::release) .window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1))) .flatMap(Flux::count) .map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis())); diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java index 306a89d5..c4200e3b 100644 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/message/NotificationsPublishProvider.java @@ -1,6 +1,5 @@ package org.jetlinks.community.notify.manager.message; -import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import org.jetlinks.community.gateway.external.Message; import org.jetlinks.community.gateway.external.SubscribeRequest; @@ -40,6 +39,6 @@ public class NotificationsPublishProvider implements SubscriptionProvider { "/notifications/user/" + request.getAuthentication().getUser().getId() + "/*/*", Subscription.Feature.local, Subscription.Feature.broker )) - .map(msg -> Message.success(request.getId(), msg.getTopic(), JSON.parseObject(msg.bodyToString()))); + .map(msg -> Message.success(request.getId(), msg.getTopic(), msg.bodyToJson(true))); } } diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java index a6dd66a8..1442b7c2 100644 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/DeviceAlarmProvider.java @@ -59,7 +59,7 @@ public class DeviceAlarmProvider implements SubscriberProvider { Subscription.Feature.local )) .map(msg -> { - JSONObject json = msg.bodyToJson(); + JSONObject json = msg.bodyToJson(true); return Notify.of( String.format("设备[%s]发生告警:[%s]!", json.getString("deviceName"), json.getString("alarmName")),