优化设备配置同步

This commit is contained in:
zhouhao 2020-03-25 21:48:54 +08:00
parent c0a604e8a4
commit 3f50c3e7ee
2 changed files with 39 additions and 21 deletions

View File

@ -8,12 +8,19 @@ import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
import org.hswebframework.web.crud.generator.Generators;
import org.hswebframework.web.validator.CreateGroup;
import org.jetlinks.community.device.enums.DeviceState;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceInfo;
import org.jetlinks.core.device.DeviceOperator;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import javax.persistence.Column;
import javax.persistence.GeneratedValue;
import javax.persistence.Table;
import javax.validation.constraints.NotBlank;
import java.sql.JDBCType;
import java.util.Collections;
import java.util.Map;
@Getter
@ -37,7 +44,7 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
private String describe;
@Comment("产品id")
@Column(name = "product_id",length = 32)
@Column(name = "product_id", length = 32)
@NotBlank(message = "产品ID不能为空", groups = CreateGroup.class)
private String productId;
@ -84,4 +91,20 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
@Column(name = "parent_id", length = 32)
@Comment("父级设备ID")
private String parentId;
public DeviceInfo toDeviceInfo() {
DeviceInfo info = org.jetlinks.core.device.DeviceInfo.builder()
.id(this.getId())
.productId(this.getProductId())
.build()
.addConfig(DeviceConfigKey.parentGatewayId, this.getParentId());
if (!CollectionUtils.isEmpty(configuration)) {
configuration.forEach(info::addConfig);
}
if (StringUtils.hasText(deriveMetadata)) {
info.addConfig(DeviceConfigKey.metadata, deriveMetadata);
}
return info;
}
}

View File

@ -166,26 +166,21 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
*/
public Flux<DeviceDeployResult> deploy(Flux<DeviceInstanceEntity> flux) {
return flux
.flatMap(instance ->
registry.register(org.jetlinks.core.device.DeviceInfo.builder()
.id(instance.getId())
.productId(instance.getProductId())
.build()
.addConfig(DeviceConfigKey.parentGatewayId, instance.getParentId()))
//设置其他配置信息
.flatMap(deviceOperator -> deviceOperator.getState()
.flatMap(r -> {
if (r.equals(org.jetlinks.core.device.DeviceState.unknown) ||
r.equals(org.jetlinks.core.device.DeviceState.noActive)) {
instance.setState(DeviceState.offline);
return deviceOperator.putState(org.jetlinks.core.device.DeviceState.offline);
}
instance.setState(DeviceState.of(r));
return Mono.just(true);
})
.flatMap(success -> success ? Mono.just(deviceOperator) : Mono.empty())
)
.thenReturn(instance))
.flatMap(instance -> registry
.register(instance.toDeviceInfo())
.flatMap(deviceOperator -> deviceOperator.getState()
.flatMap(r -> {
if (r.equals(org.jetlinks.core.device.DeviceState.unknown) ||
r.equals(org.jetlinks.core.device.DeviceState.noActive)) {
instance.setState(DeviceState.offline);
return deviceOperator.putState(org.jetlinks.core.device.DeviceState.offline);
}
instance.setState(DeviceState.of(r));
return Mono.just(true);
})
.flatMap(success -> success ? Mono.just(deviceOperator) : Mono.empty())
)
.thenReturn(instance))
.buffer(50)
.publishOn(Schedulers.single())
.flatMap(all -> Flux.fromIterable(all)