Merge remote-tracking branch 'origin/master'

This commit is contained in:
zhouhao 2023-02-28 15:55:33 +08:00
commit 703012c2c3
19 changed files with 1198 additions and 13 deletions

View File

@ -0,0 +1,65 @@
package org.jetlinks.community;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.jetlinks.core.utils.SerializeUtils;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Optional;
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
@Getter
@Setter
public class OperationSource implements Externalizable {
private static final long serialVersionUID = 1L;
/**
* ID,type对应操作的唯一标识
*/
private String id;
/**
* 操作源名称
*/
private String name;
/**
* 操作目标,通常为ID对应的详情数据
*/
private Object data;
public static OperationSource of(String id, Object data) {
return of(id, id, data);
}
public static Context ofContext(String id, String name, Object data) {
return Context.of(OperationSource.class, of(id, name, data));
}
public static Optional<OperationSource> fromContext(ContextView ctx) {
return ctx.getOrEmpty(OperationSource.class);
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(id);
SerializeUtils.writeObject(name, out);
SerializeUtils.writeObject(data, out);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
id = in.readUTF();
name = (String) SerializeUtils.readObject(in);
data = SerializeUtils.readObject(in);
}
}

View File

@ -1,3 +1,5 @@
package org.jetlinks.community.script;
import lombok.*;
@ -21,4 +23,4 @@ public class Script {
return of(name, content, source);
}
}
}

View File

@ -89,4 +89,4 @@ public interface ScriptFactory {
<T> T bind(Script script,
Class<T> interfaceType);
}
}

View File

@ -15,8 +15,10 @@ import org.hswebframework.web.authorization.annotation.*;
import org.hswebframework.web.authorization.exception.UnAuthorizedException;
import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.hswebframework.web.exception.ValidationException;
import org.hswebframework.web.i18n.LocaleUtils;
import org.hswebframework.web.system.authorization.defaults.service.DefaultPermissionService;
import org.hswebframework.web.validator.CreateGroup;
import org.jetlinks.community.auth.configuration.MenuProperties;
import org.jetlinks.community.auth.entity.MenuEntity;
import org.jetlinks.community.auth.entity.MenuView;
@ -24,6 +26,7 @@ import org.jetlinks.community.auth.service.DefaultMenuService;
import org.jetlinks.community.auth.service.MenuGrantService;
import org.jetlinks.community.auth.service.request.MenuGrantRequest;
import org.jetlinks.community.auth.web.request.AuthorizationSettingDetail;
import org.jetlinks.community.web.response.ValidationResult;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
@ -263,6 +266,31 @@ public class MenuController implements ReactiveServiceCrudController<MenuEntity,
);
}
@GetMapping("/code/_validate")
@QueryAction
@Operation(summary = "验证菜单编码是否合法", description = "同一所有者的相同应用下的菜单,编码不能重复")
public Mono<ValidationResult> codeValidate(@RequestParam @Parameter(description = "菜单编码") String code,
@RequestParam(required = false)
@Parameter(description = "外部菜单所属应用ID") String appId,
@RequestParam @Parameter(description = "菜单所有者") String owner) {
return LocaleUtils.currentReactive()
.flatMap(locale -> {
MenuEntity entity = new MenuEntity();
entity.setCode(code);
entity.setOwner(owner);
entity.tryValidate("code", CreateGroup.class);
return defaultMenuService
.findById(entity.getId())
.map(menu -> ValidationResult
.error(LocaleUtils.resolveMessage("error.id_already_exists", locale)));
})
.defaultIfEmpty(ValidationResult.success())
.onErrorResume(ValidationException.class, e -> Mono.just(e.getI18nCode())
.map(ValidationResult::error));
}
private Mono<AuthorizationSettingDetail> getAuthorizationSettingDetail(Flux<MenuView> menus) {
return Mono
.zip(menus

View File

@ -0,0 +1,56 @@
package org.jetlinks.community.auth.web;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.hswebframework.web.authorization.annotation.Authorize;
import org.hswebframework.web.authorization.annotation.QueryAction;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.exception.ValidationException;
import org.hswebframework.web.i18n.LocaleUtils;
import org.hswebframework.web.system.authorization.api.entity.PermissionEntity;
import org.hswebframework.web.system.authorization.defaults.service.DefaultPermissionService;
import org.hswebframework.web.validator.CreateGroup;
import org.jetlinks.community.web.response.ValidationResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
/**
* 权限管理.
*
* @author zhangji 2022/12/23
*/
@RestController
@RequestMapping("/permission")
@Authorize
@Resource(id = "permission", name = "权限管理", group = "system")
@Tag(name = "权限管理")
@AllArgsConstructor
public class PermissionController {
private final DefaultPermissionService permissionService;
@GetMapping("/id/_validate")
@QueryAction
@Operation(summary = "验证权限ID是否合法")
public Mono<ValidationResult> permissionIdValidate2(@RequestParam @Parameter(description = "权限ID") String id) {
return LocaleUtils.currentReactive()
.flatMap(locale -> {
PermissionEntity entity = new PermissionEntity();
entity.setId(id);
entity.tryValidate("id", CreateGroup.class);
return permissionService
.findById(id)
.map(permission -> ValidationResult
.error(LocaleUtils.resolveMessage("error.id_already_exists", locale)));
})
.defaultIfEmpty(ValidationResult.success())
.onErrorResume(ValidationException.class, e -> Mono.just(e.getI18nCode())
.map(ValidationResult::error));
}
}

View File

@ -99,6 +99,13 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>script-component</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,86 @@
package org.jetlinks.community.device.entity;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
import org.hswebframework.web.api.crud.entity.RecordModifierEntity;
import org.hswebframework.web.crud.annotation.EnableEntityEvent;
import org.hswebframework.web.crud.generator.Generators;
import org.jetlinks.community.device.message.transparent.TransparentMessageCodecProvider;
import org.springframework.util.StringUtils;
import javax.persistence.Column;
import javax.persistence.Table;
import java.sql.JDBCType;
import java.util.Map;
@Getter
@Setter
@Table(name = "dev_transparent_codec")
@Schema(description = "透传消息解析器")
@EnableEntityEvent
public class TransparentMessageCodecEntity extends GenericEntity<String> implements RecordCreationEntity, RecordModifierEntity {
@Schema(description = "产品ID")
@Column(length = 64, nullable = false, updatable = false)
private String productId;
@Schema(description = "设备ID")
@Column(length = 64, updatable = false)
private String deviceId;
/**
* @see TransparentMessageCodecProvider#getProvider()
*/
@Schema(description = "编解码器提供商,如: jsr223")
@Column(length = 64, nullable = false)
private String provider;
/**
* 编解码配置
*
* @see TransparentMessageCodecProvider#createCodec(Map)
*/
@Schema(description = "编解码配置")
@Column(nullable = false)
@ColumnType(jdbcType = JDBCType.LONGVARCHAR, javaType = String.class)
@JsonCodec
private Map<String, Object> configuration;
@Schema(description = "创建人ID")
@Column(length = 64, nullable = false, updatable = false)
private String creatorId;
@Schema(description = "创建时间")
@Column(updatable = false)
@DefaultValue(generator = Generators.CURRENT_TIME)
private Long createTime;
@Schema(description = "修改人ID")
@Column(length = 64)
private String modifierId;
@Schema(description = "修改时间")
@DefaultValue(generator = Generators.CURRENT_TIME)
private Long modifyTime;
@Override
public String getId() {
if (!StringUtils.hasText(super.getId())) {
super.setId(
createId(productId, deviceId)
);
}
return super.getId();
}
public static String createId(String productId, String deviceId) {
return DigestUtils.md5Hex(String.join("|", productId, deviceId));
}
}

View File

@ -0,0 +1,285 @@
package org.jetlinks.community.device.message.transparent;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.collections4.MapUtils;
import org.jetlinks.community.OperationSource;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DirectDeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.function.ThingFunctionInvokeMessage;
import org.jetlinks.core.message.property.ReadThingPropertyMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WriteThingPropertyMessage;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
@Slf4j
public class SimpleTransparentMessageCodec implements TransparentMessageCodec {
@NonNull
protected final Codec codec;
public SimpleTransparentMessageCodec(@NonNull Codec codec) {
this.codec = codec;
}
@Override
public final Mono<DirectDeviceMessage> encode(DeviceMessage message) {
return Mono.defer(() -> {
EncodeContext context = new EncodeContext(message);
codec.encode(context);
if (context.payload != null) {
DirectDeviceMessage msg = new DirectDeviceMessage();
msg.setPayload(ByteBufUtil.getBytes(context.payload));
//release
ReferenceCountUtil.safeRelease(context.payload);
msg.setMessageId(message.getMessageId());
msg.setDeviceId(message.getDeviceId());
if (null != message.getHeaders()) {
message.getHeaders().forEach(msg::addHeader);
}
context.headers.forEach(msg::addHeader);
return Mono.just(msg);
}
return Mono.empty();
});
}
@Override
public Flux<DeviceMessage> decode(DirectDeviceMessage message) {
return Mono
.fromCallable(() -> codec.decode(new DecodeContext(message)))
.flatMapMany(this::convert)
.doOnNext(msg -> {
String from = message.getMessageId();
if (from == null) {
from = message.getHeader(PropertyConstants.uid).orElse(null);
}
if (from != null) {
msg.addHeader("decodeFrom", from);
}
msg.thingId(message.getThingType(), message.getThingId());
});
}
@SuppressWarnings("all")
protected Flux<DeviceMessage> convert(Object msg) {
if (msg == null) {
return Flux.empty();
}
if (msg instanceof DeviceMessage) {
return Flux.just(((DeviceMessage) msg));
}
if (msg instanceof Map) {
if (MapUtils.isEmpty(((Map) msg))) {
return Flux.empty();
}
MessageType type = MessageType.of(((Map<String, Object>) msg)).orElse(MessageType.UNKNOWN);
if (type == MessageType.UNKNOWN) {
//返回map但是未设备未设备消息,则转为属性上报
return Flux.just(new ReportPropertyMessage().properties(((Map) msg)));
}
return Mono
.justOrEmpty(type.convert(((Map) msg)))
.flux()
.cast(DeviceMessage.class);
}
if (msg instanceof Collection) {
return Flux
.fromIterable(((Collection<?>) msg))
.flatMap(this::convert);
}
if (msg instanceof Publisher) {
return Flux
.from(((Publisher<?>) msg))
.flatMap(this::convert);
}
return Flux.error(new UnsupportedOperationException("unsupported data:" + msg));
}
public static class DecodeContext {
final DirectDeviceMessage msg;
final ByteBuf buffer;
DecodeContext(DirectDeviceMessage msg) {
this.msg = msg;
this.buffer = msg.asByteBuf();
}
public long timestamp() {
return msg.getTimestamp();
}
public ByteBuf payload() {
return buffer;
}
public Object json() {
return JSON.parse(buffer.array());
}
public Map<String, String> pathVars(String pattern, String path) {
return TopicUtils.getPathVariables(pattern, path);
}
public String url() {
return msg.getHeader("url")
.map(String::valueOf)
.orElse(null);
}
public String topic() {
return msg.getHeader("topic")
.map(String::valueOf)
.orElse(null);
}
public DirectDeviceMessage message() {
return msg;
}
}
/**
* <pre>{@code
*
* context
* .whenReadProperty("temp",()->return "0x0122")
* .whenFunction("func",args->{
*
* })
*
* }</pre>
*/
public static class EncodeContext {
private final DeviceMessage source;
private ByteBuf payload;
private final Map<String, Object> headers = new HashMap<>();
public EncodeContext(DeviceMessage source) {
this.source = source;
}
public DeviceMessage message() {
return source;
}
public EncodeContext topic(String topic) {
headers.put("topic", topic);
return this;
}
public ByteBuf payload() {
return payload == null ? payload = Unpooled.buffer() : payload;
}
public ByteBuf newBuffer() {
return Unpooled.buffer();
}
@SneakyThrows
public EncodeContext setPayload(String strOrHex, String charset) {
if (strOrHex.startsWith("0x")) {
payload().writeBytes(Hex.decodeHex(strOrHex.substring(2)));
} else {
payload().writeBytes(strOrHex.getBytes(charset));
}
return this;
}
@SneakyThrows
public EncodeContext setPayload(String strOrHex) {
setPayload(strOrHex, "utf-8");
return this;
}
public EncodeContext setPayload(Object data) {
if (data instanceof String) {
setPayload(((String) data));
}
if (data instanceof byte[]) {
payload().writeBytes(((byte[]) data));
}
if (data instanceof ByteBuf) {
this.payload = ((ByteBuf) data);
}
//todo 更多类型?
return this;
}
public EncodeContext whenFunction(String functionId, Function<Object, Object> supplier) {
if (source instanceof ThingFunctionInvokeMessage) {
ThingFunctionInvokeMessage<?> msg = ((ThingFunctionInvokeMessage<?>) source);
if ("*".equals(msg.getFunctionId()) || Objects.equals(functionId, msg.getFunctionId())) {
setPayload(supplier.apply(msg.inputsToMap()));
}
}
return this;
}
public EncodeContext whenWriteProperty(String property, Function<Object, Object> supplier) {
if (source instanceof WriteThingPropertyMessage) {
if ("*".equals(property)) {
setPayload(supplier.apply(((WriteThingPropertyMessage<?>) source).getProperties()));
return this;
}
Object value = ((WriteThingPropertyMessage<?>) source).getProperties().get(property);
if (value != null) {
setPayload(supplier.apply(value));
}
}
return this;
}
public EncodeContext whenReadProperties(Function<List<String>, Object> supplier) {
if (source instanceof ReadThingPropertyMessage) {
setPayload(supplier.apply(((ReadThingPropertyMessage<?>) source).getProperties()));
}
return this;
}
public EncodeContext whenReadProperty(String property, Supplier<Object> supplier) {
if (source instanceof ReadThingPropertyMessage) {
if ("*".equals(property) || ((ReadThingPropertyMessage<?>) source).getProperties().contains(property)) {
setPayload(supplier.get());
}
}
return this;
}
}
public interface Codec {
Object decode(DecodeContext context);
Object encode(EncodeContext context);
}
}

View File

@ -0,0 +1,197 @@
package org.jetlinks.community.device.message.transparent;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.exception.ValidationException;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DirectDeviceMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.community.OperationSource;
import org.jetlinks.community.device.entity.TransparentMessageCodecEntity;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Map;
@Slf4j
@Component
public class TransparentDeviceMessageConnector implements CommandLineRunner, DeviceMessageSenderInterceptor {
private final ReactiveRepository<TransparentMessageCodecEntity, String> repository;
private final DecodedClientMessageHandler messageHandler;
private final EventBus eventBus;
private final Map<CacheKey, TransparentMessageCodec> codecs = new NonBlockingHashMap<>();
public TransparentDeviceMessageConnector(@SuppressWarnings("all")
ReactiveRepository<TransparentMessageCodecEntity, String> repository,
DecodedClientMessageHandler messageHandler,
EventBus eventBus,
ObjectProvider<TransparentMessageCodecProvider> providers) {
this.repository = repository;
this.messageHandler = messageHandler;
this.eventBus = eventBus;
for (TransparentMessageCodecProvider provider : providers) {
TransparentMessageCodecProviders.addProvider(provider);
}
}
@Subscribe("/device/*/*/message/direct")
public Mono<Void> handleMessage(DirectDeviceMessage message) {
String productId = message.getHeaderOrDefault(Headers.productId);
String deviceId = message.getDeviceId();
TransparentMessageCodec codec = getCodecOrNull(productId, deviceId);
if (null == codec) {
return Mono.empty();
}
return codec
.decode(message)
.flatMap(msg -> messageHandler.handleMessage(null, msg))
.then();
}
private TransparentMessageCodec getCodecOrNull(String productId, String deviceId) {
CacheKey cacheKey = new CacheKey(productId, deviceId);
TransparentMessageCodec codec = codecs.get(cacheKey);
if (codec == null) {
cacheKey.setDeviceId(null);
codec = codecs.get(cacheKey);
}
return codec;
}
@Override
public Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message) {
return device
.getSelfConfig(DeviceConfigKey.productId)
.mapNotNull(productId -> getCodecOrNull(productId, device.getDeviceId()))
.<DeviceMessage>flatMap(codec -> codec
.encode(message)
.doOnNext(msg -> {
msg.addHeader("encodeBy", message.getMessageType().name());
//所有透传消息都设置为异步
msg.addHeader(Headers.async, true);
// msg.addHeader(Headers.sendAndForget, true);
})
)
.defaultIfEmpty(message);
}
@Subscribe(value = "/_sys/transparent-codec/load", features = Subscription.Feature.broker)
public Mono<Void> doLoadCodec(TransparentMessageCodecEntity entity) {
CacheKey key = new CacheKey(entity.getProductId(), entity.getDeviceId());
TransparentMessageCodecProvider provider = TransparentMessageCodecProviders
.getProvider(entity.getProvider())
.orElseThrow(() -> new ValidationException("codec", "error.unsupported_codec", entity.getProvider()));
return provider
.createCodec(entity.getConfiguration())
.doOnNext(codec -> codecs.put(key, codec))
.contextWrite(OperationSource.ofContext(entity.getId(),null,entity))
.switchIfEmpty(Mono.fromRunnable(() -> codecs.remove(key)))
.then();
}
@Subscribe(value = "/_sys/transparent-codec/removed", features = Subscription.Feature.broker)
public Mono<Void> doRemoveCodec(TransparentMessageCodecEntity entity) {
CacheKey key = new CacheKey(entity.getProductId(), entity.getDeviceId());
codecs.remove(key);
return Mono.empty();
}
@EventListener
public void handleEntityEvent(EntityCreatedEvent<TransparentMessageCodecEntity> event) {
event.async(
Flux.fromIterable(event.getEntity())
.flatMap(this::loadCodec)
);
}
@EventListener
public void handleEntityEvent(EntitySavedEvent<TransparentMessageCodecEntity> event) {
event.async(
Flux.fromIterable(event.getEntity())
.flatMap(this::loadCodec)
);
}
@EventListener
public void handleEntityEvent(EntityModifyEvent<TransparentMessageCodecEntity> event) {
event.async(
Flux.fromIterable(event.getAfter())
.flatMap(this::loadCodec)
);
}
@EventListener
public void handleEntityEvent(EntityDeletedEvent<TransparentMessageCodecEntity> event) {
event.async(
Flux.fromIterable(event.getEntity())
.flatMap(this::removeCodec)
);
}
public Mono<Void> loadCodec(TransparentMessageCodecEntity entity) {
return doLoadCodec(entity)
.then(
eventBus
.publish("/_sys/transparent-codec/load", entity)
.then()
);
}
public Mono<Void> removeCodec(TransparentMessageCodecEntity entity) {
return doRemoveCodec(entity)
.then(
eventBus
.publish("/_sys/transparent-codec/removed", entity)
.then()
);
}
@Override
public void run(String... args) throws Exception {
repository
.createQuery()
.fetch()
.flatMap(e -> this
.doLoadCodec(e)
.onErrorResume(err -> {
log.error("load transparent device message codec [{}:{}] error", e.getId(), e.getProvider(), err);
return Mono.empty();
}))
.subscribe();
}
@Getter
@Setter
@EqualsAndHashCode
@AllArgsConstructor
static class CacheKey {
private String productId;
private String deviceId;
}
}

View File

@ -0,0 +1,14 @@
package org.jetlinks.community.device.message.transparent;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DirectDeviceMessage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface TransparentMessageCodec {
Flux<DeviceMessage> decode(DirectDeviceMessage message);
Mono<DirectDeviceMessage> encode(DeviceMessage message);
}

View File

@ -0,0 +1,13 @@
package org.jetlinks.community.device.message.transparent;
import reactor.core.publisher.Mono;
import java.util.Map;
public interface TransparentMessageCodecProvider {
String getProvider();
Mono<TransparentMessageCodec> createCodec(Map<String,Object> configuration);
}

View File

@ -0,0 +1,32 @@
package org.jetlinks.community.device.message.transparent;
import org.hswebframework.web.exception.I18nSupportException;
import org.jctools.maps.NonBlockingHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class TransparentMessageCodecProviders {
public static Map<String, TransparentMessageCodecProvider> providers = new NonBlockingHashMap<>();
static void addProvider(TransparentMessageCodecProvider provider) {
providers.put(provider.getProvider(), provider);
}
public static List<TransparentMessageCodecProvider> getProviders() {
return new ArrayList<>(providers.values());
}
public static Optional<TransparentMessageCodecProvider> getProvider(String provider) {
return Optional.ofNullable(providers.get(provider));
}
public static TransparentMessageCodecProvider getProviderNow(String provider) {
return getProvider(provider)
.orElseThrow(()->new I18nSupportException("error.unsupported_codec",provider));
}
}

View File

@ -0,0 +1,86 @@
package org.jetlinks.community.device.message.transparent.script;
import org.hswebframework.web.exception.ValidationException;
import org.jetlinks.community.device.message.transparent.SimpleTransparentMessageCodec;
import org.jetlinks.community.device.message.transparent.TransparentMessageCodec;
import org.jetlinks.community.device.message.transparent.TransparentMessageCodecProvider;
import org.jetlinks.community.script.Script;
import org.jetlinks.community.script.Scripts;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.function.Function;
@Component
public class Jsr223TransparentMessageCodecProvider implements TransparentMessageCodecProvider {
@Override
public String getProvider() {
return "jsr223";
}
@Override
public Mono<TransparentMessageCodec> createCodec(Map<String, Object> configuration) {
String lang = (String) configuration.getOrDefault("lang", "js");
String script = (String) configuration.get("script");
Assert.hasText(lang, "lang can not be null");
Assert.hasText(script, "script can not be null");
CodecContext context = new CodecContext();
SimpleTransparentMessageCodec.Codec codec = Scripts
.getFactory(lang)
.bind(Script.of("jsr223-transparent", script),
SimpleTransparentMessageCodec.Codec.class);
if (context.encoder == null && codec != null) {
context.onDownstream(codec::encode);
}
if (context.decoder == null && codec != null) {
context.onUpstream(codec::decode);
}
if (codec == null && context.encoder == null && context.decoder == null) {
return Mono.error(new ValidationException("script", "error.codec_message_undefined"));
}
return Mono
.deferContextual(ctx -> Mono
.just(
new SimpleTransparentMessageCodec(context)
));
}
public static class CodecContext implements SimpleTransparentMessageCodec.Codec {
private Function<SimpleTransparentMessageCodec.EncodeContext, Object> encoder;
private Function<SimpleTransparentMessageCodec.DecodeContext, Object> decoder;
public void onDownstream(Function<SimpleTransparentMessageCodec.EncodeContext, Object> encoder) {
this.encoder = encoder;
}
public void onUpstream(Function<SimpleTransparentMessageCodec.DecodeContext, Object> decoder) {
this.decoder = decoder;
}
@Override
public Object decode(SimpleTransparentMessageCodec.DecodeContext context) {
if (decoder == null) {
return null;
}
return decoder.apply(context);
}
@Override
public Object encode(SimpleTransparentMessageCodec.EncodeContext context) {
if (encoder == null) {
return null;
}
return encoder.apply(context);
}
}
}

View File

@ -0,0 +1,159 @@
package org.jetlinks.community.device.web;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.authorization.annotation.QueryAction;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.annotation.SaveAction;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.device.entity.TransparentMessageCodecEntity;
import org.jetlinks.community.device.message.transparent.TransparentMessageCodecProviders;
import org.jetlinks.community.device.web.request.TransparentMessageCodecRequest;
import org.jetlinks.community.device.web.request.TransparentMessageDecodeRequest;
import org.jetlinks.community.device.web.response.TransparentMessageDecodeResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.utils.TypeScriptUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/device/transparent-codec")
@Tag(name = "设备透传消息解析配置")
@AllArgsConstructor
@Resource(id = "transparent-codec", name = "设备透传消息解析配置")
public class TransparentMessageCodecController {
private final ReactiveRepository<TransparentMessageCodecEntity, String> repository;
private final DeviceRegistry registry;
@PostMapping("/decode-test")
@QueryAction
@Operation(summary = "测试解码")
public Mono<TransparentMessageDecodeResponse> getCodec(@RequestBody Mono<TransparentMessageDecodeRequest> requestMono) {
return requestMono
.flatMapMany(req -> TransparentMessageCodecProviders
.getProviderNow(req.getProvider())
.createCodec(req.getConfiguration())
.flatMapMany(codec -> codec.decode(req.toMessage())))
.collectList()
.map(TransparentMessageDecodeResponse::of)
.onErrorResume(err -> LocaleUtils.doWithReactive(
err,
Throwable::getLocalizedMessage,
(e, msg) -> TransparentMessageDecodeResponse.error(msg)));
}
@GetMapping("/{productId}/{deviceId}.d.ts")
@QueryAction
@Operation(summary = "获取设备的TypeScript定义信息")
public Mono<String> getTypescriptDeclares(@PathVariable String productId,
@PathVariable String deviceId) {
return registry
.getDevice(deviceId)
.flatMap(DeviceOperator::getMetadata)
.flatMap(this::getTypescriptDeclares);
}
@GetMapping("/{productId}.d.ts")
@QueryAction
@Operation(summary = "获取产品的TypeScript定义信息")
public Mono<String> getTypescriptDeclares(@PathVariable String productId) {
return registry
.getProduct(productId)
.flatMap(DeviceProductOperator::getMetadata)
.flatMap(this::getTypescriptDeclares);
}
@GetMapping("/{productId}/{deviceId}")
@QueryAction
@Operation(summary = "获取设备的解析规则")
public Mono<TransparentMessageCodecEntity> getCodec(@PathVariable String productId,
@PathVariable String deviceId) {
return repository
.findById(TransparentMessageCodecEntity.createId(productId, deviceId))
//设备没有则获取产品的
.switchIfEmpty(Mono.defer(() -> {
if (StringUtils.hasText(deviceId)) {
return repository.findById(TransparentMessageCodecEntity.createId(productId, null));
}
return Mono.empty();
}));
}
@GetMapping("/{productId}")
@QueryAction
@Operation(summary = "获取产品的解析规则")
public Mono<TransparentMessageCodecEntity> getCodec(@PathVariable String productId) {
return getCodec(productId, null);
}
@PostMapping("/{productId}/{deviceId}")
@SaveAction
@Operation(summary = "保存设备解析规则")
public Mono<Void> saveCodec(@PathVariable String productId,
@PathVariable String deviceId,
@RequestBody Mono<TransparentMessageCodecRequest> requestMono) {
return requestMono
.flatMap(request-> {
TransparentMessageCodecEntity codec = new TransparentMessageCodecEntity();
codec.setProductId(productId);
codec.setDeviceId(deviceId);
codec.setProvider(request.getProvider());
codec.setConfiguration(request.getConfiguration());
return repository.save(codec);
})
.then();
}
@PostMapping("/{productId}")
@Operation(summary = "保存产品解析规则")
public Mono<Void> saveCodec(@PathVariable String productId,
@RequestBody Mono<TransparentMessageCodecRequest> requestMono) {
return saveCodec(productId, null, requestMono);
}
@DeleteMapping("/{productId}/{deviceId}")
@SaveAction
@Operation(summary = "重置设备的解析规则")
public Mono<Void> removeCodec(@PathVariable String productId,
@PathVariable String deviceId) {
return repository
.deleteById(TransparentMessageCodecEntity.createId(productId, deviceId))
.then();
}
@DeleteMapping("/{productId}")
@SaveAction
@Operation(summary = "重置产品的解析规则")
public Mono<Void> removeCodec(@PathVariable String productId) {
return removeCodec(productId, null);
}
private Mono<String> getTypescriptDeclares(DeviceMetadata metadata) {
StringBuilder builder = new StringBuilder();
TypeScriptUtils.createMetadataDeclare(metadata, builder);
TypeScriptUtils.loadDeclare("transparent-codec", builder);
return Mono.just(builder.toString());
}
}

View File

@ -1,8 +1,7 @@
package org.jetlinks.community.device.web.protocol;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import org.jetlinks.core.ProtocolSupport;
import reactor.core.publisher.Mono;
@ -11,22 +10,27 @@ import java.util.List;
@Getter
@Setter
@AllArgsConstructor
@Generated
@NoArgsConstructor
public class ProtocolDetail {
@Schema(description = "协议ID")
private String id;
@Schema(description = "协议名称")
private String name;
@Schema(description = "协议说明")
private String description;
private List<TransportDetail> transports;
public static Mono<ProtocolDetail> of(ProtocolSupport support) {
return support
.getSupportedTransport()
.flatMap(trans -> TransportDetail.of(support, trans))
.collectList()
.map(details -> new ProtocolDetail(support.getId(), support.getName(), details));
.map(details -> new ProtocolDetail(support.getId(), support.getName(),support.getDescription(), details));
}
}

View File

@ -1,22 +1,66 @@
package org.jetlinks.community.device.web.protocol;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import org.jetlinks.community.protocol.ProtocolFeature;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.route.Route;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import reactor.core.publisher.Mono;
import java.util.List;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Generated
public class TransportDetail {
@Schema(description = "ID")
private String id;
@Schema(description = "名称")
private String name;
@Schema(description = "其他设置")
private List<ProtocolFeature> features;
@Schema(description = "路由信息")
private List<Route> routes;
@Schema(description = "文档信息")
private String document;
@Schema(description = "默认物模型")
private String metadata;
public static Mono<TransportDetail> of(ProtocolSupport support, Transport transport) {
return Mono.just(new TransportDetail(transport.getId(), transport.getName()));
return Mono
.zip(
support
//T1: 路由信息
.getRoutes(transport)
.collectList(),
support
//T2: 协议特性
.getFeatures(transport)
.map(ProtocolFeature::of)
.collectList(),
support
//T3: 默认物模型
.getDefaultMetadata(transport)
.flatMap(JetLinksDeviceMetadataCodec.getInstance()::encode)
.defaultIfEmpty("")
)
.map(tp3 -> new TransportDetail(
transport.getId(),
transport.getName(),
tp3.getT2(),
tp3.getT1(),
support.getDocument(transport),
tp3.getT3()));
}
}

View File

@ -0,0 +1,20 @@
package org.jetlinks.community.device.web.request;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import javax.validation.constraints.NotBlank;
import java.util.Map;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class TransparentMessageCodecRequest {
@NotBlank
private String provider;
private Map<String,Object> configuration;
}

View File

@ -0,0 +1,47 @@
package org.jetlinks.community.device.web.request;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.web.validator.ValidatorUtils;
import org.jetlinks.core.message.DirectDeviceMessage;
import javax.validation.constraints.NotBlank;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@Getter
@Setter
public class TransparentMessageDecodeRequest extends TransparentMessageCodecRequest {
// headers:{
// "topic":"/xxxx",
// "url":"/xxx"
// }
private Map<String, Object> headers;
@NotBlank
private String payload;
@SneakyThrows
public DirectDeviceMessage toMessage() {
ValidatorUtils.tryValidate(this);
DirectDeviceMessage message = new DirectDeviceMessage();
message.setDeviceId("test");
if (MapUtils.isNotEmpty(headers)) {
headers.forEach(message::addHeader);
}
byte[] data;
if (payload.startsWith("0x")) {
data = Hex.decodeHex(payload.substring(2));
} else {
data = payload.getBytes(StandardCharsets.UTF_8);
}
message.setPayload(data);
return message;
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.community.device.web.response;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.message.DeviceMessage;
import java.util.List;
import java.util.stream.Collectors;
@Getter
@Setter
public class TransparentMessageDecodeResponse {
private boolean success;
private String reason;
private List<Object> outputs;
public static TransparentMessageDecodeResponse of(List<DeviceMessage> messages) {
TransparentMessageDecodeResponse response = new TransparentMessageDecodeResponse();
response.success = true;
response.outputs = messages
.stream()
.map(DeviceMessage::toJson)
.collect(Collectors.toList());
return response;
}
public static TransparentMessageDecodeResponse error(String reason) {
TransparentMessageDecodeResponse response = new TransparentMessageDecodeResponse();
response.success = false;
response.reason = reason;
return response;
}
public static TransparentMessageDecodeResponse of(Throwable err) {
return error(err.getLocalizedMessage());
}
}