diff --git a/docker/run-all/docker-compose.yml b/docker/run-all/docker-compose.yml index 87a50183..ce394a4a 100644 --- a/docker/run-all/docker-compose.yml +++ b/docker/run-all/docker-compose.yml @@ -81,9 +81,9 @@ services: - "spring.r2dbc.url=r2dbc:postgresql://postgres:5432/jetlinks" #数据库连接地址 - "spring.r2dbc.username=postgres" - "spring.r2dbc.password=jetlinks" - - "spring.data.elasticsearch.client.reactive.endpoints=elasticsearch:9200" - # - "spring.data.elasticsearch.client.reactive.username=admin" - # - "spring.data.elasticsearch.client.reactive.password=admin" + - "spring.elasticsearch.urls=elasticsearch:9200" + # - "spring.elasticsearch.username=admin" + # - "spring.elasticsearch.password=admin" # - "spring.reactor.debug-agent.enabled=false" #设置为false能提升性能 - "spring.redis.host=redis" - "spring.redis.port=6379" diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java index cbbfa660..d92391db 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java @@ -23,6 +23,7 @@ import reactor.core.publisher.Mono; import javax.annotation.Nonnull; import java.io.File; +import java.time.Duration; import java.util.function.Supplier; @Slf4j @@ -35,6 +36,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager @Setter private String filePath; + @Getter + @Setter + private Duration flushInterval = Duration.ofMinutes(10); + public PersistenceDeviceSessionManager(RpcManager rpcManager) { super(rpcManager); } @@ -72,6 +77,24 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager } repository = initStore(filePath); + if (!flushInterval.isZero() && !flushInterval.isNegative()) { + disposable.add( + Flux + .interval(flushInterval) + .onBackpressureDrop() + .concatMap(ignore -> Flux + .fromIterable(localSessions.values()) + .mapNotNull(ref -> { + if (ref.loaded != null && ref.loaded.isWrapFrom(PersistentSession.class)) { + return ref.loaded.unwrap(PersistentSession.class); + } + return null; + }) + .as(this::tryPersistent), 1) + .subscribe() + ); + } + disposable.add( listenEvent(event -> { //移除持久化的会话 @@ -95,6 +118,7 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager .map(ref -> ref.loaded.unwrap(PersistentSession.class)) .as(this::tryPersistent) .block(); + repository.store.compactMoveChunks(); repository.store.close(); } @@ -133,10 +157,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager .toSession(registry.get()) .doOnNext(session -> { log.debug("resume session[{}]", session.getDeviceId()); - localSessions.putIfAbsent(session.getDeviceId(), new DeviceSessionRef( - session.getDeviceId(), - this, - session)); + localSessions.putIfAbsent(session.getDeviceId(), + new DeviceSessionRef(session.getDeviceId(), + this, + session)); }) .onErrorResume((err) -> { log.debug("resume session[{}] error", entity.getDeviceId(), err); diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/parser/DefaultLinkTypeParser.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/parser/DefaultLinkTypeParser.java index 87a5f95f..a0304b51 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/parser/DefaultLinkTypeParser.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/parser/DefaultLinkTypeParser.java @@ -4,6 +4,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.hswebframework.ezorm.core.param.Term; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.List; import java.util.function.Consumer; @@ -15,10 +16,13 @@ import java.util.function.Consumer; @Component public class DefaultLinkTypeParser implements LinkTypeParser { - private TermTypeParser parser = new DefaultTermTypeParser(); + private final TermTypeParser parser = new DefaultTermTypeParser(); @Override public BoolQueryBuilder process(Term term, Consumer consumer, BoolQueryBuilder queryBuilders) { + if (term.getValue() == null && CollectionUtils.isEmpty(term.getTerms())) { + return queryBuilders; + } if (term.getType() == Term.Type.or) { handleOr(queryBuilders, term, consumer); } else { @@ -52,4 +56,4 @@ public class DefaultLinkTypeParser implements LinkTypeParser { } -} +} \ No newline at end of file diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/utils/ElasticSearchConverter.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/utils/ElasticSearchConverter.java index 5f980769..c9d110aa 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/utils/ElasticSearchConverter.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/utils/ElasticSearchConverter.java @@ -6,9 +6,7 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; import org.jetlinks.core.metadata.Converter; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.PropertyMetadata; -import org.jetlinks.core.metadata.types.DateTimeType; -import org.jetlinks.core.metadata.types.GeoPoint; -import org.jetlinks.core.metadata.types.GeoType; +import org.jetlinks.core.metadata.types.*; import java.util.Date; import java.util.HashMap; @@ -22,7 +20,9 @@ public class ElasticSearchConverter { return QueryParamTranslator.convertSearchSourceBuilder(queryParam, metadata); } - public static Map convertDataToElastic(Map data, List properties) { + public static Map convertDataToElastic(Map data, + List properties) { + Map newValue = new HashMap<>(data); for (PropertyMetadata property : properties) { DataType type = property.getValueType(); Object val = data.get(property.getId()); @@ -32,37 +32,52 @@ public class ElasticSearchConverter { //处理地理位置类型 if (type instanceof GeoType) { GeoPoint point = ((GeoType) type).convert(val); + Map geoData = new HashMap<>(); geoData.put("lat", point.getLat()); geoData.put("lon", point.getLon()); - data.put(property.getId(), geoData); + + newValue.put(property.getId(), geoData); + } else if (type instanceof GeoShapeType) { + GeoShape shape = ((GeoShapeType) type).convert(val); + if (shape == null) { + throw new UnsupportedOperationException("不支持的GeoShape格式:" + val); + } + Map geoData = new HashMap<>(); + geoData.put("type", shape.getType().name()); + geoData.put("coordinates", shape.getCoordinates()); + newValue.put(property.getId(), geoData); } else if (type instanceof DateTimeType) { Date date = ((DateTimeType) type).convert(val); - data.put(property.getId(), date.getTime()); + newValue.put(property.getId(), date.getTime()); } else if (type instanceof Converter) { - data.put(property.getId(), ((Converter) type).convert(val)); + newValue.put(property.getId(), ((Converter) type).convert(val)); } } - return data; + return newValue; } - public static Map convertDataFromElastic(Map data, List properties) { + public static Map convertDataFromElastic(Map data, + List properties) { + Map newData = new HashMap<>(data); for (PropertyMetadata property : properties) { DataType type = property.getValueType(); - Object val = data.get(property.getId()); + Object val = newData.get(property.getId()); if (val == null) { continue; } //处理地理位置类型 if (type instanceof GeoType) { - data.put(property.getId(), ((GeoType) type).convertToMap(val)); - } else if (type instanceof DateTimeType) { + newData.put(property.getId(), ((GeoType) type).convertToMap(val)); + }else if (type instanceof GeoShapeType) { + newData.put(property.getId(),GeoShape.of(val).toMap()); + } else if (type instanceof DateTimeType) { Date date = ((DateTimeType) type).convert(val); - data.put(property.getId(), date); + newData.put(property.getId(), date); } else if (type instanceof Converter) { - data.put(property.getId(), ((Converter) type).convert(val)); + newData.put(property.getId(), ((Converter) type).convert(val)); } } - return data; + return newData; } -} +} \ No newline at end of file diff --git a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java index f2897f75..4ecbdc59 100755 --- a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java +++ b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java @@ -2,6 +2,8 @@ package org.jetlinks.community.network.http.server.vertx; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledHeapByteBuf; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; @@ -224,7 +226,12 @@ public class VertxHttpExchange implements HttpExchange, HttpResponse, HttpReques .create(sink -> { Buffer buf = Buffer.buffer(buffer); setResponseDefaultLength(buf.length()); - response.write(buf, v -> sink.success()); + response.write(buf, v -> { + sink.success(); + if(!(buffer instanceof UnpooledHeapByteBuf)){ + ReferenceCountUtil.safeRelease(buffer); + } + }); }); } diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java index c6d40351..dc7cf842 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java @@ -1,6 +1,8 @@ package org.jetlinks.community.network.mqtt.client; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import lombok.Getter; import lombok.Setter; @@ -199,19 +201,24 @@ public class VertxMqttClient implements MqttClient { private Mono doPublish(MqttMessage message) { return Mono.create((sink) -> { - Buffer buffer = Buffer.buffer(message.getPayload()); + ByteBuf payload = message.getPayload(); + Buffer buffer = Buffer.buffer(payload); client.publish(message.getTopic(), buffer, MqttQoS.valueOf(message.getQosLevel()), message.isDup(), message.isRetain(), result -> { - if (result.succeeded()) { - log.info("publish mqtt [{}] message success: {}", client.clientId(), message); - sink.success(); - } else { - log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause()); - sink.error(result.cause()); + try { + if (result.succeeded()) { + log.info("publish mqtt [{}] message success: {}", client.clientId(), message); + sink.success(); + } else { + log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause()); + sink.error(result.cause()); + } + } finally { + ReferenceCountUtil.safeRelease(payload); } }); }); diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java index 7b427050..7a1f4cad 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java @@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.SocketAddress; import io.vertx.mqtt.MqttEndpoint; @@ -32,6 +33,7 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; @Slf4j @@ -41,7 +43,7 @@ class VertxMqttConnection implements MqttConnection { private long keepAliveTimeoutMs; @Getter private long lastPingTime = System.currentTimeMillis(); - private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false; + private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true; private static final MqttAuth emptyAuth = new MqttAuth() { @Override public String getUsername() { @@ -259,19 +261,22 @@ class VertxMqttConnection implements MqttConnection { ping(); return Mono .create(sink -> { - Buffer buffer = Buffer.buffer(message.getPayload()); - endpoint.publish(message.getTopic(), - buffer, - MqttQoS.valueOf(message.getQosLevel()), - message.isDup(), - message.isRetain(), - result -> { - if (result.succeeded()) { - sink.success(); - } else { - sink.error(result.cause()); - } - } + ByteBuf buf = message.getPayload(); + Buffer buffer = Buffer.buffer(buf); + endpoint.publish( + message.getTopic(), + buffer, + MqttQoS.valueOf(message.getQosLevel()), + message.isDup(), + message.isRetain(), + result -> { + if (result.succeeded()) { + sink.success(); + } else { + sink.error(result.cause()); + } + ReferenceCountUtil.safeRelease(buf); + } ); }); } diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java index 13eb2fca..75aad9e5 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java @@ -1,5 +1,7 @@ package org.jetlinks.community.network.tcp.client; +import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; import io.vertx.core.net.NetSocket; @@ -78,9 +80,11 @@ public class VertxTcpClient implements TcpClient { sink.error(new SocketException("socket closed")); return; } - Buffer buffer = Buffer.buffer(message.getPayload()); + ByteBuf buf = message.getPayload(); + Buffer buffer = Buffer.buffer(buf); int len = buffer.length(); socket.write(buffer, r -> { + ReferenceCountUtil.safeRelease(buf); if (r.succeeded()) { keepAlive(); sink.success(); diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java index 3082ee9d..07490b75 100755 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java @@ -31,6 +31,7 @@ import org.jetlinks.rule.engine.api.task.TaskExecutor; import org.jetlinks.rule.engine.api.task.TaskExecutorProvider; import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor; import org.reactivestreams.Publisher; +import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; @@ -48,6 +49,7 @@ import java.util.stream.Collectors; @AllArgsConstructor +@Component public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvider { public static final String EXECUTOR = "device-message-sender"; diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingPropertyDetail.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingPropertyDetail.java index b028bd42..acb9a176 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingPropertyDetail.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingPropertyDetail.java @@ -80,6 +80,10 @@ public class ThingPropertyDetail implements ThingProperty { return this; } + public ThingPropertyDetail createTime(long createTime) { + this.createTime = createTime; + return this; + } public ThingPropertyDetail formatTime(String formatTime) { this.formatTime = formatTime; diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeQueryOperationsBase.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeQueryOperationsBase.java index d5fe05b6..d3385820 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeQueryOperationsBase.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeQueryOperationsBase.java @@ -3,6 +3,7 @@ package org.jetlinks.community.things.data.operations; import org.hswebframework.ezorm.core.dsl.Query; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.community.things.data.ThingsDataConstants; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.things.ThingMetadata; import org.jetlinks.core.things.ThingsRegistry; @@ -60,6 +61,8 @@ public abstract class ColumnModeQueryOperationsBase extends AbstractQueryOperati .next(ThingPropertyDetail .of(value, entry.getValue()) .thingId(data.getString(metricBuilder.getThingIdProperty(), null)) + .timestamp(data.getTimestamp()) + .createTime(data.getLong(ThingsDataConstants.COLUMN_CREATE_TIME,data.getTimestamp())) .generateId() )); } @@ -94,6 +97,8 @@ public abstract class ColumnModeQueryOperationsBase extends AbstractQueryOperati data -> ThingPropertyDetail .of(data.get(property).orElse(null), properties.get(property)) .thingId(data.getString(metricBuilder.getThingIdProperty(), null)) + .timestamp(data.getTimestamp()) + .createTime(data.getLong(ThingsDataConstants.COLUMN_CREATE_TIME, data.getTimestamp())) .generateId() ); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java index ca2f8b88..98eb7c36 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java @@ -19,6 +19,7 @@ import javax.persistence.Column; import javax.persistence.Index; import javax.persistence.Table; import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; import java.util.Date; @Getter @@ -45,8 +46,8 @@ public class DeviceTagEntity extends GenericEntity { private String name; @Column(length = 256, nullable = false) - @NotBlank(message = "[value]不能为空", groups = CreateGroup.class) - @Length(max = 256, min = 1, message = "[value]长度不能大于256", groups = CreateGroup.class) + @NotNull(message = "[value]不能为空", groups = CreateGroup.class) + @Length(max = 256, message = "[value]长度不能大于256", groups = CreateGroup.class) @Schema(description = "标签值") private String value; diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java index 4c9b3c63..51db9a29 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java @@ -1,15 +1,21 @@ package org.jetlinks.community.device.service; import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.MapUtils; import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; import org.jetlinks.community.PropertyConstants; +import org.jetlinks.community.buffer.PersistenceBuffer; +import org.jetlinks.community.configure.cluster.Cluster; import org.jetlinks.community.device.entity.DeviceInstanceEntity; import org.jetlinks.community.device.entity.DeviceTagEntity; import org.jetlinks.community.device.enums.DeviceFeature; import org.jetlinks.community.device.enums.DeviceState; import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.community.utils.ErrorUtils; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; @@ -18,17 +24,26 @@ import org.jetlinks.core.event.Subscription; import org.jetlinks.core.message.*; import org.jetlinks.core.metadata.DeviceMetadata; import org.jetlinks.core.utils.FluxUtils; +import org.jetlinks.core.utils.Reactors; import org.jetlinks.reactor.ql.utils.CastUtils; import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; +import org.springframework.dao.QueryTimeoutException; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; +import reactor.core.Disposable; +import reactor.core.Disposables; import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.time.Duration; import java.util.Date; import java.util.HashMap; @@ -51,6 +66,8 @@ public class DeviceMessageBusinessHandler { private final EventBus eventBus; + private final Disposable.Composite disposable = Disposables.composite(); + /** * 自动注册设备信息 *

@@ -285,6 +302,40 @@ public class DeviceMessageBusinessHandler { } + + @AllArgsConstructor + @NoArgsConstructor + @Getter + @Setter + public static class StateBuf implements Externalizable { + //有效期一小时 + static long expires = Duration.ofHours(1).toMillis(); + + private String id; + private long time; + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeUTF(id); + out.writeLong(time); + } + + @Override + public void readExternal(ObjectInput in) throws IOException { + id = in.readUTF(); + time = in.readLong(); + } + + public boolean isEffective() { + return System.currentTimeMillis() - time < expires; + } + } + + @PreDestroy + public void shutdown() { + disposable.dispose(); + } + @PostConstruct public void init() { @@ -295,22 +346,36 @@ public class DeviceMessageBusinessHandler { .justLocal()//只订阅本地 .build(); - //订阅设备上下线消息,同步数据库中的设备状态, - //最小间隔800毫秒,最大缓冲数量500,最长间隔2秒. - //如果2条消息间隔大于0.8秒则不缓冲直接更新 - //否则缓冲,数量超过500后批量更新 - //无论缓冲区是否超过500条,都每2秒更新一次. - FluxUtils.bufferRate(eventBus - .subscribe(subscription, DeviceMessage.class) - .map(DeviceMessage::getDeviceId), - 800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2)) - .onBackpressureBuffer(64, - list -> log.warn("无法处理更多设备状态同步!"), - BufferOverflowStrategy.DROP_OLDEST) - .publishOn(Schedulers.boundedElastic(), 64) - .concatMap(list -> deviceService.syncStateBatch(Flux.just(list), false).map(List::size)) - .onErrorContinue((err, obj) -> log.error(err.getMessage(), err)) - .subscribe((i) -> log.info("同步设备状态成功:{}", i)); + //缓冲同步设备上线信息,在突发大量上下线的情况,减少数据库的压力 + PersistenceBuffer buffer = + new PersistenceBuffer<>( + "./data/device-state-buffer", + "device-state.queue", + StateBuf::new, + flux -> deviceService + .syncStateBatch(flux + .filter(StateBuf::isEffective) + .map(StateBuf::getId) + .distinct() + .collectList() + .flux(), false) + .then(Reactors.ALWAYS_FALSE)) + .name("device-state-synchronizer") + .parallelism(1) + .bufferTimeout(Duration.ofSeconds(1)) + .retryWhenError(e -> ErrorUtils + .hasException(e, + IOException.class, + QueryTimeoutException.class)) + .bufferSize(1000); + + buffer.start(); + + disposable.add(eventBus + .subscribe(subscription, DeviceMessage.class) + .subscribe(msg -> buffer.write(new StateBuf(msg.getDeviceId(), msg.getTimestamp())))); + + disposable.add(buffer); } diff --git a/jetlinks-manager/network-manager/pom.xml b/jetlinks-manager/network-manager/pom.xml index 6d408b3f..55dec7ad 100644 --- a/jetlinks-manager/network-manager/pom.xml +++ b/jetlinks-manager/network-manager/pom.xml @@ -86,7 +86,11 @@ compile - + + io.opentelemetry + opentelemetry-semconv + 1.12.0-alpha + diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DeviceDebugSubscriptionProvider.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DeviceDebugSubscriptionProvider.java new file mode 100644 index 00000000..a5eb3beb --- /dev/null +++ b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DeviceDebugSubscriptionProvider.java @@ -0,0 +1,279 @@ +package org.jetlinks.community.network.manager.debug; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import lombok.*; +import lombok.extern.slf4j.Slf4j; +import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.event.Subscription; +import org.jetlinks.core.trace.DeviceTracer; +import org.jetlinks.core.trace.EventBusSpanExporter; +import org.jetlinks.core.trace.ProtocolTracer; +import org.jetlinks.core.trace.TraceHolder; +import org.jetlinks.core.trace.data.SpanDataInfo; +import org.jetlinks.core.utils.TopicUtils; +import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.community.gateway.external.SubscribeRequest; +import org.jetlinks.community.gateway.external.SubscriptionProvider; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +@Component +@AllArgsConstructor +@Slf4j +public class DeviceDebugSubscriptionProvider implements SubscriptionProvider { + private final EventBus eventBus; + + private final DeviceRegistry registry; + + @Override + public String id() { + return "device-debug"; + } + + @Override + public String name() { + return "设备诊断"; + } + + @Override + public String[] getTopicPattern() { + return new String[]{"/debug/device/*/trace"}; + } + + @Override + public Flux subscribe(SubscribeRequest request) { + String deviceId = TopicUtils + .getPathVariables("/debug/device/{deviceId}/trace", request.getTopic()) + .get("deviceId"); + return startDebug(deviceId); + } + + /** + * @param deviceId deviceId + * @see DeviceTracer + * @see EventBusSpanExporter + */ + Flux startDebug(String deviceId) { + if (TraceHolder.isDisabled()) { + return Flux + .just(TraceData + .of(TraceDataType.log, + true, + "0", + "error", + "链路追踪功能已禁用,请联系管理员.", + System.currentTimeMillis(), + System.currentTimeMillis())); + } + //订阅设备跟踪信息 + return Flux + .merge(this + .getTraceData(DeviceTracer.SpanName.operation(deviceId, "*")) + .flatMap(this::convertDeviceTrace), + registry + .getDevice(deviceId) + .flatMap(device -> device + .getProtocol() + .map(pro -> ProtocolTracer.SpanName.operation(pro.getId(), "*"))) + .flatMapMany(this::getTraceData) + .flatMap(this::convertProtocolTrace) + ); + } + + private Mono convertProtocolTrace(SpanDataInfo traceData) { + String errorInfo = traceData + .getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME) + .flatMap(event -> event.getAttribute(SemanticAttributes.EXCEPTION_STACKTRACE)) + .orElse(null); + String operation = traceData.getName().substring(traceData.getName().lastIndexOf("/") + 1); + //协议跟踪只展示错误信息,因为协议无法定位到具体的设备,如果现实全部跟踪信息可能会有很多信息 + if (StringUtils.hasText(errorInfo)) { + return Mono.just(TraceData + .of(TraceDataType.log, + true, + traceData.getTraceId(), + operation, + getDeviceTraceDetail(traceData), + traceData.getStartWithNanos() / 1000 / 1000, + traceData.getStartWithNanos() / 1000 / 1000 + )); + } + return Mono.empty(); + + } + + private boolean hasError(SpanDataInfo data) { + return data + .getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME) + .isPresent(); + } + + private Object getDeviceTraceDetail(SpanDataInfo data) { + + String message = data + .getAttribute(DeviceTracer.SpanKey.message) + .orElse(null); + + String response = data + .getAttribute(DeviceTracer.SpanKey.response) + .orElse(null); + + if (StringUtils.hasText(message)) { + if (StringUtils.hasText(response)) { + return String.join("\n\n", response); + } + return message; + } + + if (StringUtils.hasText(response)) { + return response; + } + + String errorInfo = data + .getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME) + .flatMap(event -> event.getAttribute(SemanticAttributes.EXCEPTION_STACKTRACE)) + .orElse(null); + + if (StringUtils.hasText(errorInfo)) { + return errorInfo; + } + + return JSON.toJSONString(data.getAttributes(), SerializerFeature.PrettyFormat); + + } + + private Mono convertDeviceTrace(SpanDataInfo traceData) { + String name = traceData.getName(); + String operation = name.substring(name.lastIndexOf("/") + 1); + return Mono.just(TraceData + .of(TraceDataType.data, + hasError(traceData), + traceData.getTraceId(), + operation, + getDeviceTraceDetail(traceData), + traceData.getStartWithNanos() / 1000 / 1000, + traceData.getStartWithNanos() / 1000 / 1000 + )); + } + + private Flux getTraceData(String name) { + //启用跟踪 + Disposable disposable = enableSpan(name); + + return eventBus + .subscribe(Subscription + .builder() + .subscriberId("device_debug_tracer") + .topics("/trace/*" + name) + .broker() + .local() + .build(), + SpanDataInfo.class) + //完成时关闭跟踪 + .doFinally(s -> disposable.dispose()); + } + + private Disposable enableSpan(String name) { + Disposable.Composite disposable = Disposables.composite(); + + String id = IDGenerator.UUID.generate(); + + eventBus + .publish("/_sys/_trace/opt", new TraceOpt(id, name, true)) + .subscribe(); + + disposable.add(() -> eventBus + .publish("/_sys/_trace/opt", new TraceOpt(id, name, false)) + .subscribe()); + + return disposable; + } + + @Subscribe(value = "/_sys/_trace/opt", features = {Subscription.Feature.broker, Subscription.Feature.local}) + public Mono handleTraceEnable(TraceOpt opt) { + if (opt.enable) { + log.debug("enable trace {} id:{}", opt.span, opt.id); + TraceHolder.enable(opt.span, opt.id); + } else { + log.debug("remove trace {} id:{}", opt.span, opt.id); + TraceHolder.removeEnabled(opt.span, opt.id); + } + return Mono.empty(); + } + + @Getter + @Setter + @AllArgsConstructor + @NoArgsConstructor + public static class TraceOpt { + private String id; + private String span; + private boolean enable; + } + + public enum TraceDataType { + /** + * 和设备有关联的数据 + */ + data, + /** + * 和设备没有关联的日志信息 + */ + log + } + + @Setter + @Getter + @AllArgsConstructor(staticName = "of") + @NoArgsConstructor + @ToString + public static class TraceData implements Serializable { + + static Set downstreamOperation = new HashSet<>( + Arrays.asList( + "downstream", "encode", "request" + ) + ); + private static final long serialVersionUID = 1L; + // 跟踪数据类型 + private TraceDataType type; + //是否有错误信息 + private boolean error; + // 跟踪ID + private String traceId; + /** + * @see DeviceTracer.SpanName + * 操作. encode,decode + */ + private String operation; + // 数据内容 + private Object detail; + //开始时间 毫秒 + private long startTime; + //结束时间 毫秒 + private long endTime; + + //是否上行操作 + public boolean isUpstream() { + return !isDownstream(); + } + + //是否下行操作 + public boolean isDownstream() { + return operation != null && downstreamOperation.contains(operation); + } + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmRecordRankMeasurement.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmRecordRankMeasurement.java index a68cada1..739d32fa 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmRecordRankMeasurement.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmRecordRankMeasurement.java @@ -95,9 +95,9 @@ public class AlarmRecordRankMeasurement extends StaticMeasurement { Comparator comparator; if (Objects.equals(parameter.getString("order",""), "asc")){ - comparator = Comparator.comparingInt(d-> d.getInt("count", 0)); + comparator = Comparator.comparingLong(d-> d.getLong("count", 0L)); }else { - comparator = Comparator.comparingInt(d-> d.getInt("count", 0)).reversed(); + comparator = Comparator.comparingLong(d-> d.getLong("count", 0L)).reversed(); } AggregationQueryParam param = createQueryParam(parameter); @@ -122,11 +122,11 @@ public class AlarmRecordRankMeasurement extends StaticMeasurement { private String targetName; - private Integer count; + private long count; public SimpleResult(AggregationData data) { String targetId = data.getString("targetId", ""); - this.setCount(data.getInt("count", 0)); + this.setCount(data.getLong("count", 0L)); this.setTargetName(data.getString("targetName", targetId)); this.setTargetId(data.getString("targetId", "")); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java index 0972f0ba..05e14acf 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java @@ -264,9 +264,9 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable { String column; if (arr.length > 3 && arr[0].equals("properties")) { - column = "t['" + createColumnAlias(term.getColumn()) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']"; + column = "t['" + createColumnAlias(term.getColumn(), false) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']"; } else { - column = "t." + createColumnAlias(term.getColumn()); + column = "t['" + createColumnAlias(term.getColumn(), false) + "']"; } List values = TermValue.of(term); @@ -315,9 +315,9 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable { String property = arr[1]; switch (valueType) { case current: - return "this.properties." + property; + return "this['properties." + property + "']"; case recent: - return "coalesce(this.properties." + property + ",device.property.recent(deviceId,'" + property + "',timestamp))"; + return "coalesce(this['properties." + property + "']" + ",device.property.recent(deviceId,'" + property + "',timestamp))"; case last: return "device.property.recent(deviceId,'" + property + "',timestamp)"; } @@ -328,21 +328,28 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable { return "this['" + String.join(".", Arrays.copyOfRange(arr, 1, arr.length)) + "']"; } - static String createColumnAlias(String column) { + static String createColumnAlias(String column, boolean wrapColumn) { if (!column.contains(".")) { return wrapColumnName(column); } String[] arr = column.split("[.]"); + String alias; //properties.temp.current if ("properties".equals(arr[0])) { String property = arr[1]; - return wrapColumnName(property + "_" + arr[arr.length - 1]); + alias = property + "_" + arr[arr.length - 1]; } else { if (arr.length > 1) { - return wrapColumnName(String.join("_", Arrays.copyOfRange(arr, 1, arr.length))); + alias = String.join("_", Arrays.copyOfRange(arr, 1, arr.length)); + } else { + alias = column.replace(".", "_"); } - return wrapColumnName(column.replace(".", "_")); } + return wrapColumn ? wrapColumnName(alias) : alias; + } + + static String createColumnAlias(String column) { + return createColumnAlias(column, true); } static String wrapColumnName(String column) { @@ -433,4 +440,4 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable { } -} +} \ No newline at end of file diff --git a/jetlinks-standalone/src/main/resources/application-embedded.yml b/jetlinks-standalone/src/main/resources/application-embedded.yml index 34386f53..8585e8ff 100644 --- a/jetlinks-standalone/src/main/resources/application-embedded.yml +++ b/jetlinks-standalone/src/main/resources/application-embedded.yml @@ -14,14 +14,12 @@ spring: password: pool: max-size: 32 - data: - elasticsearch: - client: - reactive: - endpoints: ${elasticsearch.client.host}:${elasticsearch.client.port} - max-in-memory-size: 100MB - socket-timeout: ${elasticsearch.client.socket-timeout} - connection-timeout: ${elasticsearch.client.socket-timeout} + elasticsearch: + uris: localhost:9201 + socket-timeout: 10s + connection-timeout: 15s + webclient: + max-in-memory-size: 100MB easyorm: default-schema: PUBLIC # 数据库默认的schema dialect: h2 #数据库方言 @@ -31,13 +29,6 @@ elasticsearch: data-path: ./data/elasticsearch port: 9201 host: 0.0.0.0 - client: - host: localhost - port: 9201 - max-conn-total: 128 - connect-timeout: 5000 - socket-timeout: 5000 - connection-request-timeout: 8000 index: default-strategy: time-by-month #默认es的索引按月进行分表, direct则为直接操作索引. settings: diff --git a/pom.xml b/pom.xml index 3c487d8b..4f905307 100644 --- a/pom.xml +++ b/pom.xml @@ -436,6 +436,18 @@ + + io.swagger.core.v3 + swagger-annotations + ${swagger.version} + + + + org.apache.commons + commons-text + 1.10.0 + + @@ -444,7 +456,6 @@ org.apache.commons commons-text - 1.9 @@ -480,14 +491,14 @@ org.testcontainers testcontainers - 1.17.2 + 1.17.4 test org.testcontainers junit-jupiter - 1.17.2 + 1.17.4 test