优化消息发送

This commit is contained in:
zhouhao 2020-02-17 22:50:14 +08:00
parent 0761dd48c2
commit 157fa284f2
1 changed files with 31 additions and 21 deletions

View File

@ -4,9 +4,13 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.authorization.annotation.Authorize;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.FunctionInvokeMessageSender;
import org.jetlinks.core.message.ReadPropertyMessageSender;
import org.jetlinks.core.message.WritePropertyMessageSender;
@ -28,6 +32,7 @@ import reactor.core.publisher.Mono;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@RestController
@RequestMapping("/device")
@ -65,16 +70,14 @@ public class DeviceMessageController {
.map(DeviceOperator::messageSender)//发送消息到设备
.map(sender -> sender.readProperty(property).messageId(IDGenerator.SNOW_FLAKE_STRING.generate()))
.flatMapMany(ReadPropertyMessageSender::send)
.map(ReadPropertyMessageReply::getProperties);
.map(mapReply(ReadPropertyMessageReply::getProperties));
}
//获取标准设备属性
@GetMapping("/standard/{deviceId}/property/{property:.+}")
@SneakyThrows
public Mono<DevicePropertiesEntity> getStandardProperty(@PathVariable String deviceId,
@PathVariable String property) {
public Mono<DevicePropertiesEntity> getStandardProperty(@PathVariable String deviceId, @PathVariable String property) {
return Mono.from(registry
.getDevice(deviceId)
.switchIfEmpty(ErrorUtils.notFound("设备不存在"))
@ -96,6 +99,7 @@ public class DeviceMessageController {
entity.setValue(value.toString());
entity.setStringValue(value.toString());
entity.setFormatValue(dataType.format(value).toString());
}
return entity;
});
@ -105,47 +109,42 @@ public class DeviceMessageController {
}
//设置设备属性
@PostMapping("setting/{deviceId}/property")
@PostMapping("/setting/{deviceId}/property")
@SneakyThrows
public Flux<?> settingProperties(@PathVariable String deviceId,
@RequestBody Map<String, Object> properties) {
public Flux<?> settingProperties(@PathVariable String deviceId, @RequestBody Map<String, Object> properties) {
return registry
.getDevice(deviceId)
.switchIfEmpty(ErrorUtils.notFound("设备不存在"))
.map(DeviceOperator::messageSender)
.map(sender -> sender
.map(operator -> operator
.messageSender()
.writeProperty()
.messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
.write(properties)
)
.flatMapMany(WritePropertyMessageSender::send)
.map(WritePropertyMessageReply::getProperties);
.map(mapReply(WritePropertyMessageReply::getProperties));
}
//设备功能调用
@PostMapping("invoked/{deviceId}/function/{functionId}")
@SneakyThrows
public Flux<?> invokedFunction(@PathVariable String deviceId,
@PathVariable String functionId,
@RequestBody Map<String, Object> properties) {
public Flux<?> invokedFunction(@PathVariable String deviceId, @PathVariable String functionId, @RequestBody Map<String, Object> properties) {
return registry
.getDevice(deviceId)
.switchIfEmpty(ErrorUtils.notFound("设备不存在"))
.map(DeviceOperator::messageSender)
.map(sender -> sender
.map(operator -> operator
.messageSender()
.invokeFunction(functionId)
.messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
.setParameter(properties))
.flatMapMany(FunctionInvokeMessageSender::send)
.map(FunctionInvokeMessageReply::getOutput)
;
.map(mapReply(FunctionInvokeMessageReply::getOutput));
}
//获取设备所有属性
@PostMapping("/{deviceId}/properties")
@SneakyThrows
@ -160,8 +159,19 @@ public class DeviceMessageController {
sender.readProperty(list.toArray(new String[0]))
.messageId(IDGenerator.SNOW_FLAKE_STRING.generate())
.send()))
.map(ReadPropertyMessageReply::getProperties)
;
.map(mapReply(ReadPropertyMessageReply::getProperties));
}
private static <R extends DeviceMessageReply, T> Function<R, T> mapReply(Function<R, T> function) {
return reply -> {
if (!reply.isSuccess()) {
throw new BusinessException(reply.getMessage(), reply.getCode());
}
T mapped = function.apply(reply);
if (mapped == null) {
throw new DeviceOperationException(ErrorCode.NO_REPLY);
}
return mapped;
};
}
}