Merge remote-tracking branch 'origin/2.0' into 2.0

This commit is contained in:
zhouhao 2022-11-10 09:46:26 +08:00
commit db89b37be8
19 changed files with 533 additions and 98 deletions

View File

@ -81,9 +81,9 @@ services:
- "spring.r2dbc.url=r2dbc:postgresql://postgres:5432/jetlinks" #数据库连接地址
- "spring.r2dbc.username=postgres"
- "spring.r2dbc.password=jetlinks"
- "spring.data.elasticsearch.client.reactive.endpoints=elasticsearch:9200"
# - "spring.data.elasticsearch.client.reactive.username=admin"
# - "spring.data.elasticsearch.client.reactive.password=admin"
- "spring.elasticsearch.urls=elasticsearch:9200"
# - "spring.elasticsearch.username=admin"
# - "spring.elasticsearch.password=admin"
# - "spring.reactor.debug-agent.enabled=false" #设置为false能提升性能
- "spring.redis.host=redis"
- "spring.redis.port=6379"

View File

@ -23,6 +23,7 @@ import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.io.File;
import java.time.Duration;
import java.util.function.Supplier;
@Slf4j
@ -35,6 +36,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
@Setter
private String filePath;
@Getter
@Setter
private Duration flushInterval = Duration.ofMinutes(10);
public PersistenceDeviceSessionManager(RpcManager rpcManager) {
super(rpcManager);
}
@ -72,6 +77,24 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
}
repository = initStore(filePath);
if (!flushInterval.isZero() && !flushInterval.isNegative()) {
disposable.add(
Flux
.interval(flushInterval)
.onBackpressureDrop()
.concatMap(ignore -> Flux
.fromIterable(localSessions.values())
.mapNotNull(ref -> {
if (ref.loaded != null && ref.loaded.isWrapFrom(PersistentSession.class)) {
return ref.loaded.unwrap(PersistentSession.class);
}
return null;
})
.as(this::tryPersistent), 1)
.subscribe()
);
}
disposable.add(
listenEvent(event -> {
//移除持久化的会话
@ -95,6 +118,7 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
.map(ref -> ref.loaded.unwrap(PersistentSession.class))
.as(this::tryPersistent)
.block();
repository.store.compactMoveChunks();
repository.store.close();
}
@ -133,10 +157,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
.toSession(registry.get())
.doOnNext(session -> {
log.debug("resume session[{}]", session.getDeviceId());
localSessions.putIfAbsent(session.getDeviceId(), new DeviceSessionRef(
session.getDeviceId(),
this,
session));
localSessions.putIfAbsent(session.getDeviceId(),
new DeviceSessionRef(session.getDeviceId(),
this,
session));
})
.onErrorResume((err) -> {
log.debug("resume session[{}] error", entity.getDeviceId(), err);

View File

@ -4,6 +4,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.hswebframework.ezorm.core.param.Term;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.function.Consumer;
@ -15,10 +16,13 @@ import java.util.function.Consumer;
@Component
public class DefaultLinkTypeParser implements LinkTypeParser {
private TermTypeParser parser = new DefaultTermTypeParser();
private final TermTypeParser parser = new DefaultTermTypeParser();
@Override
public BoolQueryBuilder process(Term term, Consumer<Term> consumer, BoolQueryBuilder queryBuilders) {
if (term.getValue() == null && CollectionUtils.isEmpty(term.getTerms())) {
return queryBuilders;
}
if (term.getType() == Term.Type.or) {
handleOr(queryBuilders, term, consumer);
} else {
@ -52,4 +56,4 @@ public class DefaultLinkTypeParser implements LinkTypeParser {
}
}
}

View File

@ -6,9 +6,7 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.GeoPoint;
import org.jetlinks.core.metadata.types.GeoType;
import org.jetlinks.core.metadata.types.*;
import java.util.Date;
import java.util.HashMap;
@ -22,7 +20,9 @@ public class ElasticSearchConverter {
return QueryParamTranslator.convertSearchSourceBuilder(queryParam, metadata);
}
public static Map<String, Object> convertDataToElastic(Map<String, Object> data, List<PropertyMetadata> properties) {
public static Map<String, Object> convertDataToElastic(Map<String, Object> data,
List<PropertyMetadata> properties) {
Map<String, Object> newValue = new HashMap<>(data);
for (PropertyMetadata property : properties) {
DataType type = property.getValueType();
Object val = data.get(property.getId());
@ -32,37 +32,52 @@ public class ElasticSearchConverter {
//处理地理位置类型
if (type instanceof GeoType) {
GeoPoint point = ((GeoType) type).convert(val);
Map<String, Object> geoData = new HashMap<>();
geoData.put("lat", point.getLat());
geoData.put("lon", point.getLon());
data.put(property.getId(), geoData);
newValue.put(property.getId(), geoData);
} else if (type instanceof GeoShapeType) {
GeoShape shape = ((GeoShapeType) type).convert(val);
if (shape == null) {
throw new UnsupportedOperationException("不支持的GeoShape格式:" + val);
}
Map<String, Object> geoData = new HashMap<>();
geoData.put("type", shape.getType().name());
geoData.put("coordinates", shape.getCoordinates());
newValue.put(property.getId(), geoData);
} else if (type instanceof DateTimeType) {
Date date = ((DateTimeType) type).convert(val);
data.put(property.getId(), date.getTime());
newValue.put(property.getId(), date.getTime());
} else if (type instanceof Converter) {
data.put(property.getId(), ((Converter<?>) type).convert(val));
newValue.put(property.getId(), ((Converter<?>) type).convert(val));
}
}
return data;
return newValue;
}
public static Map<String, Object> convertDataFromElastic(Map<String, Object> data, List<PropertyMetadata> properties) {
public static Map<String, Object> convertDataFromElastic(Map<String, Object> data,
List<PropertyMetadata> properties) {
Map<String, Object> newData = new HashMap<>(data);
for (PropertyMetadata property : properties) {
DataType type = property.getValueType();
Object val = data.get(property.getId());
Object val = newData.get(property.getId());
if (val == null) {
continue;
}
//处理地理位置类型
if (type instanceof GeoType) {
data.put(property.getId(), ((GeoType) type).convertToMap(val));
} else if (type instanceof DateTimeType) {
newData.put(property.getId(), ((GeoType) type).convertToMap(val));
}else if (type instanceof GeoShapeType) {
newData.put(property.getId(),GeoShape.of(val).toMap());
} else if (type instanceof DateTimeType) {
Date date = ((DateTimeType) type).convert(val);
data.put(property.getId(), date);
newData.put(property.getId(), date);
} else if (type instanceof Converter) {
data.put(property.getId(), ((Converter<?>) type).convert(val));
newData.put(property.getId(), ((Converter<?>) type).convert(val));
}
}
return data;
return newData;
}
}
}

View File

@ -2,6 +2,8 @@ package org.jetlinks.community.network.http.server.vertx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledHeapByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpMethod;
@ -224,7 +226,12 @@ public class VertxHttpExchange implements HttpExchange, HttpResponse, HttpReques
.<Void>create(sink -> {
Buffer buf = Buffer.buffer(buffer);
setResponseDefaultLength(buf.length());
response.write(buf, v -> sink.success());
response.write(buf, v -> {
sink.success();
if(!(buffer instanceof UnpooledHeapByteBuf)){
ReferenceCountUtil.safeRelease(buffer);
}
});
});
}

View File

@ -1,6 +1,8 @@
package org.jetlinks.community.network.mqtt.client;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import lombok.Getter;
import lombok.Setter;
@ -199,19 +201,24 @@ public class VertxMqttClient implements MqttClient {
private Mono<Void> doPublish(MqttMessage message) {
return Mono.create((sink) -> {
Buffer buffer = Buffer.buffer(message.getPayload());
ByteBuf payload = message.getPayload();
Buffer buffer = Buffer.buffer(payload);
client.publish(message.getTopic(),
buffer,
MqttQoS.valueOf(message.getQosLevel()),
message.isDup(),
message.isRetain(),
result -> {
if (result.succeeded()) {
log.info("publish mqtt [{}] message success: {}", client.clientId(), message);
sink.success();
} else {
log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause());
sink.error(result.cause());
try {
if (result.succeeded()) {
log.info("publish mqtt [{}] message success: {}", client.clientId(), message);
sink.success();
} else {
log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause());
sink.error(result.cause());
}
} finally {
ReferenceCountUtil.safeRelease(payload);
}
});
});

View File

@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.SocketAddress;
import io.vertx.mqtt.MqttEndpoint;
@ -32,6 +33,7 @@ import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@ -41,7 +43,7 @@ class VertxMqttConnection implements MqttConnection {
private long keepAliveTimeoutMs;
@Getter
private long lastPingTime = System.currentTimeMillis();
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false;
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
private static final MqttAuth emptyAuth = new MqttAuth() {
@Override
public String getUsername() {
@ -259,19 +261,22 @@ class VertxMqttConnection implements MqttConnection {
ping();
return Mono
.<Void>create(sink -> {
Buffer buffer = Buffer.buffer(message.getPayload());
endpoint.publish(message.getTopic(),
buffer,
MqttQoS.valueOf(message.getQosLevel()),
message.isDup(),
message.isRetain(),
result -> {
if (result.succeeded()) {
sink.success();
} else {
sink.error(result.cause());
}
}
ByteBuf buf = message.getPayload();
Buffer buffer = Buffer.buffer(buf);
endpoint.publish(
message.getTopic(),
buffer,
MqttQoS.valueOf(message.getQosLevel()),
message.isDup(),
message.isRetain(),
result -> {
if (result.succeeded()) {
sink.success();
} else {
sink.error(result.cause());
}
ReferenceCountUtil.safeRelease(buf);
}
);
});
}

View File

@ -1,5 +1,7 @@
package org.jetlinks.community.network.tcp.client;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
@ -78,9 +80,11 @@ public class VertxTcpClient implements TcpClient {
sink.error(new SocketException("socket closed"));
return;
}
Buffer buffer = Buffer.buffer(message.getPayload());
ByteBuf buf = message.getPayload();
Buffer buffer = Buffer.buffer(buf);
int len = buffer.length();
socket.write(buffer, r -> {
ReferenceCountUtil.safeRelease(buf);
if (r.succeeded()) {
keepAlive();
sink.success();

View File

@ -31,6 +31,7 @@ import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
@ -48,6 +49,7 @@ import java.util.stream.Collectors;
@AllArgsConstructor
@Component
public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvider {
public static final String EXECUTOR = "device-message-sender";

View File

@ -80,6 +80,10 @@ public class ThingPropertyDetail implements ThingProperty {
return this;
}
public ThingPropertyDetail createTime(long createTime) {
this.createTime = createTime;
return this;
}
public ThingPropertyDetail formatTime(String formatTime) {
this.formatTime = formatTime;

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.things.data.operations;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
@ -60,6 +61,8 @@ public abstract class ColumnModeQueryOperationsBase extends AbstractQueryOperati
.next(ThingPropertyDetail
.of(value, entry.getValue())
.thingId(data.getString(metricBuilder.getThingIdProperty(), null))
.timestamp(data.getTimestamp())
.createTime(data.getLong(ThingsDataConstants.COLUMN_CREATE_TIME,data.getTimestamp()))
.generateId()
));
}
@ -94,6 +97,8 @@ public abstract class ColumnModeQueryOperationsBase extends AbstractQueryOperati
data -> ThingPropertyDetail
.of(data.get(property).orElse(null), properties.get(property))
.thingId(data.getString(metricBuilder.getThingIdProperty(), null))
.timestamp(data.getTimestamp())
.createTime(data.getLong(ThingsDataConstants.COLUMN_CREATE_TIME, data.getTimestamp()))
.generateId()
);
}

View File

@ -19,6 +19,7 @@ import javax.persistence.Column;
import javax.persistence.Index;
import javax.persistence.Table;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.Date;
@Getter
@ -45,8 +46,8 @@ public class DeviceTagEntity extends GenericEntity<String> {
private String name;
@Column(length = 256, nullable = false)
@NotBlank(message = "[value]不能为空", groups = CreateGroup.class)
@Length(max = 256, min = 1, message = "[value]长度不能大于256", groups = CreateGroup.class)
@NotNull(message = "[value]不能为空", groups = CreateGroup.class)
@Length(max = 256, message = "[value]长度不能大于256", groups = CreateGroup.class)
@Schema(description = "标签值")
private String value;

View File

@ -1,15 +1,21 @@
package org.jetlinks.community.device.service;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.configure.cluster.Cluster;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.entity.DeviceTagEntity;
import org.jetlinks.community.device.enums.DeviceFeature;
import org.jetlinks.community.device.enums.DeviceState;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
@ -18,17 +24,26 @@ import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.*;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
@ -51,6 +66,8 @@ public class DeviceMessageBusinessHandler {
private final EventBus eventBus;
private final Disposable.Composite disposable = Disposables.composite();
/**
* 自动注册设备信息
* <p>
@ -285,6 +302,40 @@ public class DeviceMessageBusinessHandler {
}
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public static class StateBuf implements Externalizable {
//有效期一小时
static long expires = Duration.ofHours(1).toMillis();
private String id;
private long time;
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(id);
out.writeLong(time);
}
@Override
public void readExternal(ObjectInput in) throws IOException {
id = in.readUTF();
time = in.readLong();
}
public boolean isEffective() {
return System.currentTimeMillis() - time < expires;
}
}
@PreDestroy
public void shutdown() {
disposable.dispose();
}
@PostConstruct
public void init() {
@ -295,22 +346,36 @@ public class DeviceMessageBusinessHandler {
.justLocal()//只订阅本地
.build();
//订阅设备上下线消息,同步数据库中的设备状态,
//最小间隔800毫秒,最大缓冲数量500,最长间隔2秒.
//如果2条消息间隔大于0.8秒则不缓冲直接更新
//否则缓冲,数量超过500后批量更新
//无论缓冲区是否超过500条,都每2秒更新一次.
FluxUtils.bufferRate(eventBus
.subscribe(subscription, DeviceMessage.class)
.map(DeviceMessage::getDeviceId),
800, Integer.getInteger("device.state.sync.batch", 500), Duration.ofSeconds(2))
.onBackpressureBuffer(64,
list -> log.warn("无法处理更多设备状态同步!"),
BufferOverflowStrategy.DROP_OLDEST)
.publishOn(Schedulers.boundedElastic(), 64)
.concatMap(list -> deviceService.syncStateBatch(Flux.just(list), false).map(List::size))
.onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
.subscribe((i) -> log.info("同步设备状态成功:{}", i));
//缓冲同步设备上线信息,在突发大量上下线的情况,减少数据库的压力
PersistenceBuffer<StateBuf> buffer =
new PersistenceBuffer<>(
"./data/device-state-buffer",
"device-state.queue",
StateBuf::new,
flux -> deviceService
.syncStateBatch(flux
.filter(StateBuf::isEffective)
.map(StateBuf::getId)
.distinct()
.collectList()
.flux(), false)
.then(Reactors.ALWAYS_FALSE))
.name("device-state-synchronizer")
.parallelism(1)
.bufferTimeout(Duration.ofSeconds(1))
.retryWhenError(e -> ErrorUtils
.hasException(e,
IOException.class,
QueryTimeoutException.class))
.bufferSize(1000);
buffer.start();
disposable.add(eventBus
.subscribe(subscription, DeviceMessage.class)
.subscribe(msg -> buffer.write(new StateBuf(msg.getDeviceId(), msg.getTimestamp()))));
disposable.add(buffer);
}

View File

@ -86,7 +86,11 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.12.0-alpha</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,279 @@
package org.jetlinks.community.network.manager.debug;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.EventBusSpanExporter;
import org.jetlinks.core.trace.ProtocolTracer;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.trace.data.SpanDataInfo;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.jetlinks.community.gateway.external.SubscriptionProvider;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@Component
@AllArgsConstructor
@Slf4j
public class DeviceDebugSubscriptionProvider implements SubscriptionProvider {
private final EventBus eventBus;
private final DeviceRegistry registry;
@Override
public String id() {
return "device-debug";
}
@Override
public String name() {
return "设备诊断";
}
@Override
public String[] getTopicPattern() {
return new String[]{"/debug/device/*/trace"};
}
@Override
public Flux<?> subscribe(SubscribeRequest request) {
String deviceId = TopicUtils
.getPathVariables("/debug/device/{deviceId}/trace", request.getTopic())
.get("deviceId");
return startDebug(deviceId);
}
/**
* @param deviceId deviceId
* @see DeviceTracer
* @see EventBusSpanExporter
*/
Flux<TraceData> startDebug(String deviceId) {
if (TraceHolder.isDisabled()) {
return Flux
.just(TraceData
.of(TraceDataType.log,
true,
"0",
"error",
"链路追踪功能已禁用,请联系管理员.",
System.currentTimeMillis(),
System.currentTimeMillis()));
}
//订阅设备跟踪信息
return Flux
.merge(this
.getTraceData(DeviceTracer.SpanName.operation(deviceId, "*"))
.flatMap(this::convertDeviceTrace),
registry
.getDevice(deviceId)
.flatMap(device -> device
.getProtocol()
.map(pro -> ProtocolTracer.SpanName.operation(pro.getId(), "*")))
.flatMapMany(this::getTraceData)
.flatMap(this::convertProtocolTrace)
);
}
private Mono<TraceData> convertProtocolTrace(SpanDataInfo traceData) {
String errorInfo = traceData
.getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME)
.flatMap(event -> event.getAttribute(SemanticAttributes.EXCEPTION_STACKTRACE))
.orElse(null);
String operation = traceData.getName().substring(traceData.getName().lastIndexOf("/") + 1);
//协议跟踪只展示错误信息,因为协议无法定位到具体的设备,如果现实全部跟踪信息可能会有很多信息
if (StringUtils.hasText(errorInfo)) {
return Mono.just(TraceData
.of(TraceDataType.log,
true,
traceData.getTraceId(),
operation,
getDeviceTraceDetail(traceData),
traceData.getStartWithNanos() / 1000 / 1000,
traceData.getStartWithNanos() / 1000 / 1000
));
}
return Mono.empty();
}
private boolean hasError(SpanDataInfo data) {
return data
.getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME)
.isPresent();
}
private Object getDeviceTraceDetail(SpanDataInfo data) {
String message = data
.getAttribute(DeviceTracer.SpanKey.message)
.orElse(null);
String response = data
.getAttribute(DeviceTracer.SpanKey.response)
.orElse(null);
if (StringUtils.hasText(message)) {
if (StringUtils.hasText(response)) {
return String.join("\n\n", response);
}
return message;
}
if (StringUtils.hasText(response)) {
return response;
}
String errorInfo = data
.getEvent(SemanticAttributes.EXCEPTION_EVENT_NAME)
.flatMap(event -> event.getAttribute(SemanticAttributes.EXCEPTION_STACKTRACE))
.orElse(null);
if (StringUtils.hasText(errorInfo)) {
return errorInfo;
}
return JSON.toJSONString(data.getAttributes(), SerializerFeature.PrettyFormat);
}
private Mono<TraceData> convertDeviceTrace(SpanDataInfo traceData) {
String name = traceData.getName();
String operation = name.substring(name.lastIndexOf("/") + 1);
return Mono.just(TraceData
.of(TraceDataType.data,
hasError(traceData),
traceData.getTraceId(),
operation,
getDeviceTraceDetail(traceData),
traceData.getStartWithNanos() / 1000 / 1000,
traceData.getStartWithNanos() / 1000 / 1000
));
}
private Flux<SpanDataInfo> getTraceData(String name) {
//启用跟踪
Disposable disposable = enableSpan(name);
return eventBus
.subscribe(Subscription
.builder()
.subscriberId("device_debug_tracer")
.topics("/trace/*" + name)
.broker()
.local()
.build(),
SpanDataInfo.class)
//完成时关闭跟踪
.doFinally(s -> disposable.dispose());
}
private Disposable enableSpan(String name) {
Disposable.Composite disposable = Disposables.composite();
String id = IDGenerator.UUID.generate();
eventBus
.publish("/_sys/_trace/opt", new TraceOpt(id, name, true))
.subscribe();
disposable.add(() -> eventBus
.publish("/_sys/_trace/opt", new TraceOpt(id, name, false))
.subscribe());
return disposable;
}
@Subscribe(value = "/_sys/_trace/opt", features = {Subscription.Feature.broker, Subscription.Feature.local})
public Mono<Void> handleTraceEnable(TraceOpt opt) {
if (opt.enable) {
log.debug("enable trace {} id:{}", opt.span, opt.id);
TraceHolder.enable(opt.span, opt.id);
} else {
log.debug("remove trace {} id:{}", opt.span, opt.id);
TraceHolder.removeEnabled(opt.span, opt.id);
}
return Mono.empty();
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public static class TraceOpt {
private String id;
private String span;
private boolean enable;
}
public enum TraceDataType {
/**
* 和设备有关联的数据
*/
data,
/**
* 和设备没有关联的日志信息
*/
log
}
@Setter
@Getter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
@ToString
public static class TraceData implements Serializable {
static Set<String> downstreamOperation = new HashSet<>(
Arrays.asList(
"downstream", "encode", "request"
)
);
private static final long serialVersionUID = 1L;
// 跟踪数据类型
private TraceDataType type;
//是否有错误信息
private boolean error;
// 跟踪ID
private String traceId;
/**
* @see DeviceTracer.SpanName
* 操作. encode,decode
*/
private String operation;
// 数据内容
private Object detail;
//开始时间 毫秒
private long startTime;
//结束时间 毫秒
private long endTime;
//是否上行操作
public boolean isUpstream() {
return !isDownstream();
}
//是否下行操作
public boolean isDownstream() {
return operation != null && downstreamOperation.contains(operation);
}
}
}

View File

@ -95,9 +95,9 @@ public class AlarmRecordRankMeasurement extends StaticMeasurement {
Comparator<AggregationData> comparator;
if (Objects.equals(parameter.getString("order",""), "asc")){
comparator = Comparator.comparingInt(d-> d.getInt("count", 0));
comparator = Comparator.comparingLong(d-> d.getLong("count", 0L));
}else {
comparator = Comparator.<AggregationData>comparingInt(d-> d.getInt("count", 0)).reversed();
comparator = Comparator.<AggregationData>comparingLong(d-> d.getLong("count", 0L)).reversed();
}
AggregationQueryParam param = createQueryParam(parameter);
@ -122,11 +122,11 @@ public class AlarmRecordRankMeasurement extends StaticMeasurement {
private String targetName;
private Integer count;
private long count;
public SimpleResult(AggregationData data) {
String targetId = data.getString("targetId", "");
this.setCount(data.getInt("count", 0));
this.setCount(data.getLong("count", 0L));
this.setTargetName(data.getString("targetName", targetId));
this.setTargetId(data.getString("targetId", ""));
}

View File

@ -264,9 +264,9 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
String column;
if (arr.length > 3 && arr[0].equals("properties")) {
column = "t['" + createColumnAlias(term.getColumn()) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
column = "t['" + createColumnAlias(term.getColumn(), false) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
} else {
column = "t." + createColumnAlias(term.getColumn());
column = "t['" + createColumnAlias(term.getColumn(), false) + "']";
}
List<TermValue> values = TermValue.of(term);
@ -315,9 +315,9 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
String property = arr[1];
switch (valueType) {
case current:
return "this.properties." + property;
return "this['properties." + property + "']";
case recent:
return "coalesce(this.properties." + property + ",device.property.recent(deviceId,'" + property + "',timestamp))";
return "coalesce(this['properties." + property + "']" + ",device.property.recent(deviceId,'" + property + "',timestamp))";
case last:
return "device.property.recent(deviceId,'" + property + "',timestamp)";
}
@ -328,21 +328,28 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
return "this['" + String.join(".", Arrays.copyOfRange(arr, 1, arr.length)) + "']";
}
static String createColumnAlias(String column) {
static String createColumnAlias(String column, boolean wrapColumn) {
if (!column.contains(".")) {
return wrapColumnName(column);
}
String[] arr = column.split("[.]");
String alias;
//properties.temp.current
if ("properties".equals(arr[0])) {
String property = arr[1];
return wrapColumnName(property + "_" + arr[arr.length - 1]);
alias = property + "_" + arr[arr.length - 1];
} else {
if (arr.length > 1) {
return wrapColumnName(String.join("_", Arrays.copyOfRange(arr, 1, arr.length)));
alias = String.join("_", Arrays.copyOfRange(arr, 1, arr.length));
} else {
alias = column.replace(".", "_");
}
return wrapColumnName(column.replace(".", "_"));
}
return wrapColumn ? wrapColumnName(alias) : alias;
}
static String createColumnAlias(String column) {
return createColumnAlias(column, true);
}
static String wrapColumnName(String column) {
@ -433,4 +440,4 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
}
}
}

View File

@ -14,14 +14,12 @@ spring:
password:
pool:
max-size: 32
data:
elasticsearch:
client:
reactive:
endpoints: ${elasticsearch.client.host}:${elasticsearch.client.port}
max-in-memory-size: 100MB
socket-timeout: ${elasticsearch.client.socket-timeout}
connection-timeout: ${elasticsearch.client.socket-timeout}
elasticsearch:
uris: localhost:9201
socket-timeout: 10s
connection-timeout: 15s
webclient:
max-in-memory-size: 100MB
easyorm:
default-schema: PUBLIC # 数据库默认的schema
dialect: h2 #数据库方言
@ -31,13 +29,6 @@ elasticsearch:
data-path: ./data/elasticsearch
port: 9201
host: 0.0.0.0
client:
host: localhost
port: 9201
max-conn-total: 128
connect-timeout: 5000
socket-timeout: 5000
connection-request-timeout: 8000
index:
default-strategy: time-by-month #默认es的索引按月进行分表, direct则为直接操作索引.
settings:

17
pom.xml
View File

@ -436,6 +436,18 @@
</exclusions>
</dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>
<version>${swagger.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -444,7 +456,6 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.9</version>
</dependency>
<dependency>
@ -480,14 +491,14 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.17.2</version>
<version>1.17.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.17.2</version>
<version>1.17.4</version>
<scope>test</scope>
</dependency>