refactor: 优化插件

This commit is contained in:
zhouhao 2025-05-30 10:58:05 +08:00
parent 0cf365e2b3
commit a809a8c33d
4 changed files with 212 additions and 0 deletions

View File

@ -18,9 +18,12 @@ package org.jetlinks.community.gateway;
import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
import org.jetlinks.community.network.channel.ChannelInfo;
import org.jetlinks.community.network.channel.ChannelProvider;
import org.jetlinks.core.command.CommandSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
@ -100,4 +103,35 @@ public interface DeviceGatewayManager {
* @return DeviceGatewayProvider
*/
Optional<DeviceGatewayProvider> getProvider(String provider);
/**
* 执行指定网关的指定命令,网关需要实现{@link CommandSupport}.
*
* @param gatewayId 网关ID
* @param commandId 命令ID {@link CommandSupport}
* @param body 命令参数
* @return 执行结果
*/
@SuppressWarnings("all")
default <T> Mono<T> executeCommand(String gatewayId,
String commandId,
Mono<Map<String, Object>> body) {
return (Mono)Mono
.zip(
this.getGateway(gatewayId)
.filter(gateway -> gateway.isWrapperFor(CommandSupport.class))
.cast(CommandSupport.class),
body,
(cmd, param) -> cmd.execute(cmd.createCommand(commandId).with(param))
)
.flatMap(val -> {
if (val instanceof Mono) {
return ((Mono<?>) val);
}
if (val instanceof Flux) {
return ((Flux<?>) val).collectList();
}
return Mono.just(val);
});
}
}

View File

@ -106,6 +106,12 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>plugin-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,138 @@
package org.jetlinks.community.device.service.plugin;
import lombok.AllArgsConstructor;
import org.jetlinks.core.Configurable;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.plugin.internal.PluginDataIdMapper;
import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
import org.jetlinks.community.device.service.LocalDeviceProductService;
import org.jetlinks.community.device.spi.DeviceConfigMetadataSupplier;
import org.jetlinks.community.device.utils.DeviceCacheUtils;
import org.jetlinks.community.gateway.supports.DeviceGatewayPropertiesManager;
import org.jetlinks.community.gateway.supports.DeviceGatewayProviders;
import org.jetlinks.community.plugin.device.PluginDeviceGatewayProvider;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.BiFunction;
import java.util.function.Function;
@Component
@AllArgsConstructor
public class PluginDeviceConfigMetadataSupplier implements DeviceConfigMetadataSupplier {
private final DeviceRegistry registry;
private final PluginDataIdMapper idMapper;
private final LocalDeviceInstanceService deviceInstanceService;
private final LocalDeviceProductService productService;
private final DeviceGatewayPropertiesManager gatewayPropertiesManager;
@Override
public Flux<ConfigMetadata> getDeviceConfigMetadata(String deviceId) {
return registry
.getDevice(deviceId)
.flatMap(DeviceOperator::getProduct)
//注册中心没有则查询数据库
.switchIfEmpty(
DeviceCacheUtils
.getDeviceOrLoad(deviceId, deviceInstanceService::findById)
.map(DeviceInstanceEntity::getProductId)
.flatMap(registry::getProduct))
.flatMapMany(prod -> getByConfigurable(prod, prod.getId(), DeviceGatewayPlugin::getDeviceConfigMetadata));
}
protected Flux<ConfigMetadata> getByConfigurable(Configurable configurable,
String productId,
BiFunction<DeviceGatewayPlugin, String, Publisher<ConfigMetadata>> mapper) {
return configurable
.getConfigs(PropertyConstants.accessProvider, PropertyConstants.accessId)
.flatMapMany(values -> getByAccessProvider(
values.getString(PropertyConstants.accessProvider.getKey(), ""),
values.getString(PropertyConstants.accessId.getKey(), ""),
productId,
mapper
));
}
protected Flux<ConfigMetadata> getByAccessProvider(String accessProvider,
String accessId,
String productId,
BiFunction<DeviceGatewayPlugin, String, Publisher<ConfigMetadata>> mapper) {
if (!StringUtils.hasText(accessProvider)) {
return Flux.empty();
//不应该在这里校验
// throw new BusinessException("error.product_device_gateway_must_not_be_null", 400);
}
return Mono
.justOrEmpty(
DeviceGatewayProviders
.getProvider(accessProvider)
.filter(provider -> provider.isWrapperFor(PluginDeviceGatewayProvider.class))
.map(provider -> provider
.unwrap(PluginDeviceGatewayProvider.class)
.getPlugin(accessId))
)
.flatMapMany(plugin -> idMapper
.getExternalId(PluginDataIdMapper.TYPE_PRODUCT, plugin.getId(), productId)
.flatMapMany(id -> mapper.apply(plugin, id)));
}
@Override
public Flux<ConfigMetadata> getDeviceConfigMetadataByProductId(String productId) {
return registry
.getProduct(productId)
.map(prod -> getByConfigurable(prod, prod.getId(), DeviceGatewayPlugin::getDeviceConfigMetadata))
//注册中心没有则查询数据库
.defaultIfEmpty(
DeviceCacheUtils
.getProductOrLoad(productId, productService::findById)
.filter(product -> StringUtils.hasText(product.getAccessProvider()))
.flatMapMany(product -> getByAccessProvider(
product.getAccessProvider(),
product.getAccessId(),
product.getId(),
DeviceGatewayPlugin::getDeviceConfigMetadata
))
)
.flatMapMany(Function.identity());
}
@Override
public Flux<ConfigMetadata> getProductConfigMetadata(String productId) {
return registry
.getProduct(productId)
.map(prod -> getByConfigurable(prod, prod.getId(), DeviceGatewayPlugin::getProductConfigMetadata))
//注册中心没有则查询数据库
.defaultIfEmpty(
DeviceCacheUtils
.getProductOrLoad(productId, productService::findById)
.filter(product -> StringUtils.hasText(product.getAccessProvider()))
.flatMapMany(product -> getByAccessProvider(
product.getAccessProvider(),
product.getAccessId(),
product.getId(),
DeviceGatewayPlugin::getProductConfigMetadata
))
)
.flatMapMany(Function.identity());
}
@Override
public Flux<ConfigMetadata> getProductConfigMetadataByAccessId(String productId,
String accessId) {
return gatewayPropertiesManager
.getProperties(accessId)
.flatMapMany(properties -> getByAccessProvider(
properties.getProvider(),
accessId,
productId,
DeviceGatewayPlugin::getProductConfigMetadata));
}
}

View File

@ -34,8 +34,10 @@ import org.jetlinks.community.network.manager.web.response.DeviceGatewayDetail;
import org.jetlinks.community.network.manager.web.response.DeviceGatewayProviderInfo;
import org.jetlinks.community.utils.ReactorUtils;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.command.CommandSupport;
import org.jetlinks.core.device.session.DeviceSessionInfo;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -43,6 +45,7 @@ import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.Comparator;
import java.util.Map;
@RestController
@RequestMapping("/gateway/device")
@ -182,4 +185,35 @@ public class DeviceGatewayController implements ReactiveServiceCrudController<De
}
@GetMapping(value = "/{gatewayId}/commands")
@Operation(summary = "获取网关支持的命令")
@QueryAction
public Flux<FunctionMetadata> getEntityCommands(@PathVariable String gatewayId) {
return this.gatewayManager
.getGateway(gatewayId)
.filter(gateway -> gateway.isWrapperFor(CommandSupport.class))
.flatMapMany(gateway -> gateway.unwrap(CommandSupport.class).getCommandMetadata());
}
@PostMapping(value = "/{gatewayId}/command/QueryDevicePage")
@Operation(summary = "查询网关支持的设备列表")
@Resource(id = "device-instance", name = "设备实例", merge = false)
@QueryAction
public Mono<PagerResult<Object>> queryInGatewayDevice(@PathVariable String gatewayId,
@RequestBody Mono<Map<String, Object>> body) {
return this
.gatewayManager
.executeCommand(gatewayId, "QueryDevicePage", body);
}
@PostMapping(value = "/{gatewayId}/command/{commandId}")
@Operation(summary = "执行网关命令")
@SaveAction
public Mono<Object> getEntityCommands(@PathVariable String gatewayId,
@PathVariable String commandId,
@RequestBody Mono<Map<String, Object>> body) {
return this.gatewayManager.executeCommand(gatewayId, commandId, body);
}
}