优化数据转换

This commit is contained in:
zhou-hao 2020-06-05 14:54:37 +08:00
parent 2da42e854d
commit a5d59bf73b
3 changed files with 27 additions and 22 deletions

View File

@ -1,25 +1,22 @@
package org.jetlinks.community.gateway;
import com.alibaba.fastjson.JSON;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
public class DeviceMessageUtils {
@SuppressWarnings("all")
public static Optional<DeviceMessage> convert(TopicMessage message){
if (message.getMessage() instanceof EncodableMessage) {
Object nativeMessage = ((EncodableMessage) message.getMessage()).getNativePayload();
if (nativeMessage instanceof DeviceMessage) {
return Optional.of((DeviceMessage)nativeMessage);
} else if (nativeMessage instanceof Map) {
return MessageType.convertMessage(((Map<String, Object>) nativeMessage));
}
Object nativeMessage = message.convertMessage();
if (nativeMessage instanceof DeviceMessage) {
return Optional.of((DeviceMessage)nativeMessage);
} else if (nativeMessage instanceof Map) {
return MessageType.convertMessage(((Map<String, Object>) nativeMessage));
}
return MessageType.convertMessage(JSON.parseObject(message.getMessage().getPayload().toString(StandardCharsets.UTF_8)));
return Optional.empty();
}
}

View File

@ -1,9 +1,11 @@
package org.jetlinks.community.gateway;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBufUtil;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.rule.engine.executor.PayloadType;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
public interface TopicMessage {
@ -31,11 +33,17 @@ public interface TopicMessage {
if (getMessage() instanceof EncodableMessage) {
return ((EncodableMessage) getMessage()).getNativePayload();
}
if (getMessage().getPayloadType() == null) {
return getMessage().getBytes();
byte[] payload = getMessage().payloadAsBytes();
//maybe json
if (/* { }*/(payload[0] == 123 && payload[payload.length - 1] == 125)
|| /* [ ] */(payload[0] == 91 && payload[payload.length - 1] == 93)
) {
return JSON.parseObject(new String(payload));
}
return PayloadType.valueOf(getMessage().getPayloadType().name()).read(getMessage().getPayload());
if (ByteBufUtil.isText(getMessage().getPayload(), StandardCharsets.UTF_8)) {
return getMessage().payloadAsString();
}
return payload;
}
static TopicMessage of(String topic, EncodedMessage message) {

View File

@ -72,14 +72,14 @@ class ProxyMessageListener implements MessageListener {
return message.getMessage().getPayload();
}
if (message.getMessage() instanceof EncodableMessage) {
Object payload = ((EncodableMessage) message.getMessage()).getNativePayload();
if(paramType.isInstance(payload)){
return payload;
}
return FastBeanCopier.DEFAULT_CONVERT.convert(payload, paramType, new Class[]{});
Object payload = message.convertMessage();
if (paramType.isInstance(payload)) {
return payload;
}
return message;
if (payload instanceof byte[]) {
return payload;
}
return FastBeanCopier.DEFAULT_CONVERT.convert(payload, paramType, new Class[]{});
}