From 57d2f5fdc9de72ce9ed3d6ac806a02af812a0ab8 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Thu, 11 Mar 2021 14:30:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=B7=B2=E7=BB=8F=E5=BA=9F?= =?UTF-8?q?=E5=BC=83=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gateway/DefaultTopicMessage.java | 21 ------ .../community/gateway/DeviceMessageUtils.java | 11 --- .../community/gateway/TopicMessage.java | 66 ----------------- .../community/gateway/TopicMessageWrap.java | 57 --------------- .../gateway/rule/TopicMessageCodec.java | 73 ------------------- .../gateway/spring/ProxyMessageListener.java | 7 -- .../web/response/TopicMessageResponse.java | 28 ------- 7 files changed, 263 deletions(-) delete mode 100644 jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DefaultTopicMessage.java delete mode 100644 jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java delete mode 100644 jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessageWrap.java delete mode 100644 jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/TopicMessageCodec.java delete mode 100644 jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/response/TopicMessageResponse.java diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DefaultTopicMessage.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DefaultTopicMessage.java deleted file mode 100644 index 5f55faa0..00000000 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DefaultTopicMessage.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.jetlinks.community.gateway; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; -import org.jetlinks.core.message.codec.EncodedMessage; -import org.jetlinks.core.message.codec.Transport; - -@Getter -@Setter -@AllArgsConstructor -class DefaultTopicMessage implements TopicMessage { - private String topic; - - private EncodedMessage message; - - public DefaultTopicMessage(String topic,Object message){ - this.topic=topic; - - } -} diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java index 5bda65ea..8f295e5c 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java @@ -15,17 +15,6 @@ import java.util.Optional; public class DeviceMessageUtils { - @SuppressWarnings("all") - public static Optional convert(TopicMessage message){ - Object nativeMessage = message.convertMessage(); - if (nativeMessage instanceof DeviceMessage) { - return Optional.of((DeviceMessage)nativeMessage); - } else if (nativeMessage instanceof Map) { - return MessageType.convertMessage(((Map) nativeMessage)); - } - return Optional.empty(); - } - public static Optional convert(TopicPayload message) { return Optional.of(message.decode(DeviceMessage.class)); } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java deleted file mode 100644 index 64e6847e..00000000 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.jetlinks.community.gateway; - -import com.alibaba.fastjson.JSON; -import io.netty.buffer.ByteBufUtil; -import org.jetlinks.core.message.codec.EncodedMessage; -import org.jetlinks.supports.utils.MqttTopicUtils; - -import javax.annotation.Nonnull; -import java.nio.charset.StandardCharsets; -import java.util.Map; - -@Deprecated -public interface TopicMessage { - - /** - * 主题: 格式为: /group/1/user/1, 支持通配符: **(多层路径),*(单层路径) - * - *
-     *     /group/** , /group/下的全部topic.包括子目录
-     *     /group/1/* , /group/1/下的topic. 不包括子目录
-     * 
- * - * @return topic - */ - @Nonnull - String getTopic(); - - /** - * @return 已编码的消息 - * @see org.jetlinks.core.message.codec.MqttMessage - */ - @Nonnull - EncodedMessage getMessage(); - - default Map getTopicVars(String pattern) { - return MqttTopicUtils.getPathVariables(pattern, getTopic()); - } - - default Object convertMessage() { - if (getMessage() instanceof EncodableMessage) { - return ((EncodableMessage) getMessage()).getNativePayload(); - } - byte[] payload = getMessage().payloadAsBytes(); - //maybe json - if (/* { }*/(payload[0] == 123 && payload[payload.length - 1] == 125) - || /* [ ] */(payload[0] == 91 && payload[payload.length - 1] == 93) - ) { - return JSON.parseObject(new String(payload)); - } - if (ByteBufUtil.isText(getMessage().getPayload(), StandardCharsets.UTF_8)) { - return getMessage().payloadAsString(); - } - return payload; - } - - static TopicMessage of(String topic, EncodedMessage message) { - return new DefaultTopicMessage(topic, message); - } - - static TopicMessage of(String topic, Object payload) { - if (payload instanceof EncodedMessage) { - return of(topic, ((EncodedMessage) payload)); - } - return of(topic, EncodableMessage.of(payload)); - } -} diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessageWrap.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessageWrap.java deleted file mode 100644 index 37074f7c..00000000 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/TopicMessageWrap.java +++ /dev/null @@ -1,57 +0,0 @@ -package org.jetlinks.community.gateway; - -import io.netty.buffer.ByteBuf; -import org.jetlinks.core.NativePayload; -import org.jetlinks.core.Payload; -import org.jetlinks.core.event.TopicPayload; -import org.jetlinks.core.message.codec.EncodedMessage; - -import javax.annotation.Nonnull; - -public class TopicMessageWrap implements TopicMessage { - - private String topic; - - private EncodedMessage message; - - public static TopicMessageWrap wrap(TopicPayload topicPayload) { - Payload payload = topicPayload.getPayload(); - TopicMessageWrap wrap = new TopicMessageWrap(); - wrap.topic = topicPayload.getTopic(); - if (payload instanceof NativePayload) { - wrap.message = new EncodableMessage() { - @Override - public Object getNativePayload() { - return ((NativePayload) payload).getNativeObject(); - } - - @Nonnull - @Override - public ByteBuf getPayload() { - return payload.getBody(); - } - }; - } else { - wrap.message = new EncodedMessage() { - @Nonnull - @Override - public ByteBuf getPayload() { - return payload.getBody(); - } - }; - } - return wrap; - } - - @Nonnull - @Override - public String getTopic() { - return topic; - } - - @Nonnull - @Override - public EncodedMessage getMessage() { - return message; - } -} diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/TopicMessageCodec.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/TopicMessageCodec.java deleted file mode 100644 index ec604b2c..00000000 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/rule/TopicMessageCodec.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.jetlinks.community.gateway.rule; - -import io.netty.buffer.ByteBuf; -import lombok.Getter; -import lombok.Setter; -import org.jetlinks.community.gateway.TopicMessage; -import org.jetlinks.rule.engine.api.RuleData; -import org.jetlinks.rule.engine.api.RuleDataCodec; -import org.jetlinks.rule.engine.api.RuleDataCodecs; -import org.jetlinks.rule.engine.executor.PayloadType; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashMap; -import java.util.Map; - -public class TopicMessageCodec implements RuleDataCodec { - - private static final TopicMessageCodec INSTANCE = new TopicMessageCodec(); - - static { - RuleDataCodecs.register(TopicMessage.class, INSTANCE); - } - - public static void register() { - } - - - public static TopicMessageCodec getInstance() { - return INSTANCE; - } - - @Override - public Map encode(TopicMessage data, Feature... features) { - - ByteBuf payload = data.getMessage().getPayload(); - PayloadType payloadType = PayloadType.valueOf(data.getMessage().getPayloadType().name()); - - Map map = new HashMap<>(); - map.put("topic", data.getTopic()); - map.put("message", payloadType.read(payload)); - return map; - } - - @Override - public Flux decode(RuleData data, Feature... features) { - - return Mono.fromSupplier(() -> Feature.find(TopicFeature.class, features) - .map(TopicFeature::getTopics) - .orElseThrow(() -> new UnsupportedOperationException("topics not found"))) - .flatMapMany(Flux::just) - .flatMap(topic -> data - .dataToMap() - .map(map -> TopicMessage.of(topic, map))); - } - - - public static TopicFeature feature(String... topics) { - return new TopicFeature(topics); - } - - @Getter - @Setter - public static class TopicFeature implements RuleDataCodec.Feature { - - private String[] topics; - - public TopicFeature(String... topics) { - this.topics = topics; - } - } - -} diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java index 63b2fca4..2f24b420 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/ProxyMessageListener.java @@ -3,8 +3,6 @@ package org.jetlinks.community.gateway.spring; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.proxy.Proxy; -import org.jetlinks.community.gateway.TopicMessage; -import org.jetlinks.community.gateway.TopicMessageWrap; import org.jetlinks.core.NativePayload; import org.jetlinks.core.Payload; import org.jetlinks.core.codec.Codecs; @@ -17,7 +15,6 @@ import reactor.core.publisher.Mono; import java.lang.reflect.Method; import java.util.StringJoiner; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @Slf4j @@ -80,10 +77,6 @@ class ProxyMessageListener implements MessageListener { return message; } try { - if (paramType.equals(TopicMessage.class)) { - log.warn("TopicMessage已弃用,请替换为TopicPayload! {}", method); - return TopicMessageWrap.wrap(message); - } Payload payload = message.getPayload(); Object decodedPayload; if (payload instanceof NativePayload) { diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/response/TopicMessageResponse.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/response/TopicMessageResponse.java deleted file mode 100644 index 0714d13d..00000000 --- a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/response/TopicMessageResponse.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.jetlinks.community.network.manager.web.response; - -import lombok.Getter; -import lombok.Setter; -import org.jetlinks.community.gateway.TopicMessage; - -import java.nio.charset.StandardCharsets; - -@Getter -@Setter -public class TopicMessageResponse { - - private String topic; - - private Object message; - - public static TopicMessageResponse of(TopicMessage topicMessage) { - TopicMessageResponse response = new TopicMessageResponse(); - response.setTopic(topicMessage.getTopic()); - response.setMessage(topicMessage.getMessage().getPayload().toString(StandardCharsets.UTF_8)); - return response; - } - - public TopicMessage toTopicMessage() { - return TopicMessage.of(this.topic, this.message); - } - -}