Merge remote-tracking branch 'origin/master'

This commit is contained in:
zhou-hao 2020-03-16 11:22:57 +08:00
commit 554bdfed0e
8 changed files with 27 additions and 29 deletions

View File

@ -110,13 +110,11 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
return new HashMap<>();
}
return metadata.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, this::createElasticProperty));
.collect(Collectors.toMap(PropertyMetadata::getId, prop -> this.createElasticProperty(prop.getValueType())));
}
protected Map<String, Object> createElasticProperty(PropertyMetadata metadata) {
protected Map<String, Object> createElasticProperty(DataType type) {
Map<String, Object> property = new HashMap<>();
DataType type = metadata.getValueType();
if (type instanceof DateTimeType) {
property.put("type", "date");
property.put("format", ElasticDateFormat.getFormat(ElasticDateFormat.epoch_millis, ElasticDateFormat.simple_date, ElasticDateFormat.strict_date));
@ -132,6 +130,9 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
property.put("type", "boolean");
} else if (type instanceof GeoType) {
property.put("type", "geo_point");
} else if (type instanceof ArrayType) {
ArrayType arrayType = ((ArrayType) type);
return createElasticProperty(arrayType.getElementType());
} else if (type instanceof ObjectType) {
property.put("type", "nested");
ObjectType objectType = ((ObjectType) type);

View File

@ -179,7 +179,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
.zipWith(indexManager.getIndexMetadata(index))
.flatMapMany(tp2 ->
group.map(buffer -> {
IndexRequest request = new IndexRequest(tp2.getT1(),"_doc");
IndexRequest request = new IndexRequest(tp2.getT1(), "_doc");
Object o = JSON.toJSON(buffer.getPayload());
if (o instanceof Map) {
request.source(tp2.getT2().convertToElastic((Map<String, Object>) o));
@ -194,9 +194,9 @@ public class DefaultElasticSearchService implements ElasticSearchService {
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
lst.forEach(request::add);
return ReactorActionListener.<BulkResponse>mono(listener -> {
restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener);
});
return ReactorActionListener.<BulkResponse>mono(listener ->
restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener)
);
}).thenReturn(buffers.size());
}

View File

@ -98,6 +98,7 @@ public class DefaultMessageGateway implements MessageGateway {
return Flux.defer(() -> root.find(message.getTopic())
.flatMapIterable(TopicPart::getSessionId)
.flatMap(id -> Mono.justOrEmpty(sessions.get(id)))
.distinct(ConnectionSession::getId)
.filter(connectionSession -> connectionSession.isAlive() && filter.test(connectionSession))
.flatMap(session ->
session.connection

View File

@ -34,6 +34,12 @@ public class SystemLoggerEventHandler {
.addProperty("name", new StringType())
.addProperty("level", new StringType())
.addProperty("message", new StringType())
.addProperty("className",new StringType())
.addProperty("exceptionStack",new StringType())
.addProperty("methodName",new StringType())
.addProperty("threadId",new StringType())
.addProperty("threadName",new StringType())
.addProperty("id",new StringType())
.addProperty("context", new ObjectType()
.addProperty("requestId",new StringType())
.addProperty("server",new StringType()))

View File

@ -14,10 +14,13 @@ public enum DeviceLogType implements EnumDict<String> {
writeProperty("属性修改"),
reportProperty("属性上报"),
child("子设备消息"),
childReply("子设备消息回复"),
functionInvoke("调用功能"),
readPropertyReply("读取属性回复"),
writePropertyReply("修改属性回复"),
functionReply("调用功能回复"),
register("设备注册"),
unregister("设备注销"),
offline("离线"),
online("上线"),
@ -31,7 +34,6 @@ public enum DeviceLogType implements EnumDict<String> {
return name();
}
public static DeviceLogType of(DeviceMessage message) {
switch (message.getMessageType()) {
case EVENT:
@ -42,6 +44,8 @@ public enum DeviceLogType implements EnumDict<String> {
return offline;
case CHILD:
return child;
case CHILD_REPLY:
return childReply;
case REPORT_PROPERTY:
return reportProperty;
case INVOKE_FUNCTION_REPLY:
@ -50,10 +54,13 @@ public enum DeviceLogType implements EnumDict<String> {
return readPropertyReply;
case WRITE_PROPERTY_REPLY:
return writePropertyReply;
case REGISTER:
return register;
case UN_REGISTER:
return unregister;
default:
return other;
}
}

View File

@ -6,6 +6,7 @@
<artifactId>jetlinks-community</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -271,7 +271,7 @@ public class DefaultDeviceSessionManager implements DeviceSessionManager {
if (null != old) {
//1. 可能是多个设备使用了相同的id.
//2. 可能是同一个设备,注销后立即上线,由于种种原因,先处理了上线后处理了注销逻辑.
log.warn("device[{}] session exists,disconnect old session:{}", old.getDeviceId(), session);
log.warn("device[{}] session exists,disconnect old session:{}", old.getDeviceId(), old);
//加入关闭连接队列
scheduleJobQueue.add(old::close);
} else {

View File

@ -1,18 +0,0 @@
package org.jetlinks.community.standalone.configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@Component
public class IndexPageWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
if (exchange.getRequest().getURI().getPath().equals("/")) {
return chain.filter(exchange.mutate().request(exchange.getRequest().mutate().path("/index.html").build()).build());
}
return chain.filter(exchange);
}
}