diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/OperationSource.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/OperationSource.java new file mode 100644 index 00000000..99c07fe5 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/OperationSource.java @@ -0,0 +1,65 @@ +package org.jetlinks.community; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.jetlinks.core.utils.SerializeUtils; +import reactor.util.context.Context; +import reactor.util.context.ContextView; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Optional; + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Getter +@Setter +public class OperationSource implements Externalizable { + + private static final long serialVersionUID = 1L; + + /** + * ID,type对应操作的唯一标识 + */ + private String id; + + /** + * 操作源名称 + */ + private String name; + + /** + * 操作目标,通常为ID对应的详情数据 + */ + private Object data; + + public static OperationSource of(String id, Object data) { + return of(id, id, data); + } + + public static Context ofContext(String id, String name, Object data) { + return Context.of(OperationSource.class, of(id, name, data)); + } + + public static Optional fromContext(ContextView ctx) { + return ctx.getOrEmpty(OperationSource.class); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(id); + SerializeUtils.writeObject(name, out); + SerializeUtils.writeObject(data, out); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + id = in.readUTF(); + name = (String) SerializeUtils.readObject(in); + data = SerializeUtils.readObject(in); + } +} diff --git a/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/Script.java b/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/Script.java index 775c8fad..4a1143c1 100644 --- a/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/Script.java +++ b/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/Script.java @@ -1,3 +1,5 @@ + + package org.jetlinks.community.script; import lombok.*; @@ -21,4 +23,4 @@ public class Script { return of(name, content, source); } -} +} \ No newline at end of file diff --git a/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/ScriptFactory.java b/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/ScriptFactory.java index 7f5e312f..a6e4075a 100644 --- a/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/ScriptFactory.java +++ b/jetlinks-components/script-component/src/main/java/org/jetlinks/community/script/ScriptFactory.java @@ -89,4 +89,4 @@ public interface ScriptFactory { T bind(Script script, Class interfaceType); -} +} \ No newline at end of file diff --git a/jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/MenuController.java b/jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/MenuController.java index 871bb85c..b258ee80 100755 --- a/jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/MenuController.java +++ b/jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/MenuController.java @@ -15,8 +15,10 @@ import org.hswebframework.web.authorization.annotation.*; import org.hswebframework.web.authorization.exception.UnAuthorizedException; import org.hswebframework.web.crud.service.ReactiveCrudService; import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController; +import org.hswebframework.web.exception.ValidationException; import org.hswebframework.web.i18n.LocaleUtils; import org.hswebframework.web.system.authorization.defaults.service.DefaultPermissionService; +import org.hswebframework.web.validator.CreateGroup; import org.jetlinks.community.auth.configuration.MenuProperties; import org.jetlinks.community.auth.entity.MenuEntity; import org.jetlinks.community.auth.entity.MenuView; @@ -24,6 +26,7 @@ import org.jetlinks.community.auth.service.DefaultMenuService; import org.jetlinks.community.auth.service.MenuGrantService; import org.jetlinks.community.auth.service.request.MenuGrantRequest; import org.jetlinks.community.auth.web.request.AuthorizationSettingDetail; +import org.jetlinks.community.web.response.ValidationResult; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Flux; @@ -263,6 +266,31 @@ public class MenuController implements ReactiveServiceCrudController codeValidate(@RequestParam @Parameter(description = "菜单编码") String code, + @RequestParam(required = false) + @Parameter(description = "外部菜单所属应用ID") String appId, + @RequestParam @Parameter(description = "菜单所有者") String owner) { + return LocaleUtils.currentReactive() + .flatMap(locale -> { + MenuEntity entity = new MenuEntity(); + entity.setCode(code); + entity.setOwner(owner); + entity.tryValidate("code", CreateGroup.class); + + return defaultMenuService + .findById(entity.getId()) + .map(menu -> ValidationResult + .error(LocaleUtils.resolveMessage("error.id_already_exists", locale))); + }) + .defaultIfEmpty(ValidationResult.success()) + .onErrorResume(ValidationException.class, e -> Mono.just(e.getI18nCode()) + .map(ValidationResult::error)); + } + private Mono getAuthorizationSettingDetail(Flux menus) { return Mono .zip(menus diff --git a/jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/PermissionController.java b/jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/PermissionController.java new file mode 100644 index 00000000..2a75c76e --- /dev/null +++ b/jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/PermissionController.java @@ -0,0 +1,56 @@ +package org.jetlinks.community.auth.web; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; +import org.hswebframework.web.authorization.annotation.Authorize; +import org.hswebframework.web.authorization.annotation.QueryAction; +import org.hswebframework.web.authorization.annotation.Resource; +import org.hswebframework.web.exception.ValidationException; +import org.hswebframework.web.i18n.LocaleUtils; +import org.hswebframework.web.system.authorization.api.entity.PermissionEntity; +import org.hswebframework.web.system.authorization.defaults.service.DefaultPermissionService; +import org.hswebframework.web.validator.CreateGroup; +import org.jetlinks.community.web.response.ValidationResult; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +/** + * 权限管理. + * + * @author zhangji 2022/12/23 + */ +@RestController +@RequestMapping("/permission") +@Authorize +@Resource(id = "permission", name = "权限管理", group = "system") +@Tag(name = "权限管理") +@AllArgsConstructor +public class PermissionController { + + private final DefaultPermissionService permissionService; + + @GetMapping("/id/_validate") + @QueryAction + @Operation(summary = "验证权限ID是否合法") + public Mono permissionIdValidate2(@RequestParam @Parameter(description = "权限ID") String id) { + return LocaleUtils.currentReactive() + .flatMap(locale -> { + PermissionEntity entity = new PermissionEntity(); + entity.setId(id); + entity.tryValidate("id", CreateGroup.class); + + return permissionService + .findById(id) + .map(permission -> ValidationResult + .error(LocaleUtils.resolveMessage("error.id_already_exists", locale))); + }) + .defaultIfEmpty(ValidationResult.success()) + .onErrorResume(ValidationException.class, e -> Mono.just(e.getI18nCode()) + .map(ValidationResult::error)); + } +} diff --git a/jetlinks-manager/device-manager/pom.xml b/jetlinks-manager/device-manager/pom.xml index 5e3b64a3..c97d9877 100644 --- a/jetlinks-manager/device-manager/pom.xml +++ b/jetlinks-manager/device-manager/pom.xml @@ -99,6 +99,13 @@ compile + + + org.jetlinks.community + script-component + ${project.version} + compile + diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/TransparentMessageCodecEntity.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/TransparentMessageCodecEntity.java new file mode 100644 index 00000000..f46ecf99 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/TransparentMessageCodecEntity.java @@ -0,0 +1,86 @@ +package org.jetlinks.community.device.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.codec.digest.DigestUtils; +import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType; +import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue; +import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec; +import org.hswebframework.web.api.crud.entity.GenericEntity; +import org.hswebframework.web.api.crud.entity.RecordCreationEntity; +import org.hswebframework.web.api.crud.entity.RecordModifierEntity; +import org.hswebframework.web.crud.annotation.EnableEntityEvent; +import org.hswebframework.web.crud.generator.Generators; +import org.jetlinks.community.device.message.transparent.TransparentMessageCodecProvider; +import org.springframework.util.StringUtils; + +import javax.persistence.Column; +import javax.persistence.Table; +import java.sql.JDBCType; +import java.util.Map; + +@Getter +@Setter +@Table(name = "dev_transparent_codec") +@Schema(description = "透传消息解析器") +@EnableEntityEvent +public class TransparentMessageCodecEntity extends GenericEntity implements RecordCreationEntity, RecordModifierEntity { + + @Schema(description = "产品ID") + @Column(length = 64, nullable = false, updatable = false) + private String productId; + + @Schema(description = "设备ID") + @Column(length = 64, updatable = false) + private String deviceId; + + /** + * @see TransparentMessageCodecProvider#getProvider() + */ + @Schema(description = "编解码器提供商,如: jsr223") + @Column(length = 64, nullable = false) + private String provider; + + /** + * 编解码配置 + * + * @see TransparentMessageCodecProvider#createCodec(Map) + */ + @Schema(description = "编解码配置") + @Column(nullable = false) + @ColumnType(jdbcType = JDBCType.LONGVARCHAR, javaType = String.class) + @JsonCodec + private Map configuration; + + @Schema(description = "创建人ID") + @Column(length = 64, nullable = false, updatable = false) + private String creatorId; + + @Schema(description = "创建时间") + @Column(updatable = false) + @DefaultValue(generator = Generators.CURRENT_TIME) + private Long createTime; + + @Schema(description = "修改人ID") + @Column(length = 64) + private String modifierId; + + @Schema(description = "修改时间") + @DefaultValue(generator = Generators.CURRENT_TIME) + private Long modifyTime; + + @Override + public String getId() { + if (!StringUtils.hasText(super.getId())) { + super.setId( + createId(productId, deviceId) + ); + } + return super.getId(); + } + + public static String createId(String productId, String deviceId) { + return DigestUtils.md5Hex(String.join("|", productId, deviceId)); + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/SimpleTransparentMessageCodec.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/SimpleTransparentMessageCodec.java new file mode 100644 index 00000000..f2238acc --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/SimpleTransparentMessageCodec.java @@ -0,0 +1,285 @@ +package org.jetlinks.community.device.message.transparent; + +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; +import lombok.NonNull; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.collections4.MapUtils; +import org.jetlinks.community.OperationSource; +import org.jetlinks.community.PropertyConstants; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DirectDeviceMessage; +import org.jetlinks.core.message.MessageType; +import org.jetlinks.core.message.function.ThingFunctionInvokeMessage; +import org.jetlinks.core.message.property.ReadThingPropertyMessage; +import org.jetlinks.core.message.property.ReportPropertyMessage; +import org.jetlinks.core.message.property.WriteThingPropertyMessage; +import org.jetlinks.core.utils.TopicUtils; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; +import java.util.function.Function; +import java.util.function.Supplier; + +@Slf4j +public class SimpleTransparentMessageCodec implements TransparentMessageCodec { + + @NonNull + protected final Codec codec; + + public SimpleTransparentMessageCodec(@NonNull Codec codec) { + this.codec = codec; + } + + + @Override + public final Mono encode(DeviceMessage message) { + + return Mono.defer(() -> { + + EncodeContext context = new EncodeContext(message); + + codec.encode(context); + + if (context.payload != null) { + DirectDeviceMessage msg = new DirectDeviceMessage(); + msg.setPayload(ByteBufUtil.getBytes(context.payload)); + //release + ReferenceCountUtil.safeRelease(context.payload); + + msg.setMessageId(message.getMessageId()); + msg.setDeviceId(message.getDeviceId()); + if (null != message.getHeaders()) { + message.getHeaders().forEach(msg::addHeader); + } + context.headers.forEach(msg::addHeader); + return Mono.just(msg); + + } + return Mono.empty(); + }); + } + + @Override + public Flux decode(DirectDeviceMessage message) { + + return Mono + .fromCallable(() -> codec.decode(new DecodeContext(message))) + .flatMapMany(this::convert) + .doOnNext(msg -> { + String from = message.getMessageId(); + if (from == null) { + from = message.getHeader(PropertyConstants.uid).orElse(null); + } + if (from != null) { + msg.addHeader("decodeFrom", from); + } + msg.thingId(message.getThingType(), message.getThingId()); + }); + + } + + @SuppressWarnings("all") + protected Flux convert(Object msg) { + if (msg == null) { + return Flux.empty(); + } + if (msg instanceof DeviceMessage) { + return Flux.just(((DeviceMessage) msg)); + } + if (msg instanceof Map) { + if (MapUtils.isEmpty(((Map) msg))) { + return Flux.empty(); + } + MessageType type = MessageType.of(((Map) msg)).orElse(MessageType.UNKNOWN); + if (type == MessageType.UNKNOWN) { + //返回map但是未设备未设备消息,则转为属性上报 + return Flux.just(new ReportPropertyMessage().properties(((Map) msg))); + } + return Mono + .justOrEmpty(type.convert(((Map) msg))) + .flux() + .cast(DeviceMessage.class); + } + if (msg instanceof Collection) { + return Flux + .fromIterable(((Collection) msg)) + .flatMap(this::convert); + } + if (msg instanceof Publisher) { + return Flux + .from(((Publisher) msg)) + .flatMap(this::convert); + } + return Flux.error(new UnsupportedOperationException("unsupported data:" + msg)); + } + + public static class DecodeContext { + final DirectDeviceMessage msg; + final ByteBuf buffer; + + DecodeContext(DirectDeviceMessage msg) { + this.msg = msg; + this.buffer = msg.asByteBuf(); + } + + public long timestamp() { + return msg.getTimestamp(); + } + + public ByteBuf payload() { + return buffer; + } + + public Object json() { + return JSON.parse(buffer.array()); + } + + public Map pathVars(String pattern, String path) { + return TopicUtils.getPathVariables(pattern, path); + } + + public String url() { + return msg.getHeader("url") + .map(String::valueOf) + .orElse(null); + } + + public String topic() { + return msg.getHeader("topic") + .map(String::valueOf) + .orElse(null); + } + + public DirectDeviceMessage message() { + return msg; + } + + } + + /** + *
{@code
+     *
+     * context
+     * .whenReadProperty("temp",()->return "0x0122")
+     * .whenFunction("func",args->{
+     *
+     * })
+     *
+     * }
+ */ + public static class EncodeContext { + + private final DeviceMessage source; + private ByteBuf payload; + private final Map headers = new HashMap<>(); + + public EncodeContext(DeviceMessage source) { + this.source = source; + } + + public DeviceMessage message() { + return source; + } + + public EncodeContext topic(String topic) { + headers.put("topic", topic); + return this; + } + + public ByteBuf payload() { + return payload == null ? payload = Unpooled.buffer() : payload; + } + + public ByteBuf newBuffer() { + return Unpooled.buffer(); + } + + @SneakyThrows + public EncodeContext setPayload(String strOrHex, String charset) { + if (strOrHex.startsWith("0x")) { + payload().writeBytes(Hex.decodeHex(strOrHex.substring(2))); + } else { + payload().writeBytes(strOrHex.getBytes(charset)); + } + return this; + } + + @SneakyThrows + public EncodeContext setPayload(String strOrHex) { + setPayload(strOrHex, "utf-8"); + return this; + } + + public EncodeContext setPayload(Object data) { + + if (data instanceof String) { + setPayload(((String) data)); + } + + if (data instanceof byte[]) { + payload().writeBytes(((byte[]) data)); + } + + if (data instanceof ByteBuf) { + this.payload = ((ByteBuf) data); + } + //todo 更多类型? + + return this; + } + + public EncodeContext whenFunction(String functionId, Function supplier) { + if (source instanceof ThingFunctionInvokeMessage) { + ThingFunctionInvokeMessage msg = ((ThingFunctionInvokeMessage) source); + if ("*".equals(msg.getFunctionId()) || Objects.equals(functionId, msg.getFunctionId())) { + setPayload(supplier.apply(msg.inputsToMap())); + } + } + return this; + } + + public EncodeContext whenWriteProperty(String property, Function supplier) { + if (source instanceof WriteThingPropertyMessage) { + if ("*".equals(property)) { + setPayload(supplier.apply(((WriteThingPropertyMessage) source).getProperties())); + return this; + } + Object value = ((WriteThingPropertyMessage) source).getProperties().get(property); + if (value != null) { + setPayload(supplier.apply(value)); + } + } + return this; + } + + public EncodeContext whenReadProperties(Function, Object> supplier) { + if (source instanceof ReadThingPropertyMessage) { + setPayload(supplier.apply(((ReadThingPropertyMessage) source).getProperties())); + } + return this; + } + + public EncodeContext whenReadProperty(String property, Supplier supplier) { + if (source instanceof ReadThingPropertyMessage) { + if ("*".equals(property) || ((ReadThingPropertyMessage) source).getProperties().contains(property)) { + setPayload(supplier.get()); + } + } + return this; + } + } + + public interface Codec { + Object decode(DecodeContext context); + + Object encode(EncodeContext context); + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentDeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentDeviceMessageConnector.java new file mode 100644 index 00000000..22f6e27e --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentDeviceMessageConnector.java @@ -0,0 +1,197 @@ +package org.jetlinks.community.device.message.transparent; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.crud.events.EntityCreatedEvent; +import org.hswebframework.web.crud.events.EntityDeletedEvent; +import org.hswebframework.web.crud.events.EntityModifyEvent; +import org.hswebframework.web.crud.events.EntitySavedEvent; +import org.hswebframework.web.exception.ValidationException; +import org.jctools.maps.NonBlockingHashMap; +import org.jetlinks.core.device.DeviceConfigKey; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.event.Subscription; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DirectDeviceMessage; +import org.jetlinks.core.message.Headers; +import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; +import org.jetlinks.community.OperationSource; +import org.jetlinks.community.device.entity.TransparentMessageCodecEntity; +import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.supports.server.DecodedClientMessageHandler; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Map; + +@Slf4j +@Component +public class TransparentDeviceMessageConnector implements CommandLineRunner, DeviceMessageSenderInterceptor { + + private final ReactiveRepository repository; + + private final DecodedClientMessageHandler messageHandler; + + private final EventBus eventBus; + + private final Map codecs = new NonBlockingHashMap<>(); + + public TransparentDeviceMessageConnector(@SuppressWarnings("all") + ReactiveRepository repository, + DecodedClientMessageHandler messageHandler, + EventBus eventBus, + ObjectProvider providers) { + this.repository = repository; + this.messageHandler = messageHandler; + this.eventBus = eventBus; + for (TransparentMessageCodecProvider provider : providers) { + TransparentMessageCodecProviders.addProvider(provider); + } + } + + + @Subscribe("/device/*/*/message/direct") + public Mono handleMessage(DirectDeviceMessage message) { + String productId = message.getHeaderOrDefault(Headers.productId); + String deviceId = message.getDeviceId(); + TransparentMessageCodec codec = getCodecOrNull(productId, deviceId); + if (null == codec) { + return Mono.empty(); + } + return codec + .decode(message) + .flatMap(msg -> messageHandler.handleMessage(null, msg)) + .then(); + } + + private TransparentMessageCodec getCodecOrNull(String productId, String deviceId) { + CacheKey cacheKey = new CacheKey(productId, deviceId); + TransparentMessageCodec codec = codecs.get(cacheKey); + if (codec == null) { + cacheKey.setDeviceId(null); + codec = codecs.get(cacheKey); + } + return codec; + } + + @Override + public Mono preSend(DeviceOperator device, DeviceMessage message) { + return device + .getSelfConfig(DeviceConfigKey.productId) + .mapNotNull(productId -> getCodecOrNull(productId, device.getDeviceId())) + .flatMap(codec -> codec + .encode(message) + .doOnNext(msg -> { + msg.addHeader("encodeBy", message.getMessageType().name()); + //所有透传消息都设置为异步 + msg.addHeader(Headers.async, true); + // msg.addHeader(Headers.sendAndForget, true); + }) + ) + .defaultIfEmpty(message); + } + + + @Subscribe(value = "/_sys/transparent-codec/load", features = Subscription.Feature.broker) + public Mono doLoadCodec(TransparentMessageCodecEntity entity) { + CacheKey key = new CacheKey(entity.getProductId(), entity.getDeviceId()); + TransparentMessageCodecProvider provider = TransparentMessageCodecProviders + .getProvider(entity.getProvider()) + .orElseThrow(() -> new ValidationException("codec", "error.unsupported_codec", entity.getProvider())); + return provider + .createCodec(entity.getConfiguration()) + .doOnNext(codec -> codecs.put(key, codec)) + .contextWrite(OperationSource.ofContext(entity.getId(),null,entity)) + .switchIfEmpty(Mono.fromRunnable(() -> codecs.remove(key))) + .then(); + } + + @Subscribe(value = "/_sys/transparent-codec/removed", features = Subscription.Feature.broker) + public Mono doRemoveCodec(TransparentMessageCodecEntity entity) { + CacheKey key = new CacheKey(entity.getProductId(), entity.getDeviceId()); + codecs.remove(key); + return Mono.empty(); + } + + @EventListener + public void handleEntityEvent(EntityCreatedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(this::loadCodec) + ); + } + + @EventListener + public void handleEntityEvent(EntitySavedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(this::loadCodec) + ); + } + + @EventListener + public void handleEntityEvent(EntityModifyEvent event) { + event.async( + Flux.fromIterable(event.getAfter()) + .flatMap(this::loadCodec) + ); + } + + @EventListener + public void handleEntityEvent(EntityDeletedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(this::removeCodec) + ); + } + + public Mono loadCodec(TransparentMessageCodecEntity entity) { + return doLoadCodec(entity) + .then( + eventBus + .publish("/_sys/transparent-codec/load", entity) + .then() + ); + } + + public Mono removeCodec(TransparentMessageCodecEntity entity) { + return doRemoveCodec(entity) + .then( + eventBus + .publish("/_sys/transparent-codec/removed", entity) + .then() + ); + } + + @Override + public void run(String... args) throws Exception { + repository + .createQuery() + .fetch() + .flatMap(e -> this + .doLoadCodec(e) + .onErrorResume(err -> { + log.error("load transparent device message codec [{}:{}] error", e.getId(), e.getProvider(), err); + return Mono.empty(); + })) + .subscribe(); + } + + @Getter + @Setter + @EqualsAndHashCode + @AllArgsConstructor + static class CacheKey { + private String productId; + private String deviceId; + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodec.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodec.java new file mode 100644 index 00000000..7c7e043a --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodec.java @@ -0,0 +1,14 @@ +package org.jetlinks.community.device.message.transparent; + +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.DirectDeviceMessage; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface TransparentMessageCodec { + + Flux decode(DirectDeviceMessage message); + + Mono encode(DeviceMessage message); + +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodecProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodecProvider.java new file mode 100644 index 00000000..00a81a9f --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodecProvider.java @@ -0,0 +1,13 @@ +package org.jetlinks.community.device.message.transparent; + +import reactor.core.publisher.Mono; + +import java.util.Map; + +public interface TransparentMessageCodecProvider { + + String getProvider(); + + Mono createCodec(Map configuration); + +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodecProviders.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodecProviders.java new file mode 100644 index 00000000..35305172 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentMessageCodecProviders.java @@ -0,0 +1,32 @@ +package org.jetlinks.community.device.message.transparent; + +import org.hswebframework.web.exception.I18nSupportException; +import org.jctools.maps.NonBlockingHashMap; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class TransparentMessageCodecProviders { + + public static Map providers = new NonBlockingHashMap<>(); + + + static void addProvider(TransparentMessageCodecProvider provider) { + providers.put(provider.getProvider(), provider); + } + + public static List getProviders() { + return new ArrayList<>(providers.values()); + } + + public static Optional getProvider(String provider) { + return Optional.ofNullable(providers.get(provider)); + } + + public static TransparentMessageCodecProvider getProviderNow(String provider) { + return getProvider(provider) + .orElseThrow(()->new I18nSupportException("error.unsupported_codec",provider)); + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/script/Jsr223TransparentMessageCodecProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/script/Jsr223TransparentMessageCodecProvider.java new file mode 100644 index 00000000..f7928e7d --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/script/Jsr223TransparentMessageCodecProvider.java @@ -0,0 +1,86 @@ +package org.jetlinks.community.device.message.transparent.script; + +import org.hswebframework.web.exception.ValidationException; +import org.jetlinks.community.device.message.transparent.SimpleTransparentMessageCodec; +import org.jetlinks.community.device.message.transparent.TransparentMessageCodec; +import org.jetlinks.community.device.message.transparent.TransparentMessageCodecProvider; +import org.jetlinks.community.script.Script; +import org.jetlinks.community.script.Scripts; +import org.springframework.stereotype.Component; +import org.springframework.util.Assert; +import reactor.core.publisher.Mono; + +import java.util.Map; +import java.util.function.Function; + +@Component +public class Jsr223TransparentMessageCodecProvider implements TransparentMessageCodecProvider { + + @Override + public String getProvider() { + return "jsr223"; + } + + @Override + public Mono createCodec(Map configuration) { + String lang = (String) configuration.getOrDefault("lang", "js"); + String script = (String) configuration.get("script"); + Assert.hasText(lang, "lang can not be null"); + Assert.hasText(script, "script can not be null"); + + CodecContext context = new CodecContext(); + + SimpleTransparentMessageCodec.Codec codec = Scripts + .getFactory(lang) + .bind(Script.of("jsr223-transparent", script), + SimpleTransparentMessageCodec.Codec.class); + + if (context.encoder == null && codec != null) { + context.onDownstream(codec::encode); + } + if (context.decoder == null && codec != null) { + context.onUpstream(codec::decode); + } + + if (codec == null && context.encoder == null && context.decoder == null) { + return Mono.error(new ValidationException("script", "error.codec_message_undefined")); + } + return Mono + .deferContextual(ctx -> Mono + .just( + new SimpleTransparentMessageCodec(context) + )); + } + + public static class CodecContext implements SimpleTransparentMessageCodec.Codec { + + private Function encoder; + private Function decoder; + + public void onDownstream(Function encoder) { + this.encoder = encoder; + } + + public void onUpstream(Function decoder) { + this.decoder = decoder; + } + + @Override + public Object decode(SimpleTransparentMessageCodec.DecodeContext context) { + if (decoder == null) { + return null; + } + return decoder.apply(context); + } + + @Override + public Object encode(SimpleTransparentMessageCodec.EncodeContext context) { + if (encoder == null) { + return null; + } + return encoder.apply(context); + } + + } + +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/TransparentMessageCodecController.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/TransparentMessageCodecController.java new file mode 100644 index 00000000..a2f2e8cb --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/TransparentMessageCodecController.java @@ -0,0 +1,159 @@ +package org.jetlinks.community.device.web; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; +import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.authorization.annotation.QueryAction; +import org.hswebframework.web.authorization.annotation.Resource; +import org.hswebframework.web.authorization.annotation.SaveAction; +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.device.entity.TransparentMessageCodecEntity; +import org.jetlinks.community.device.message.transparent.TransparentMessageCodecProviders; +import org.jetlinks.community.device.web.request.TransparentMessageCodecRequest; +import org.jetlinks.community.device.web.request.TransparentMessageDecodeRequest; +import org.jetlinks.community.device.web.response.TransparentMessageDecodeResponse; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceProductOperator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.metadata.DeviceMetadata; +import org.jetlinks.core.utils.TypeScriptUtils; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Mono; + +@RestController +@RequestMapping("/device/transparent-codec") +@Tag(name = "设备透传消息解析配置") +@AllArgsConstructor +@Resource(id = "transparent-codec", name = "设备透传消息解析配置") +public class TransparentMessageCodecController { + + private final ReactiveRepository repository; + + private final DeviceRegistry registry; + + + @PostMapping("/decode-test") + @QueryAction + @Operation(summary = "测试解码") + public Mono getCodec(@RequestBody Mono requestMono) { + return requestMono + .flatMapMany(req -> TransparentMessageCodecProviders + .getProviderNow(req.getProvider()) + .createCodec(req.getConfiguration()) + .flatMapMany(codec -> codec.decode(req.toMessage()))) + .collectList() + .map(TransparentMessageDecodeResponse::of) + .onErrorResume(err -> LocaleUtils.doWithReactive( + err, + Throwable::getLocalizedMessage, + (e, msg) -> TransparentMessageDecodeResponse.error(msg))); + } + + @GetMapping("/{productId}/{deviceId}.d.ts") + @QueryAction + @Operation(summary = "获取设备的TypeScript定义信息") + public Mono getTypescriptDeclares(@PathVariable String productId, + @PathVariable String deviceId) { + return registry + .getDevice(deviceId) + .flatMap(DeviceOperator::getMetadata) + .flatMap(this::getTypescriptDeclares); + } + + @GetMapping("/{productId}.d.ts") + @QueryAction + @Operation(summary = "获取产品的TypeScript定义信息") + public Mono getTypescriptDeclares(@PathVariable String productId) { + return registry + .getProduct(productId) + .flatMap(DeviceProductOperator::getMetadata) + .flatMap(this::getTypescriptDeclares); + } + + + @GetMapping("/{productId}/{deviceId}") + @QueryAction + @Operation(summary = "获取设备的解析规则") + public Mono getCodec(@PathVariable String productId, + @PathVariable String deviceId) { + + + return repository + .findById(TransparentMessageCodecEntity.createId(productId, deviceId)) + //设备没有则获取产品的 + .switchIfEmpty(Mono.defer(() -> { + if (StringUtils.hasText(deviceId)) { + return repository.findById(TransparentMessageCodecEntity.createId(productId, null)); + } + return Mono.empty(); + })); + } + + @GetMapping("/{productId}") + @QueryAction + @Operation(summary = "获取产品的解析规则") + public Mono getCodec(@PathVariable String productId) { + + return getCodec(productId, null); + } + + + @PostMapping("/{productId}/{deviceId}") + @SaveAction + @Operation(summary = "保存设备解析规则") + public Mono saveCodec(@PathVariable String productId, + @PathVariable String deviceId, + @RequestBody Mono requestMono) { + + + return requestMono + .flatMap(request-> { + TransparentMessageCodecEntity codec = new TransparentMessageCodecEntity(); + codec.setProductId(productId); + codec.setDeviceId(deviceId); + codec.setProvider(request.getProvider()); + codec.setConfiguration(request.getConfiguration()); + return repository.save(codec); + }) + .then(); + } + + @PostMapping("/{productId}") + @Operation(summary = "保存产品解析规则") + public Mono saveCodec(@PathVariable String productId, + @RequestBody Mono requestMono) { + return saveCodec(productId, null, requestMono); + } + + @DeleteMapping("/{productId}/{deviceId}") + @SaveAction + @Operation(summary = "重置设备的解析规则") + public Mono removeCodec(@PathVariable String productId, + @PathVariable String deviceId) { + + + return repository + .deleteById(TransparentMessageCodecEntity.createId(productId, deviceId)) + .then(); + } + + @DeleteMapping("/{productId}") + @SaveAction + @Operation(summary = "重置产品的解析规则") + public Mono removeCodec(@PathVariable String productId) { + return removeCodec(productId, null); + } + + + private Mono getTypescriptDeclares(DeviceMetadata metadata) { + StringBuilder builder = new StringBuilder(); + + TypeScriptUtils.createMetadataDeclare(metadata, builder); + TypeScriptUtils.loadDeclare("transparent-codec", builder); + + return Mono.just(builder.toString()); + } + +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/ProtocolDetail.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/ProtocolDetail.java index cd047caf..e0153d88 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/ProtocolDetail.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/ProtocolDetail.java @@ -1,8 +1,7 @@ package org.jetlinks.community.device.web.protocol; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; import org.jetlinks.core.ProtocolSupport; import reactor.core.publisher.Mono; @@ -11,22 +10,27 @@ import java.util.List; @Getter @Setter @AllArgsConstructor +@Generated +@NoArgsConstructor public class ProtocolDetail { + @Schema(description = "协议ID") private String id; + @Schema(description = "协议名称") private String name; + @Schema(description = "协议说明") + private String description; + private List transports; public static Mono of(ProtocolSupport support) { + return support .getSupportedTransport() .flatMap(trans -> TransportDetail.of(support, trans)) .collectList() - .map(details -> new ProtocolDetail(support.getId(), support.getName(), details)); + .map(details -> new ProtocolDetail(support.getId(), support.getName(),support.getDescription(), details)); } } - - - diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/TransportDetail.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/TransportDetail.java index 5d4019a7..362f15fb 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/TransportDetail.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/protocol/TransportDetail.java @@ -1,22 +1,66 @@ package org.jetlinks.community.device.web.protocol; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import org.jetlinks.community.protocol.ProtocolFeature; import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.message.codec.Transport; +import org.jetlinks.core.route.Route; +import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; import reactor.core.publisher.Mono; +import java.util.List; + @Getter @Setter @AllArgsConstructor +@NoArgsConstructor +@Generated public class TransportDetail { + @Schema(description = "ID") private String id; + @Schema(description = "名称") private String name; + @Schema(description = "其他设置") + private List features; + + @Schema(description = "路由信息") + private List routes; + + @Schema(description = "文档信息") + private String document; + + @Schema(description = "默认物模型") + private String metadata; + public static Mono of(ProtocolSupport support, Transport transport) { - return Mono.just(new TransportDetail(transport.getId(), transport.getName())); + return Mono + .zip( + support + //T1: 路由信息 + .getRoutes(transport) + .collectList(), + support + //T2: 协议特性 + .getFeatures(transport) + .map(ProtocolFeature::of) + .collectList(), + support + //T3: 默认物模型 + .getDefaultMetadata(transport) + .flatMap(JetLinksDeviceMetadataCodec.getInstance()::encode) + .defaultIfEmpty("") + ) + .map(tp3 -> new TransportDetail( + transport.getId(), + transport.getName(), + tp3.getT2(), + tp3.getT1(), + support.getDocument(transport), + tp3.getT3())); + } } \ No newline at end of file diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/TransparentMessageCodecRequest.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/TransparentMessageCodecRequest.java new file mode 100644 index 00000000..b421929d --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/TransparentMessageCodecRequest.java @@ -0,0 +1,20 @@ +package org.jetlinks.community.device.web.request; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import javax.validation.constraints.NotBlank; +import java.util.Map; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class TransparentMessageCodecRequest { + @NotBlank + private String provider; + + private Map configuration; +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/TransparentMessageDecodeRequest.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/TransparentMessageDecodeRequest.java new file mode 100644 index 00000000..8fb15215 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/request/TransparentMessageDecodeRequest.java @@ -0,0 +1,47 @@ +package org.jetlinks.community.device.web.request; + +import lombok.Getter; +import lombok.Setter; +import lombok.SneakyThrows; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.collections4.MapUtils; +import org.hswebframework.web.validator.ValidatorUtils; +import org.jetlinks.core.message.DirectDeviceMessage; + +import javax.validation.constraints.NotBlank; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +@Getter +@Setter +public class TransparentMessageDecodeRequest extends TransparentMessageCodecRequest { + + // headers:{ + // "topic":"/xxxx", + // "url":"/xxx" + // } + private Map headers; + + @NotBlank + private String payload; + + @SneakyThrows + public DirectDeviceMessage toMessage() { + ValidatorUtils.tryValidate(this); + + DirectDeviceMessage message = new DirectDeviceMessage(); + message.setDeviceId("test"); + if (MapUtils.isNotEmpty(headers)) { + headers.forEach(message::addHeader); + } + byte[] data; + if (payload.startsWith("0x")) { + data = Hex.decodeHex(payload.substring(2)); + } else { + data = payload.getBytes(StandardCharsets.UTF_8); + } + message.setPayload(data); + + return message; + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/response/TransparentMessageDecodeResponse.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/response/TransparentMessageDecodeResponse.java new file mode 100644 index 00000000..a781a352 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/response/TransparentMessageDecodeResponse.java @@ -0,0 +1,40 @@ +package org.jetlinks.community.device.web.response; + +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.core.message.DeviceMessage; + +import java.util.List; +import java.util.stream.Collectors; + +@Getter +@Setter +public class TransparentMessageDecodeResponse { + private boolean success; + + private String reason; + + private List outputs; + + public static TransparentMessageDecodeResponse of(List messages) { + TransparentMessageDecodeResponse response = new TransparentMessageDecodeResponse(); + response.success = true; + response.outputs = messages + .stream() + .map(DeviceMessage::toJson) + .collect(Collectors.toList()); + + return response; + } + + public static TransparentMessageDecodeResponse error(String reason) { + TransparentMessageDecodeResponse response = new TransparentMessageDecodeResponse(); + response.success = false; + response.reason = reason; + return response; + } + + public static TransparentMessageDecodeResponse of(Throwable err) { + return error(err.getLocalizedMessage()); + } +}