diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index 2d1dce5a..71f82992 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -1,6 +1,7 @@ package org.jetlinks.community.device.message; import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.Values; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.*; import org.jetlinks.core.message.event.EventMessage; @@ -14,6 +15,7 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import javax.annotation.Nonnull; +import java.util.HashMap; import java.util.function.Function; /** @@ -89,7 +91,6 @@ public class DeviceMessageConnector .then(); } - public Mono getTopic(Message message) { if (message instanceof DeviceMessage) { DeviceMessage deviceMessage = ((DeviceMessage) message); @@ -101,43 +102,16 @@ public class DeviceMessageConnector .getDevice(deviceId) //获取设备配置是可能存在的性能瓶颈 .flatMap(operator -> operator.getSelfConfigs(appendConfigHeader)) + .switchIfEmpty(Mono.fromSupplier(() -> Values.of(new HashMap<>()))) .flatMap(configs -> { configs.getAllValues().forEach(deviceMessage::addHeader); - String topic; - - // TODO: 2019/12/28 自定义topic支持? - - if (message instanceof EventMessage) { //事件 - EventMessage event = ((EventMessage) message); - topic = "/device/" + deviceMessage.getDeviceId() + "/message/event/".concat(event.getEvent()); - } else if (message instanceof ReportPropertyMessage) { //上报属性 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/report"; - } else if (message instanceof DeviceOnlineMessage) { //设备上线 - topic = "/device/" + deviceMessage.getDeviceId() + "/online"; - } else if (message instanceof DeviceOfflineMessage) { //设备离线 - topic = "/device/" + deviceMessage.getDeviceId() + "/offline"; - } else if (message instanceof ChildDeviceMessage) { //子设备消息 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/children"; + String topic = "/device/".concat(deviceId).concat(createDeviceMessageTopic(message)); + if (message instanceof ChildDeviceMessage) { //子设备消息 return onMessage(((ChildDeviceMessage) message).getChildDeviceMessage()) .thenReturn(topic); } else if (message instanceof ChildDeviceMessageReply) { //子设备消息 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/children/reply"; return onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage()) .thenReturn(topic); - } else if (message instanceof ReadPropertyMessage) { //读取属性 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/read"; - } else if (message instanceof WritePropertyMessage) { //修改属性 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/write"; - } else if (message instanceof FunctionInvokeMessage) { //调用功能 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/function/reply"; - } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/read/reply"; - } else if (message instanceof WritePropertyMessageReply) { //修改属性回复 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/property/write/reply"; - } else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复 - topic = "/device/" + deviceMessage.getDeviceId() + "/message/function/reply"; - } else { - topic = "/device/" + deviceMessage.getDeviceId() + "/message/unknown"; } return Mono.just(topic); }); @@ -146,6 +120,49 @@ public class DeviceMessageConnector return Mono.just("/device/unknown/message/unknown"); } + public String createDeviceMessageTopic(Message message) { + if (message instanceof EventMessage) { //事件 + EventMessage event = ((EventMessage) message); + return "/message/event/".concat(event.getEvent()); + } else if (message instanceof ReportPropertyMessage) { //上报属性 + return "/message/property/report"; + } else if (message instanceof DeviceOnlineMessage) { //设备上线 + return "/online"; + } else if (message instanceof DeviceOfflineMessage) { //设备离线 + return "/offline"; + } else if (message instanceof ChildDeviceMessage) { //子设备消息 + Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage(); + if (msg instanceof DeviceMessage) { + return "/message/children/".concat(((DeviceMessage) msg).getDeviceId()).concat(createDeviceMessageTopic(msg)); + } + return "/message/children/".concat(createDeviceMessageTopic(message)); + } else if (message instanceof ChildDeviceMessageReply) { //子设备消息 + Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage(); + if (msg instanceof DeviceMessage) { + return "/message/children/reply/".concat(((DeviceMessage) msg).getDeviceId()).concat(createDeviceMessageTopic(msg)); + } + return "/message/children/reply/".concat(createDeviceMessageTopic(message)); + } else if (message instanceof ReadPropertyMessage) { //读取属性 + return "/message/property/read"; + } else if (message instanceof WritePropertyMessage) { //修改属性 + return "/message/property/write"; + } else if (message instanceof FunctionInvokeMessage) { //调用功能 + return "/message/function/reply"; + } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复 + return "/message/property/read/reply"; + } else if (message instanceof WritePropertyMessageReply) { //修改属性回复 + return "/message/property/write/reply"; + } else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复 + return "/message/function/reply"; + } else if (message instanceof DeviceRegisterMessage) { //注册 + return "/register"; + } else if (message instanceof DeviceUnRegisterMessage) { //注销 + return "/unregister"; + } else { + return "/message/unknown"; + } + } + @Nonnull @Override public Flux onConnection() { diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java index 1d819761..398fb8d5 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java @@ -21,12 +21,11 @@ import org.hswebframework.web.logger.ReactiveLogger; import org.jetlinks.community.device.entity.DeviceOperationLogEntity; import org.jetlinks.community.device.message.DeviceMessageUtils; import org.jetlinks.community.gateway.Subscription; +import org.jetlinks.community.gateway.annotation.Subscribe; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; -import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.DeviceOfflineMessage; -import org.jetlinks.core.message.DeviceOnlineMessage; +import org.jetlinks.core.message.*; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DeviceMetadata; import org.jetlinks.core.metadata.EventMetadata; @@ -389,4 +388,48 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService autoBindChildrenDevice(ChildDeviceMessage message) { + String childId = message.getChildDeviceId(); + Message childMessage = message.getChildDeviceMessage(); + if (childMessage instanceof DeviceRegisterMessage) { + return registry.getDevice(message.getDeviceId()) + .flatMap(DeviceOperator::getState) + .flatMap(state -> createUpdate() + .set(DeviceInstanceEntity::getParentId, message.getDeviceId()) + .set(DeviceInstanceEntity::getState, DeviceState.of(state)) + .where(DeviceInstanceEntity::getId, childId) + .execute() + .then(registry + .getDevice(childId) + .flatMap(dev -> dev.setConfig(DeviceConfigKey.parentGatewayId, message.getDeviceId()))) + .then()); + } + return Mono.empty(); + } + + @Subscribe("/device/*/message/children/*/unregister") + public Mono autoUnbindChildrenDevice(ChildDeviceMessage message) { + String childId = message.getChildDeviceId(); + Message childMessage = message.getChildDeviceMessage(); + if (childMessage instanceof DeviceUnRegisterMessage) { + + return registry.getDevice(childId) + .flatMap(dev -> dev + .removeConfig(DeviceConfigKey.parentGatewayId.getKey()) + .then(dev.checkState())) + .flatMap(state -> createUpdate() + .setNull(DeviceInstanceEntity::getParentId) + .set(DeviceInstanceEntity::getState, DeviceState.of(state)) + .where(DeviceInstanceEntity::getId, childId) + .execute() + .then()); + + + } + return Mono.empty(); + } + + }