From a029762367aea064bcdf8c5ab9f10de2fc831b00 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Mon, 9 Mar 2020 15:45:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=88=B6=E5=AD=90=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/entity/DeviceInstanceEntity.java | 3 + .../device/entity/DeviceProductEntity.java | 8 +- .../DeviceInstanceImportExportEntity.java | 3 + .../service/LocalDeviceInstanceService.java | 88 +++++++++++-------- .../service/LocalDeviceProductService.java | 11 ++- .../DefaultDeviceSessionManager.java | 2 +- simulator/scripts/demo-device.js | 12 +++ 7 files changed, 84 insertions(+), 43 deletions(-) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java index 4cc52f56..464f128e 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceInstanceEntity.java @@ -81,4 +81,7 @@ public class DeviceInstanceEntity extends GenericEntity implements Recor @Comment("所属机构id") private String orgId; + @Column(name = "parent_id", length = 32) + @Comment("父级设备ID") + private String parentId; } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProductEntity.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProductEntity.java index 496231e8..bfc709f7 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProductEntity.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceProductEntity.java @@ -3,10 +3,7 @@ package org.jetlinks.community.device.entity; import lombok.Getter; import lombok.Setter; import org.hibernate.validator.constraints.Length; -import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType; -import org.hswebframework.ezorm.rdb.mapping.annotation.Comment; -import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec; -import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec; +import org.hswebframework.ezorm.rdb.mapping.annotation.*; import org.hswebframework.web.api.crud.entity.GenericEntity; import org.hswebframework.web.api.crud.entity.RecordCreationEntity; import org.hswebframework.web.crud.generator.Generators; @@ -89,16 +86,17 @@ public class DeviceProductEntity extends GenericEntity implements Record @Comment("产品状态") @Column(name = "state") + @DefaultValue("0") private Byte state; @Column(name = "creator_id") @Comment("创建者id") private String creatorId; + @Comment("创建时间") @Column(name = "create_time") private Long createTime; - @Column(name = "org_id", length = 32) @Comment("所属机构id") private String orgId; diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/excel/DeviceInstanceImportExportEntity.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/excel/DeviceInstanceImportExportEntity.java index 03fdde64..13464cc3 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/excel/DeviceInstanceImportExportEntity.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/excel/DeviceInstanceImportExportEntity.java @@ -18,5 +18,8 @@ public class DeviceInstanceImportExportEntity { @ExcelProperty("描述") private String describe; + @ExcelProperty("父级设备ID") + private String parentId; + } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java index 4e8902ac..40cc5864 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java @@ -5,6 +5,7 @@ import com.alibaba.excel.ExcelWriter; import com.alibaba.excel.write.metadata.WriteSheet; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.hswebframework.ezorm.core.dsl.Query; import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.ezorm.core.param.TermType; @@ -17,9 +18,11 @@ import org.hswebframework.web.exception.BusinessException; import org.hswebframework.web.exception.NotFoundException; import org.hswebframework.web.logger.ReactiveLogger; import org.jetlinks.community.device.entity.DeviceOperationLogEntity; +import org.jetlinks.community.device.message.DeviceMessageUtils; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceOfflineMessage; import org.jetlinks.core.message.DeviceOnlineMessage; import org.jetlinks.core.metadata.DataType; @@ -53,6 +56,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.SignalType; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; import reactor.util.function.Tuples; import javax.annotation.PostConstruct; @@ -65,6 +69,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric; @@ -163,7 +168,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService deviceOperator.getState() .flatMap(r -> { @@ -320,19 +327,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService Mono.fromCallable(() -> { - - if (message.getMessage() instanceof EncodableMessage) { - Object msg = ((EncodableMessage) message.getMessage()).getNativePayload(); - if (msg instanceof DeviceOnlineMessage) { - return ((DeviceOnlineMessage) msg).getDeviceId(); - } - if (msg instanceof DeviceOfflineMessage) { - return ((DeviceOfflineMessage) msg).getDeviceId(); - } - } - return null; - })), 800, 200, Duration.ofSeconds(2)) + .flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message)) + .map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2)) .flatMap(list -> syncStateBatch(Flux.just(list), false).count()) .onErrorContinue((err, obj) -> log.error(err.getMessage(), err)) .subscribe((i) -> log.info("同步设备状态成功:{}", i)); @@ -347,30 +343,50 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService syncStateBatch(Flux> batch, boolean force) { - return batch.flatMap(list -> Flux.fromIterable(list) - .flatMap(registry::getDevice) - .publishOn(Schedulers.parallel()) - .flatMap(operation -> { - if (force) { - return operation.checkState().zipWith(Mono.just(operation.getDeviceId())); - } - return operation.getState().zipWith(Mono.just(operation.getDeviceId())); - }) - .groupBy(Tuple2::getT1, Tuple2::getT2) - .flatMap(group -> { - @SuppressWarnings("all") - DeviceState state = group.key() == null ? DeviceState.offline : DeviceState.of(group.key()); - return group.collectList() - .flatMap(idList -> getRepository() - .createUpdate() - .set(DeviceInstanceEntity::getState, state) - .where() - .in(DeviceInstanceEntity::getId, idList) - .execute()); - })); + return batch + .flatMap(list -> Flux.fromIterable(list) + .flatMap(registry::getDevice) + .publishOn(Schedulers.parallel()) + .flatMap(operation -> { + Mono state = force ? operation.checkState() : operation.getState(); + return Mono.zip( + state,//状态 + Mono.just(operation.getDeviceId()), //设备id + operation.getConfig(DeviceConfigKey.isGatewayDevice)//是否为网关设备 + ); + }) + .groupBy(Tuple2::getT1, Function.identity()) + .flatMap(group -> { + @SuppressWarnings("all") + DeviceState state = group.key() == null ? DeviceState.offline : DeviceState.of(group.key()); + return group + .collectList() + .flatMap(idList -> Mono.zip( + //修改设备状态 + getRepository() + .createUpdate() + .set(DeviceInstanceEntity::getState, state) + .where() + .in(DeviceInstanceEntity::getId, idList.stream().map(Tuple3::getT2).collect(Collectors.toList())) + .execute(), + //修改子设备状态 + Flux.fromIterable(idList) + .filter(Tuple3::getT3) + .map(Tuple3::getT2) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(parents -> + getRepository() + .createUpdate() + .set(DeviceInstanceEntity::getState, state) + .where() + .in(DeviceInstanceEntity::getParentId, parents) + .execute() + ).defaultIfEmpty(0) + , Math::addExact)); + })); } - public Flux doBatchImport(String fileUrl) { return deviceProductService .createQuery() diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceProductService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceProductService.java index 60fb6705..68f0c403 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceProductService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceProductService.java @@ -3,6 +3,7 @@ package org.jetlinks.community.device.service; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.crud.service.GenericReactiveCrudService; +import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.ProductInfo; import org.jetlinks.community.device.entity.DeviceProductEntity; @@ -14,6 +15,8 @@ import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import static org.jetlinks.community.device.enums.DeviceType.gateway; + @Service @Slf4j public class LocalDeviceProductService extends GenericReactiveCrudService { @@ -26,7 +29,13 @@ public class LocalDeviceProductService extends GenericReactiveCrudService deploy(String id) { return findById(Mono.just(id)) - .flatMap(product -> registry.registry(new ProductInfo(id, product.getMessageProtocol(), product.getMetadata())) + .flatMap(product -> registry.registry( + ProductInfo.builder() + .id(id) + .protocol(product.getMessageProtocol()) + .metadata(product.getMetadata()) + .build() + .addConfig(DeviceConfigKey.isGatewayDevice, product.getDeviceType() == gateway)) .flatMap(deviceProductOperator -> deviceProductOperator.setConfigs(product.getConfiguration())) .flatMap(re -> createUpdate() .set(DeviceProductEntity::getState, DeviceProductState.registered.getValue()) diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java index 889227b0..af2a0ff7 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/DefaultDeviceSessionManager.java @@ -232,7 +232,7 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager { .switchIfEmpty(Mono.fromRunnable(() -> log.warn("children device [{}] not fond in registry", childrenDeviceId))) .flatMap(deviceOperator -> deviceOperator .online(session.getServerId().orElse(serverId), session.getId()) - .then(deviceOperator.setConfig(DeviceConfigKey.parentMeshDeviceId, deviceId)) + .then(deviceOperator.setConfig(DeviceConfigKey.parentGatewayId, deviceId)) .thenReturn(new ChildrenDeviceSession(childrenDeviceId, session, deviceOperator))) .doOnSuccess(s -> children.computeIfAbsent(deviceId, __ -> new ConcurrentHashMap<>()).put(childrenDeviceId, s)); }); diff --git a/simulator/scripts/demo-device.js b/simulator/scripts/demo-device.js index 67547b5e..5002208b 100644 --- a/simulator/scripts/demo-device.js +++ b/simulator/scripts/demo-device.js @@ -54,6 +54,18 @@ simulator.bindHandler("/read-property", function (message, session) { })); }); +simulator.bindHandler("/children/read-property", function (message, session) { + _logger.info("读取子设备属性:[{}]", message); + session.sendMessage("/children/read-property-reply", JSON.stringify({ + messageId: message.messageId, + deviceId: message.deviceId, + timestamp: new Date().getTime(), + properties: {"temperature": java.util.concurrent.ThreadLocalRandom.current().nextInt(20, 30)}, + success: true + })); +}); + + simulator.bindHandler("/invoke-function", function (message, session) { _logger.info("调用功能:[{}]", message); session.sendMessage("/invoke-function", JSON.stringify({