diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java index 27870af3..34ad1683 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java @@ -1,25 +1,22 @@ package org.jetlinks.community.gateway; -import com.alibaba.fastjson.JSON; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.MessageType; -import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Optional; public class DeviceMessageUtils { + @SuppressWarnings("all") public static Optional convert(TopicMessage message){ - if (message.getMessage() instanceof EncodableMessage) { - Object nativeMessage = ((EncodableMessage) message.getMessage()).getNativePayload(); - if (nativeMessage instanceof DeviceMessage) { - return Optional.of((DeviceMessage)nativeMessage); - } else if (nativeMessage instanceof Map) { - return MessageType.convertMessage(((Map) nativeMessage)); - } + Object nativeMessage = message.convertMessage(); + if (nativeMessage instanceof DeviceMessage) { + return Optional.of((DeviceMessage)nativeMessage); + } else if (nativeMessage instanceof Map) { + return MessageType.convertMessage(((Map) nativeMessage)); } - return MessageType.convertMessage(JSON.parseObject(message.getMessage().getPayload().toString(StandardCharsets.UTF_8))); + return Optional.empty(); } } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java index fa4a2877..ef629a0b 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java @@ -1,9 +1,11 @@ package org.jetlinks.community.gateway; +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBufUtil; import org.jetlinks.core.message.codec.EncodedMessage; -import org.jetlinks.rule.engine.executor.PayloadType; import javax.annotation.Nonnull; +import java.nio.charset.StandardCharsets; public interface TopicMessage { @@ -31,11 +33,17 @@ public interface TopicMessage { if (getMessage() instanceof EncodableMessage) { return ((EncodableMessage) getMessage()).getNativePayload(); } - - if (getMessage().getPayloadType() == null) { - return getMessage().getBytes(); + byte[] payload = getMessage().payloadAsBytes(); + //maybe json + if (/* { }*/(payload[0] == 123 && payload[payload.length - 1] == 125) + || /* [ ] */(payload[0] == 91 && payload[payload.length - 1] == 93) + ) { + return JSON.parseObject(new String(payload)); } - return PayloadType.valueOf(getMessage().getPayloadType().name()).read(getMessage().getPayload()); + if (ByteBufUtil.isText(getMessage().getPayload(), StandardCharsets.UTF_8)) { + return getMessage().payloadAsString(); + } + return payload; } static TopicMessage of(String topic, EncodedMessage message) { diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java index f78732e5..9beaa852 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java @@ -72,14 +72,14 @@ class ProxyMessageListener implements MessageListener { return message.getMessage().getPayload(); } - if (message.getMessage() instanceof EncodableMessage) { - Object payload = ((EncodableMessage) message.getMessage()).getNativePayload(); - if(paramType.isInstance(payload)){ - return payload; - } - return FastBeanCopier.DEFAULT_CONVERT.convert(payload, paramType, new Class[]{}); + Object payload = message.convertMessage(); + if (paramType.isInstance(payload)) { + return payload; } - return message; + if (payload instanceof byte[]) { + return payload; + } + return FastBeanCopier.DEFAULT_CONVERT.convert(payload, paramType, new Class[]{}); }