diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayManager.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayManager.java index da04eb49..ad6e88ae 100755 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayManager.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayManager.java @@ -18,9 +18,12 @@ package org.jetlinks.community.gateway; import org.jetlinks.community.gateway.supports.DeviceGatewayProvider; import org.jetlinks.community.network.channel.ChannelInfo; import org.jetlinks.community.network.channel.ChannelProvider; +import org.jetlinks.core.command.CommandSupport; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -100,4 +103,35 @@ public interface DeviceGatewayManager { * @return DeviceGatewayProvider */ Optional getProvider(String provider); + + /** + * 执行指定网关的指定命令,网关需要实现{@link CommandSupport}. + * + * @param gatewayId 网关ID + * @param commandId 命令ID {@link CommandSupport} + * @param body 命令参数 + * @return 执行结果 + */ + @SuppressWarnings("all") + default Mono executeCommand(String gatewayId, + String commandId, + Mono> body) { + return (Mono)Mono + .zip( + this.getGateway(gatewayId) + .filter(gateway -> gateway.isWrapperFor(CommandSupport.class)) + .cast(CommandSupport.class), + body, + (cmd, param) -> cmd.execute(cmd.createCommand(commandId).with(param)) + ) + .flatMap(val -> { + if (val instanceof Mono) { + return ((Mono) val); + } + if (val instanceof Flux) { + return ((Flux) val).collectList(); + } + return Mono.just(val); + }); + } } diff --git a/jetlinks-manager/device-manager/pom.xml b/jetlinks-manager/device-manager/pom.xml index b30bbadc..0e168803 100644 --- a/jetlinks-manager/device-manager/pom.xml +++ b/jetlinks-manager/device-manager/pom.xml @@ -106,6 +106,12 @@ ${project.version} compile + + + org.jetlinks.community + plugin-component + ${project.version} + diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/plugin/PluginDeviceConfigMetadataSupplier.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/plugin/PluginDeviceConfigMetadataSupplier.java new file mode 100644 index 00000000..3ad06a67 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/plugin/PluginDeviceConfigMetadataSupplier.java @@ -0,0 +1,138 @@ +package org.jetlinks.community.device.service.plugin; + +import lombok.AllArgsConstructor; +import org.jetlinks.core.Configurable; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.metadata.ConfigMetadata; +import org.jetlinks.plugin.internal.PluginDataIdMapper; +import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin; +import org.jetlinks.community.PropertyConstants; +import org.jetlinks.community.device.entity.DeviceInstanceEntity; +import org.jetlinks.community.device.service.LocalDeviceInstanceService; +import org.jetlinks.community.device.service.LocalDeviceProductService; +import org.jetlinks.community.device.spi.DeviceConfigMetadataSupplier; +import org.jetlinks.community.device.utils.DeviceCacheUtils; +import org.jetlinks.community.gateway.supports.DeviceGatewayPropertiesManager; +import org.jetlinks.community.gateway.supports.DeviceGatewayProviders; +import org.jetlinks.community.plugin.device.PluginDeviceGatewayProvider; +import org.reactivestreams.Publisher; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.BiFunction; +import java.util.function.Function; + +@Component +@AllArgsConstructor +public class PluginDeviceConfigMetadataSupplier implements DeviceConfigMetadataSupplier { + private final DeviceRegistry registry; + private final PluginDataIdMapper idMapper; + private final LocalDeviceInstanceService deviceInstanceService; + private final LocalDeviceProductService productService; + private final DeviceGatewayPropertiesManager gatewayPropertiesManager; + + @Override + public Flux getDeviceConfigMetadata(String deviceId) { + return registry + .getDevice(deviceId) + .flatMap(DeviceOperator::getProduct) + //注册中心没有则查询数据库 + .switchIfEmpty( + DeviceCacheUtils + .getDeviceOrLoad(deviceId, deviceInstanceService::findById) + .map(DeviceInstanceEntity::getProductId) + .flatMap(registry::getProduct)) + .flatMapMany(prod -> getByConfigurable(prod, prod.getId(), DeviceGatewayPlugin::getDeviceConfigMetadata)); + } + + protected Flux getByConfigurable(Configurable configurable, + String productId, + BiFunction> mapper) { + return configurable + .getConfigs(PropertyConstants.accessProvider, PropertyConstants.accessId) + .flatMapMany(values -> getByAccessProvider( + values.getString(PropertyConstants.accessProvider.getKey(), ""), + values.getString(PropertyConstants.accessId.getKey(), ""), + productId, + mapper + )); + } + + protected Flux getByAccessProvider(String accessProvider, + String accessId, + String productId, + BiFunction> mapper) { + if (!StringUtils.hasText(accessProvider)) { + return Flux.empty(); + //不应该在这里校验 +// throw new BusinessException("error.product_device_gateway_must_not_be_null", 400); + } + return Mono + .justOrEmpty( + DeviceGatewayProviders + .getProvider(accessProvider) + .filter(provider -> provider.isWrapperFor(PluginDeviceGatewayProvider.class)) + .map(provider -> provider + .unwrap(PluginDeviceGatewayProvider.class) + .getPlugin(accessId)) + ) + .flatMapMany(plugin -> idMapper + .getExternalId(PluginDataIdMapper.TYPE_PRODUCT, plugin.getId(), productId) + .flatMapMany(id -> mapper.apply(plugin, id))); + } + + @Override + public Flux getDeviceConfigMetadataByProductId(String productId) { + return registry + .getProduct(productId) + .map(prod -> getByConfigurable(prod, prod.getId(), DeviceGatewayPlugin::getDeviceConfigMetadata)) + //注册中心没有则查询数据库 + .defaultIfEmpty( + DeviceCacheUtils + .getProductOrLoad(productId, productService::findById) + .filter(product -> StringUtils.hasText(product.getAccessProvider())) + .flatMapMany(product -> getByAccessProvider( + product.getAccessProvider(), + product.getAccessId(), + product.getId(), + DeviceGatewayPlugin::getDeviceConfigMetadata + )) + ) + .flatMapMany(Function.identity()); + } + + @Override + public Flux getProductConfigMetadata(String productId) { + return registry + .getProduct(productId) + .map(prod -> getByConfigurable(prod, prod.getId(), DeviceGatewayPlugin::getProductConfigMetadata)) + //注册中心没有则查询数据库 + .defaultIfEmpty( + DeviceCacheUtils + .getProductOrLoad(productId, productService::findById) + .filter(product -> StringUtils.hasText(product.getAccessProvider())) + .flatMapMany(product -> getByAccessProvider( + product.getAccessProvider(), + product.getAccessId(), + product.getId(), + DeviceGatewayPlugin::getProductConfigMetadata + )) + ) + .flatMapMany(Function.identity()); + } + + @Override + public Flux getProductConfigMetadataByAccessId(String productId, + String accessId) { + return gatewayPropertiesManager + .getProperties(accessId) + .flatMapMany(properties -> getByAccessProvider( + properties.getProvider(), + accessId, + productId, + DeviceGatewayPlugin::getProductConfigMetadata)); + } +} diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/DeviceGatewayController.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/DeviceGatewayController.java index 423b4f09..0f312623 100755 --- a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/DeviceGatewayController.java +++ b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/web/DeviceGatewayController.java @@ -34,8 +34,10 @@ import org.jetlinks.community.network.manager.web.response.DeviceGatewayDetail; import org.jetlinks.community.network.manager.web.response.DeviceGatewayProviderInfo; import org.jetlinks.community.utils.ReactorUtils; import org.jetlinks.core.ProtocolSupports; +import org.jetlinks.core.command.CommandSupport; import org.jetlinks.core.device.session.DeviceSessionInfo; import org.jetlinks.core.device.session.DeviceSessionManager; +import org.jetlinks.core.metadata.FunctionMetadata; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -43,6 +45,7 @@ import reactor.util.function.Tuple2; import reactor.util.function.Tuples; import java.util.Comparator; +import java.util.Map; @RestController @RequestMapping("/gateway/device") @@ -182,4 +185,35 @@ public class DeviceGatewayController implements ReactiveServiceCrudController getEntityCommands(@PathVariable String gatewayId) { + return this.gatewayManager + .getGateway(gatewayId) + .filter(gateway -> gateway.isWrapperFor(CommandSupport.class)) + .flatMapMany(gateway -> gateway.unwrap(CommandSupport.class).getCommandMetadata()); + } + + @PostMapping(value = "/{gatewayId}/command/QueryDevicePage") + @Operation(summary = "查询网关支持的设备列表") + @Resource(id = "device-instance", name = "设备实例", merge = false) + @QueryAction + public Mono> queryInGatewayDevice(@PathVariable String gatewayId, + @RequestBody Mono> body) { + return this + .gatewayManager + .executeCommand(gatewayId, "QueryDevicePage", body); + } + + @PostMapping(value = "/{gatewayId}/command/{commandId}") + @Operation(summary = "执行网关命令") + @SaveAction + public Mono getEntityCommands(@PathVariable String gatewayId, + @PathVariable String commandId, + @RequestBody Mono> body) { + return this.gatewayManager.executeCommand(gatewayId, commandId, body); + } + + }