增加通用物模块

This commit is contained in:
zhouhao 2022-09-26 17:27:59 +08:00
parent 8d2d26a28a
commit b8adab7af3
43 changed files with 3844 additions and 0 deletions

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.20.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>things-component</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-core</artifactId>
<version>${jetlinks.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>common-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>timeseries-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,41 @@
package org.jetlinks.community.things;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.things.ThingType;
import org.jetlinks.community.PropertyConstants;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public interface ThingConstants {
HeaderKey<String> templateId = PropertyConstants.Key.of("templateId");
HeaderKey<Map<String, Object>> contextVar = PropertyConstants.Key.of("_var", ConcurrentHashMap::new, ConcurrentHashMap.class);
HeaderKey<Set<String>> connectorTrace = PropertyConstants.Key.of("_connectors",
ConcurrentHashMap::newKeySet,
ConcurrentHashMap.KeySetView.class);
interface Topics {
static String properties() {
return "/thing/*/property/*";
}
static String[] properties(ThingType thingType, String thingId) {
return new String[]{
thingType.getTopicPrefix("*", thingId) + "/message/property/report",
thingType.getTopicPrefix("*", thingId) + "/message/property/read/reply",
thingType.getTopicPrefix("*", thingId) + "/message/property/write/reply",
};
}
}
}

View File

@ -0,0 +1,39 @@
package org.jetlinks.community.things;
import org.jetlinks.community.things.data.operations.SaveOperations;
import org.jetlinks.community.things.data.operations.TemplateOperations;
import org.jetlinks.community.things.data.operations.ThingOperations;
import reactor.core.publisher.Mono;
/**
* 物数据仓库,用于保存和查询物模型相关数据: 属性,事件,以及日志
*
* @author zhouhao
* @since 2.0
*/
public interface ThingsDataRepository {
/**
* @return 返回保存操作接口, 用于对物数据进行保存
*/
SaveOperations opsForSave();
/**
* 返回物操作接口, 基于物实例进行数据操作,:查询单个物实例的属性历史.
*
* @param thingType 物类型
* @param thingId 物实例ID
* @return 操作接口
*/
Mono<ThingOperations> opsForThing(String thingType, String thingId);
/**
* 返回物模版操作接口,基于物模版进行数据操作,: 查询物模版下所有物实例的数据.
*
* @param thingType 物类型
* @param templateId 物模版ID
* @return 操作接口
*/
Mono<TemplateOperations> opsForTemplate(String thingType, String templateId);
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.community.things.configuration;
import org.jetlinks.core.things.DefaultThingsRegistry;
import org.jetlinks.core.things.ThingsRegistrySupport;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import javax.annotation.Nonnull;
public class AutoRegisterThingsRegistry extends DefaultThingsRegistry implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(@Nonnull Object bean,@Nonnull String beanName) throws BeansException {
if(bean instanceof ThingsRegistrySupport){
addSupport(((ThingsRegistrySupport) bean));
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
}

View File

@ -0,0 +1,55 @@
package org.jetlinks.community.things.configuration;
import lombok.Generated;
import org.jetlinks.community.things.data.*;
import org.jetlinks.core.defaults.DeviceThingsRegistrySupport;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.things.ThingsRegistry;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
@Generated
public class ThingsConfiguration {
@Bean
public NoneThingsDataRepositoryStrategy noneThingsDataRepositoryStrategy() {
return new NoneThingsDataRepositoryStrategy();
}
@Bean(destroyMethod = "shutdown")
public AutoUpdateThingsDataManager thingsDataManager(EventBus eventBus) {
String fileName = "./data/things-property/data";
return new AutoUpdateThingsDataManager(fileName, eventBus);
}
@Bean
@Primary
public AutoRegisterThingsRegistry thingsRegistry() {
return new AutoRegisterThingsRegistry();
}
@Bean
@ConditionalOnBean(DeviceRegistry.class)
public DeviceThingsRegistrySupport deviceThingsRegistrySupport(DeviceRegistry registry) {
return new DeviceThingsRegistrySupport(registry);
}
@Bean
public DefaultThingsDataRepository thingDataService(ThingsRegistry registry,
ObjectProvider<ThingsDataCustomizer> customizers,
ObjectProvider<ThingsDataRepositoryStrategy> policies) {
DefaultThingsDataRepository service = new DefaultThingsDataRepository(registry);
policies.forEach(service::addPolicy);
for (ThingsDataCustomizer customizer : customizers) {
customizer.custom(service);
}
return service;
}
}

View File

@ -0,0 +1,58 @@
package org.jetlinks.community.things.data;
import lombok.AllArgsConstructor;
import org.jetlinks.community.things.data.operations.*;
public abstract class AbstractThingDataRepositoryStrategy extends CacheSaveOperationsStrategy {
@Override
public abstract SaveOperations createOpsForSave(OperationsContext context);
protected abstract QueryOperations createForQuery(String thingType,
String templateId,
String thingId,
OperationsContext context);
protected abstract DDLOperations createForDDL(String thingType,
String templateId,
String thingId,
OperationsContext context);
@Override
public final ThingOperations opsForThing(String thingType,
String templateId,
String thingId,
OperationsContext context) {
return new ThingOperationsHolder(thingType, templateId, thingId, context);
}
@Override
public final TemplateOperations opsForTemplate(String thingType,
String templateId,
OperationsContext context) {
return new ThingOperationsHolder(thingType, templateId, null, context);
}
@AllArgsConstructor
class ThingOperationsHolder implements ThingOperations, TemplateOperations {
private final String thingType;
private final String templateId;
private final String thingId;
private final OperationsContext context;
@Override
public QueryOperations forQuery() {
return createForQuery(thingType, templateId, thingId, context);
}
@Override
public DDLOperations forDDL() {
return createForDDL(thingType, templateId, thingId, context);
}
}
}

View File

@ -0,0 +1,93 @@
package org.jetlinks.community.things.data;
import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.core.param.TermType;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.Interval;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Date;
import java.util.List;
@Getter
@Setter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AggregationRequest {
//时间间隔
//为空时,不按时间分组
@Schema(description = "间隔,如: 1d", type = "string", defaultValue = "1d")
@Nullable
@Builder.Default
Interval interval = Interval.ofDays(1);
//时间格式
@Schema(defaultValue = "时间格式,如:yyyy-MM-dd", description = "yyyy-MM-dd")
@Builder.Default
String format = "yyyy-MM-dd";
@Schema(description = "时间从,如: 2020-09-01 00:00:00,支持表达式: now-1d")
@Builder.Default
Date from = new DateTime()
.plusMonths(-1)
.withHourOfDay(0)
.withMinuteOfHour(0)
.withSecondOfMinute(0)
.toDate();
@Schema(description = "时间到,如: 2020-09-30 00:00:00,支持表达式: now-1d")
@Builder.Default
Date to = new DateTime()
.withHourOfDay(23)
.withMinuteOfHour(59)
.withSecondOfMinute(59)
.toDate();
@Schema(description = "数量限制")
@Builder.Default
int limit = 30;
//过滤条件
@Schema(description = "过滤条件")
@Builder.Default
QueryParamEntity filter = QueryParamEntity.of();
public AggregationRequest copy() {
return new AggregationRequest(interval, format, from, to, limit, filter.clone());
}
@Hidden
public void setQuery(QueryParamEntity filter) {
setFilter(filter);
}
public void prepareTimestampCondition() {
for (Term term : filter.getTerms()) {
if ("timestamp".equals(term.getColumn())) {
if (TermType.btw.equals(term.getTermType())) {
List<Object> values = ConverterUtils.convertToList(term.getValue());
if (values.size() > 0) {
from = CastUtils.castDate(values.get(0));
}
if (values.size() > 1) {
to = CastUtils.castDate(values.get(1));
}
term.setValue(null);
} else if (TermType.gt.equals(term.getTermType()) || TermType.gte.equals(term.getTermType())) {
from = CastUtils.castDate(term.getValue());
term.setValue(null);
} else if (TermType.lt.equals(term.getTermType()) || TermType.lte.equals(term.getTermType())) {
to = CastUtils.castDate(term.getValue());
term.setValue(null);
}
}
}
}
}

View File

@ -0,0 +1,232 @@
package org.jetlinks.community.things.data;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.*;
import org.jetlinks.community.things.ThingConstants;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.property.PropertyMessage;
import org.jetlinks.core.things.ThingId;
import org.jetlinks.core.things.ThingProperty;
import org.jetlinks.core.things.ThingType;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 按需更新的物数据管理器
* <p>
* 如果是首次获取将从事件总线订阅数据并更新.
* 如果超过10分钟没有获取过数据,则将取消订阅.
* <p>
* 缓存数据会持久化到指定的文件中.
* <p>
* 当获取的本地缓存不存在,并且集群的其他节点也没有获取过,可能将获取到旧数据.
*
* @author zhouhao
* @since 2.0
*/
public class AutoUpdateThingsDataManager extends LocalFileThingsDataManager {
private final Map<ThingId, Updater> updaters = Caffeine
.newBuilder()
//10分钟没有任何读取则dispose取消订阅
.expireAfterAccess(Duration.ofMinutes(10))
.<ThingId, Updater>removalListener((key, value, cause) -> {
if (cause == RemovalCause.EXPIRED) {
if (value != null) {
value.dispose();
}
}
})
.build()
.asMap();
private final EventBus eventBus;
public AutoUpdateThingsDataManager(String fileName, EventBus eventBus) {
super(fileName);
this.eventBus = eventBus;
}
@Override
public Mono<List<ThingProperty>> getProperties(String thingType, String thingId, String property, long from, long to) {
Updater updater = getUpdater(thingType, thingId);
updater.tryLoad(property);
Mono<Void> loader = updater.loader;
if (updater.loading && loader != null) {
return loader
.then(Mono.defer(() -> super.getProperties(thingType, thingId, property, from, to)));
}
return super.getProperties(thingType, thingId, property, from, to);
}
@Override
public Mono<ThingProperty> getLastProperty(String thingType,
String thingId,
String property,
long baseTime) {
Updater updater = getUpdater(thingType, thingId);
updater.tryLoad(property);
Mono<Void> loader = updater.loader;
if (updater.loading && loader != null) {
return loader
.then(Mono.defer(() -> super.getLastProperty(thingType, thingId, property, baseTime)));
}
return super.getLastProperty(thingType, thingId, property, baseTime);
}
@Override
public Mono<ThingProperty> getFirstProperty(String thingType, String thingId, String property) {
Updater updater = getUpdater(thingType, thingId);
updater.tryLoad(property);
Mono<Void> loader = updater.loader;
if (updater.loading && loader != null) {
return loader
.then(Mono.defer(() -> super.getFirstProperty(thingType, thingId, property)));
}
return super.getFirstProperty(thingType, thingId, property);
}
private Updater getUpdater(String thingType, String thingId) {
ThingId key = ThingId.of(thingType, thingId);
return updaters
.computeIfAbsent(key, this::createUpdater);
}
private Mono<Void> loadData(String thingType, String thingId, String property) {
//fixme 不加载不存在的数据,如果查库可能会导致瞬间压力过高
return Mono.empty();
}
protected Updater createUpdater(ThingId id) {
return new Updater(id.getType(), id.getId());
}
@SneakyThrows
private ByteBuf encodeHistory(PropertyHistory history) {
ByteBuf buf = Unpooled.buffer();
try (ObjectOutput out = createOutput(buf)) {
history.writeExternal(out);
}
return buf;
}
@SneakyThrows
private PropertyHistory decodeHistory(ByteBuf buf) {
PropertyHistory history = new PropertyHistory();
try (ObjectInput input = createInput(buf)) {
history.readExternal(input);
}
return history;
}
@Override
public void shutdown() {
super.shutdown();
updaters.values().forEach(Disposable::dispose);
}
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public static class ThingHistoryRequest {
private String thingType;
private String thingId;
private String property;
}
protected class Updater implements Disposable {
private final String thingType;
private final String thingId;
private final Disposable disposable;
private final Set<String> include = ConcurrentHashMap.newKeySet();
private boolean loading;
private Mono<Void> loader;
public Updater(String thingType, String thingId) {
this.thingType = thingType;
this.thingId = thingId;
//订阅整个集群的消息来更新本地的缓存数据
disposable = eventBus
.subscribe(
Subscription.builder()
.subscriberId("thing-data-property-updater")
.topics(ThingConstants.Topics.properties(ThingType.of(thingType), thingId))
.local()
.broker()
.build(),
PropertyMessage.class
)
.doOnNext(this::doUpdate)
.subscribe();
}
private void doUpdate(PropertyMessage message) {
try {
Map<String, Object> properties = message.getProperties();
if (properties == null) {
return;
}
for (Map.Entry<String, Object> entry : properties.entrySet()) {
String property = entry.getKey();
if (include.contains(property)) {
updateProperty0(thingType,
thingId,
property,
message
.getPropertySourceTime(property)
.orElse(message.getTimestamp()),
entry.getValue(),
message.getPropertyState(property).orElse(null)
);
}
}
} catch (Throwable ignore) {
//ignore
}
}
private void tryLoad(String property) {
if (include.add(property)) {
this.loading = true;
this.loader = loadData(thingType, thingId, property)
.doAfterTerminate(() -> {
loading = false;
loader = null;
})
.cache();
}
}
@Override
public void dispose() {
disposable.dispose();
include.clear();
}
@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
}
}

View File

@ -0,0 +1,21 @@
package org.jetlinks.community.things.data;
import org.jetlinks.community.things.data.operations.SaveOperations;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public abstract class CacheSaveOperationsStrategy implements ThingsDataRepositoryStrategy {
private final Map<OperationsContext, SaveOperations> caches = new ConcurrentHashMap<>();
@Override
public final SaveOperations opsForSave(OperationsContext context) {
//save可能是频繁操作,使用cache减少对象创建
return caches.computeIfAbsent(context, this::createOpsForSave);
}
protected abstract SaveOperations createOpsForSave(OperationsContext context);
}

View File

@ -0,0 +1,39 @@
package org.jetlinks.community.things.data;
import org.jetlinks.core.metadata.PropertyMetadata;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultMetricMetadataManager implements MetricMetadataManager {
private final Map<String, Map<String, PropertyMetadata>> repo = new ConcurrentHashMap<>();
@Override
public void register(String metric, List<PropertyMetadata> properties) {
repo.compute(metric, (key, old) -> {
if (old != null) {
old.clear();
} else {
old = new ConcurrentHashMap<>();
}
for (PropertyMetadata property : properties) {
old.put(property.getId(), property);
}
return old;
});
}
@Override
public Optional<PropertyMetadata> getColumn(String metric, String property) {
if (metric == null || property == null) {
return Optional.empty();
}
Map<String, PropertyMetadata> m = repo.get(metric);
if (m != null) {
return Optional.ofNullable(m.get(property));
}
return Optional.empty();
}
}

View File

@ -0,0 +1,168 @@
package org.jetlinks.community.things.data;
import org.hswebframework.web.exception.I18nSupportException;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.things.ThingTemplate;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.ThingsDataRepository;
import org.jetlinks.community.things.data.operations.*;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
* 默认物数据服务,支持动态数据存储策略{@link ThingsDataRepositoryStrategy},
* <p>
* 支持不同物类型自定义表名逻辑{@link ThingsDataCustomizer#custom(ThingsDataContext)} {@link ThingsDataContext#customMetricBuilder(String, MetricBuilder)}
*
* @author zhouhao
* @since 2.0
*/
public class DefaultThingsDataRepository implements ThingsDataRepository, ThingsDataContext, SaveOperations {
private final Map<String, ThingsDataRepositoryStrategy> policies = new ConcurrentHashMap<>();
private final Map<String, ThingsDataRepositoryStrategy.OperationsContext> contexts = new ConcurrentHashMap<>();
private final ThingsRegistry registry;
private String defaultPolicy = "default-row";
private ThingsDataRepositoryStrategy.OperationsContext defaultContext = new ThingsDataRepositoryStrategy.OperationsContext(
MetricBuilder.DEFAULT, new DataSettings()
);
public DefaultThingsDataRepository(ThingsRegistry registry) {
this.registry = registry;
}
private ThingsDataRepositoryStrategy getPolicyNow(String policy) {
ThingsDataRepositoryStrategy dataPolicy = policies.get(policy);
if (dataPolicy == null) {
throw new I18nSupportException("error.thing_data_policy_unsupported", policy);
}
return dataPolicy;
}
private Mono<Tuple2<String, ThingsDataRepositoryStrategy>> getPolicyByThing(String thingType, String thingId) {
return registry
.getThing(thingType, thingId)
.flatMap(thing -> Mono
.zip(
thing.getTemplate().map(ThingTemplate::getId),
thing
.getConfig(ThingsDataConstants.storePolicyConfigKey)
.defaultIfEmpty(defaultPolicy)
.map(this::getPolicyNow)
)
);
}
private Mono<ThingsDataRepositoryStrategy> getPolicyByTemplate(String thingType, String templateId) {
return registry
.getTemplate(thingType, templateId)
.flatMap(template -> template
.getConfig(ThingsDataConstants.storePolicyConfigKey)
.defaultIfEmpty(defaultPolicy))
.map(this::getPolicyNow);
}
@Override
public SaveOperations opsForSave() {
return this;
}
@Override
public Mono<ThingOperations> opsForThing(String thingType, String thingId) {
return this
.getPolicyByThing(thingType, thingId)
.map((tp2) -> tp2
.getT2()
.opsForThing(thingType, tp2.getT1(), thingId, contexts.getOrDefault(thingType, defaultContext)));
}
@Override
public Mono<TemplateOperations> opsForTemplate(String thingType, String templateId) {
return this
.getPolicyByTemplate(thingType, templateId)
.map(policy -> policy.opsForTemplate(thingType, templateId, contexts.getOrDefault(thingType, defaultContext)));
}
@Override
public void customMetricBuilder(String thingType, MetricBuilder metricBuilder) {
contexts.compute(thingType, (k, old) -> {
if (old == null) {
return new ThingsDataRepositoryStrategy.OperationsContext(metricBuilder, defaultContext.getSettings());
}
return old.metricBuilder(metricBuilder);
});
}
@Override
public void customSettings(String thingType, DataSettings settings) {
contexts.compute(thingType, (k, old) -> {
if (old == null) {
return new ThingsDataRepositoryStrategy.OperationsContext(defaultContext.getMetricBuilder(), settings);
}
return old.settings(settings);
});
}
@Override
public void setDefaultPolicy(String policy) {
this.defaultPolicy = policy;
}
@Override
public void setDefaultSettings(DataSettings settings) {
for (Map.Entry<String, ThingsDataRepositoryStrategy.OperationsContext> entry : contexts.entrySet()) {
if (entry.getValue().getSettings() == defaultContext.getSettings()) {
entry.setValue(new ThingsDataRepositoryStrategy.OperationsContext(entry.getValue().getMetricBuilder(), settings));
}
}
this.defaultContext = new ThingsDataRepositoryStrategy.OperationsContext(MetricBuilder.DEFAULT, settings);
}
@Override
public void addPolicy(ThingsDataRepositoryStrategy policy) {
ThingsDataRepositoryStrategies.register(policy);
policies.put(policy.getId(), policy);
}
@Override
public Mono<Void> save(ThingMessage thingMessage) {
return doSave(thingMessage.getThingType(),
thingMessage.getThingId(),
opt -> opt.save(thingMessage));
}
@Override
public Mono<Void> save(Collection<? extends ThingMessage> thingMessage) {
return save(Flux.fromIterable(thingMessage));
}
@Override
public Mono<Void> save(Publisher<? extends ThingMessage> thingMessage) {
return Flux.from(thingMessage)
.groupBy(msg -> Tuples.of(msg.getThingType(), msg.getThingId()))
.flatMap(group -> doSave(group.key().getT1(), group.key().getT2(), opt -> opt.save(group)))
.then();
}
private Mono<Void> doSave(String thingType, String thingId, Function<SaveOperations, Mono<Void>> opt) {
return this
.getPolicyByThing(thingType, thingId)
.flatMap((tp2) -> opt.apply(tp2.getT2().opsForSave(contexts.getOrDefault(thingType, defaultContext))));
}
}

View File

@ -0,0 +1,653 @@
package org.jetlinks.community.things.data;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.buffer.*;
import io.netty.util.ReferenceCountUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.BasicDataType;
import org.jetlinks.core.things.ThingProperty;
import org.jetlinks.core.things.ThingsDataManager;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.community.codec.Serializers;
import reactor.core.publisher.Mono;
import reactor.function.Function3;
import javax.annotation.Nonnull;
import java.io.*;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
public class LocalFileThingsDataManager implements ThingsDataManager, ThingsDataWriter {
//单个属性最大缓存数量 java -Dthing.data.store.max-size=4
private final static int DEFAULT_MAX_STORE_SIZE_EACH_KEY = Integer
.parseInt(
System.getProperty("thing.data.store.max-size", "4")
);
protected final MVStore mvStore;
private final Map<String, Integer> tagCache = new ConcurrentHashMap<>();
private final Map<Long, PropertyHistory> historyCache =
Caffeine
.newBuilder()
.expireAfterAccess(Duration.ofMinutes(10))
.<Long, PropertyHistory>build()
.asMap();
private final MVMap<String, Integer> tagStore;
private final MVMap<Long, PropertyHistory> history;
private static MVStore open(String fileName) {
return new MVStore.Builder()
.fileName(fileName)
.autoCommitBufferSize(64 * 1024)
.compress()
.keysPerPage(1024)
.cacheSize(64)
.open();
}
@SuppressWarnings("all")
private static MVStore load(String fileName) {
File file = new File(fileName);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
try {
return open(fileName);
} catch (Throwable err) {
if (file.exists()) {
file.renameTo(new File(fileName + "_load_err_" + System.currentTimeMillis()));
file.delete();
return open(fileName);
} else {
throw err;
}
}
}
public LocalFileThingsDataManager(String fileName) {
this(load(fileName));
}
public LocalFileThingsDataManager(MVStore store) {
this.mvStore = store;
this.tagStore = mvStore.openMap("tags");
this.history = mvStore
.openMap("store", new MVMap
.Builder<Long, PropertyHistory>()
.valueType(new HistoryType()));
}
public void shutdown() {
for (Map.Entry<Long, PropertyHistory> entry : historyCache.entrySet()) {
if (!entry.getValue().stored) {
entry.getValue().stored = true;
history.put(entry.getKey(), entry.getValue());
}
}
for (Map.Entry<Long, PropertyHistory> entry : history.entrySet()) {
if (!entry.getValue().stored) {
history.put(entry.getKey(), entry.getValue());
}
}
mvStore.compactMoveChunks();
mvStore.close(60_000);
}
@Override
public Mono<ThingProperty> getLastProperty(String thingType,
String thingId,
String property,
long baseTime) {
PropertyHistory propertyStore = getHistory(thingType, thingId, property);
if (propertyStore == null) {
return lastPropertyNotFound(thingType, thingId, property, baseTime);
}
Property pro = propertyStore.getProperty(baseTime);
if (pro == null) {
return lastPropertyNotFound(thingType, thingId, property, baseTime);
}
return pro.toProperty(property);
}
protected Mono<ThingProperty> lastPropertyNotFound(String thingType,
String thingId,
String property,
long baseTime) {
return Mono.empty();
}
protected Mono<ThingProperty> firstPropertyNotFound(String thingType,
String thingId,
String property) {
return Mono.empty();
}
@Override
public Mono<ThingProperty> getFirstProperty(String thingType,
String thingId,
String property) {
PropertyHistory propertyStore = getHistory(thingType, thingId, property);
if (propertyStore == null) {
return firstPropertyNotFound(thingType, thingId, property);
}
Property pro = propertyStore.first;
if (pro == null) {
return firstPropertyNotFound(thingType, thingId, property);
}
return pro.toProperty(property);
}
@Override
public Mono<List<ThingProperty>> getProperties(String thingType,
String thingId,
String property,
long baseTime) {
return this.getProperties(thingType,
thingId,
property,
0,
baseTime);
}
@Override
public Mono<List<ThingProperty>> getProperties(String thingType,
String thingId,
String property,
long from,
long to) {
PropertyHistory propertyStore = getHistory(thingType, thingId, property);
if (propertyStore == null) {
return Mono.empty();
}
return Mono.just(propertyStore.getProperties(property, from, to));
}
protected PropertyHistory getHistory(String thingType,
String thingId,
String property) {
long key = getPropertyStoreKey(thingType, thingId, property);
PropertyHistory his = historyCache.get(key);
if (his != null) {
return his;
}
his = history.get(key);
if (his != null) {
historyCache.putIfAbsent(key, his);
return his;
}
return null;
}
@Override
public Mono<Long> getLastPropertyTime(String thingType, String thingId, long baseTime) {
long time = scanProperty(thingType,
thingId,
0L,
baseTime,
(init, arg, history) -> {
Property store = history.getProperty(arg);
if (store != null) {
return Math.max(init, store.time);
}
return init;
});
return time == 0 ? Mono.empty() : Mono.just(time);
}
protected <T, ARG> T scanProperty(String thingType,
String thingId,
T init,
ARG arg,
Function3<T, ARG, PropertyHistory, T> historyConsumer) {
long thingTag = getThingTag(thingType, thingId);
int tagSize = tagStore.size();
//左移32位表示物ID标记
long fromTag = thingTag << 32;
//加上标签总大小,表示可能的所有属性key范围
long toTag = fromTag + tagSize;
//获取搜索的key范围
Long fromKey = history.higherKey(fromTag);
//没有key,说明没有数据
if (fromKey == null) {
return init;
}
Long toKey = history.lowerKey(toTag);
//查找大于此标记的key,可能是同一个物的属性数据
Cursor<Long, PropertyHistory> cursor = history.cursor(fromKey, toKey, false);
if (cursor == null) {
return init;
}
final int maxLoop = tagSize / 2;
int loop = maxLoop;
//迭代游标来对比数据
while (cursor.hasNext() && loop > 0) {
long _tag = cursor.getKey() >> 32;
if (_tag != thingTag) {
loop--;
cursor.next();
continue;
}
loop = maxLoop;
PropertyHistory propertyStore = cursor.getValue();
init = historyConsumer.apply(init, arg, propertyStore);
cursor.next();
}
return init;
}
@Override
public Mono<Long> getFirstPropertyTime(String thingType, String thingId) {
Long time = scanProperty(thingType,
thingId,
null,
null,
(init, arg, history) -> {
Property store = history.first;
if (store != null) {
if (init == null) {
return store.time;
}
return Math.min(init, store.time);
}
return init;
});
return time == null ? Mono.empty() : Mono.just(time);
}
protected final int getTag(String key) {
return tagCache
.computeIfAbsent(key, _key ->
tagStore.computeIfAbsent(_key, k -> tagStore.size() + 1));
}
@SneakyThrows
protected ObjectOutput createOutput(ByteBuf buffer) {
return Serializers.getDefault().createOutput(new ByteBufOutputStream(buffer));
}
@SneakyThrows
protected ObjectInput createInput(ByteBuf buffer) {
return Serializers.getDefault().createInput(new ByteBufInputStream(buffer, true));
}
@Nonnull
@Override
public final Mono<Void> updateProperty(@Nonnull String thingType, @Nonnull String thingId, @Nonnull ThingProperty property) {
return updateProperty(thingType,
thingId,
property.getProperty(),
property.getTimestamp(),
property.getValue(),
property.getState());
}
protected long getThingTag(String thingType, String thingId) {
return getTag(StringBuilderUtils.buildString(
thingType, thingId,
(a, b, sb) -> sb.append(a).append(':').append(b)));
}
protected long getPropertyStoreKey(String thingType, String thingId, String property) {
long thingTag = getThingTag(thingType, thingId);
int propertyTag = getTag(property);
//物ID对应的tag左移32位和属性tag相加,表示 一个物的某个属性.
return (thingTag << 32) + propertyTag;
}
@Nonnull
@Override
public Mono<Void> updateProperty(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull String property,
long timestamp,
@Nonnull Object value,
String state) {
updateProperty0(thingType, thingId, property, timestamp, value, state);
return Mono.empty();
}
protected final void updateProperty0(String thingType,
String thingId,
String property,
long timestamp,
Object value,
String state) {
long key = getPropertyStoreKey(thingType, thingId, property);
PropertyHistory propertyStore = historyCache
.computeIfAbsent(key, k -> history.computeIfAbsent(k, k1 -> new PropertyHistory()));
Property p = new Property();
p.setTime(timestamp);
p.setValue(value);
p.setState(state);
propertyStore.update(p);
propertyStore.tryStore(key, history::put);
}
protected final void updateProperty(String thingType,
String thingId,
String property,
PropertyHistory propertyHistory) {
long key = getPropertyStoreKey(thingType, thingId, property);
PropertyHistory propertyStore = history.computeIfAbsent(key, (ignore) -> new PropertyHistory());
if (propertyHistory.first != null) {
propertyStore.update(propertyHistory.first);
}
if (propertyHistory.refs != null) {
for (Property ref : propertyHistory.refs) {
propertyStore.update(ref);
}
}
}
public static class PropertyHistory implements Externalizable {
private Property first;
private Property[] refs;
private long minTime = -1;
private long elapsedTime;
private boolean stored;
public Property getProperty(long baseTime) {
if (refs == null) {
return null;
}
for (Property ref : refs) {
if (ref != null && ref.time <= baseTime) {
return ref;
}
}
return null;
}
public List<ThingProperty> getProperties(String property, long from, long to) {
if (refs == null) {
return Collections.emptyList();
}
if (DEFAULT_MAX_STORE_SIZE_EACH_KEY == 0) {
return Collections.emptyList();
}
List<ThingProperty> properties = new ArrayList<>(Math.min(32, DEFAULT_MAX_STORE_SIZE_EACH_KEY));
for (Property ref : refs) {
if (ref != null && ref.time >= from && ref.time < to) {
ThingProperty prop = ref.toPropertyNow(property);
if (prop != null) {
properties.add(prop);
}
}
}
return properties;
}
public void tryStore(long key, BiConsumer<Long, PropertyHistory> store) {
long now = System.currentTimeMillis();
long elapsed = elapsedTime;
elapsedTime = now;
if (now - elapsed >= 5_000) {
stored = true;
store.accept(key, this);
} else {
stored = false;
}
}
//更新,并返回距离上传更新的时间差
public void update(Property ref) {
//init
if (refs == null) {
refs = new Property[0];
}
//更新首次时间
if (first == null || first.time >= ref.time) {
first = ref;
}
if (minTime > 0) {
//时间回退?
if (ref.time < minTime) {
return;
}
}
boolean newEl = false;
if (refs.length < DEFAULT_MAX_STORE_SIZE_EACH_KEY) {
refs = Arrays.copyOf(refs, refs.length + 1);
newEl = true;
}
Property last = refs[0];
//fast
if (last == null || ref.time >= last.time || newEl) {
refs[refs.length - 1] = ref;
}
//slow
else {
for (int i = 1; i < refs.length; i++) {
last = refs[i];
if (ref.time == last.time) {
refs[i] = ref;
} else if (ref.time > last.time) {
System.arraycopy(refs, i, refs, i + 1, refs.length - i - 1);
refs[i] = ref;
break;
}
}
}
Arrays.sort(refs, Comparator.comparingLong(r -> r == null ? 0 : -r.time));
minTime = refs[refs.length - 1].time;
return;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeShort(refs.length);
for (Property ref : refs) {
ref.writeExternal(out);
}
out.writeBoolean(first != null);
if (first != null) {
first.writeExternal(out);
}
}
@Override
public void readExternal(ObjectInput in) throws IOException {
this.stored = true;
int len = in.readShort();
refs = new Property[len];
for (int i = 0; i < len; i++) {
refs[i] = new Property();
refs[i].readExternal(in);
}
if (in.readBoolean()) {
first = new Property();
first.readExternal(in);
}
}
public int memory() {
int i = 0;
if (first != null) {
i += first.memory();
}
if (refs != null) {
for (Property ref : refs) {
if (ref != null) {
i += ref.memory();
}
}
}
return i;
}
}
@Getter
@Setter
public static class Property implements Externalizable {
private long time;
private String state;
private Object value;
private volatile Mono<ThingProperty> _temp;
public Mono<ThingProperty> toProperty(String property) {
if (_temp == null) {
_temp = Mono.just(ThingProperty.of(property, value, time, state));
}
return _temp;
}
@SneakyThrows
@SuppressWarnings("all")
public ThingProperty toPropertyNow(String property) {
if (_temp == null) {
_temp = Mono.just(ThingProperty.of(property, value, time, state));
}
if (_temp instanceof Callable) {
return ((Callable<ThingProperty>) _temp).call();
}
return _temp.toFuture().getNow(null);
}
public int memory() {
int i = 8;
if (state != null) {
i += state.length() * 2;
}
if (value instanceof Number) {
i += 8;
} else {
i += 64;
}
return i;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeLong(time);
SerializeUtils.writeObject(state, out);
SerializeUtils.writeObject(value, out);
}
@Override
public void readExternal(ObjectInput in) throws IOException {
time = in.readLong();
state = (String) SerializeUtils.readObject(in);
value = SerializeUtils.readObject(in);
}
}
private class HistoryType extends BasicDataType<PropertyHistory> {
@Override
public int compare(PropertyHistory a, PropertyHistory b) {
if (a.refs == null && b.refs == null) {
return 0;
}
if (a.refs == null) {
return -1;
}
if (b.refs == null) {
return 1;
}
return Long.compare(a.refs[0].time, b.refs[0].time);
}
@Override
public int getMemory(PropertyHistory obj) {
return obj.memory();
}
@Override
@SneakyThrows
public void write(WriteBuffer buff, PropertyHistory data) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
try (ObjectOutput output = createOutput(buffer)) {
data.writeExternal(output);
buff.put(buffer.nioBuffer());
output.flush();
} finally {
ReferenceCountUtil.safeRelease(buffer);
}
}
@Override
@SneakyThrows
public void write(WriteBuffer buff, Object obj, int len) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
try (ObjectOutput output = createOutput(buffer)) {
for (int i = 0; i < len; i++) {
((PropertyHistory) Array.get(obj, i)).writeExternal(output);
}
output.flush();
buff.put(buffer.nioBuffer());
} finally {
ReferenceCountUtil.safeRelease(buffer);
}
}
@Override
@SneakyThrows
public void read(ByteBuffer buff, Object obj, int len) {
try (ObjectInput input = createInput(Unpooled.wrappedBuffer(buff))) {
for (int i = 0; i < len; i++) {
PropertyHistory data = new PropertyHistory();
data.readExternal(input);
Array.set(obj, i, data);
}
}
}
@Override
public PropertyHistory[] createStorage(int size) {
return new PropertyHistory[size];
}
@Override
@SneakyThrows
public PropertyHistory read(ByteBuffer buff) {
PropertyHistory data = new PropertyHistory();
try (ObjectInput input = createInput(Unpooled.wrappedBuffer(buff))) {
data.readExternal(input);
}
return data;
}
}
}

View File

@ -0,0 +1,14 @@
package org.jetlinks.community.things.data;
import org.jetlinks.core.metadata.PropertyMetadata;
import java.util.List;
import java.util.Optional;
public interface MetricMetadataManager {
void register(String metric, List<PropertyMetadata> properties);
Optional<PropertyMetadata> getColumn(String metric, String property);
}

View File

@ -0,0 +1,141 @@
package org.jetlinks.community.things.data;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.metadata.Feature;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.community.things.data.operations.*;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.reactivestreams.Publisher;
import org.springframework.core.Ordered;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Collection;
public class NoneThingsDataRepositoryStrategy implements
ThingsDataRepositoryStrategy,
SaveOperations,
ThingOperations,
TemplateOperations,
QueryOperations,
DDLOperations {
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public Flux<Feature> getFeatures() {
return SaveOperations.super.getFeatures();
}
@Override
public String getId() {
return "none";
}
@Override
public String getName() {
return "不存储";
}
@Override
public SaveOperations opsForSave(OperationsContext context) {
return this;
}
@Override
public ThingOperations opsForThing(String thingType, String templateId, String thingId, OperationsContext context) {
return this;
}
@Override
public TemplateOperations opsForTemplate(String thingType, String templateId, OperationsContext context) {
return this;
}
@Override
public Mono<Void> save(ThingMessage thingMessage) {
return Mono.empty();
}
@Override
public Mono<Void> save(Collection<? extends ThingMessage> thingMessage) {
return Mono.empty();
}
@Override
public Mono<Void> save(Publisher<? extends ThingMessage> thingMessage) {
return Mono.empty();
}
@Override
public QueryOperations forQuery() {
return this;
}
@Override
public DDLOperations forDDL() {
return this;
}
@Nonnull
@Override
public Flux<ThingPropertyDetail> queryEachProperty(@Nonnull QueryParamEntity query, @Nonnull String... property) {
return Flux.empty();
}
@Nonnull
@Override
public Flux<ThingPropertyDetail> queryProperty(@Nonnull QueryParamEntity query, @Nonnull String... property) {
return Flux.empty();
}
@Nonnull
@Override
public Mono<PagerResult<ThingPropertyDetail>> queryPropertyPage(@Nonnull QueryParamEntity query, @Nonnull String... property) {
return Mono.just(PagerResult.empty());
}
@Nonnull
@Override
public Flux<AggregationData> aggregationProperties(@Nonnull AggregationRequest request, @Nonnull PropertyAggregation... properties) {
return Flux.empty();
}
@Override
public Flux<ThingMessageLog> queryMessageLog(@Nonnull QueryParamEntity query) {
return Flux.empty();
}
@Override
public Mono<PagerResult<ThingMessageLog>> queryMessageLogPage(@Nonnull QueryParamEntity query) {
return Mono.just(PagerResult.empty());
}
@Nonnull
@Override
public Mono<PagerResult<ThingEvent>> queryEventPage(@Nonnull String eventId, @Nonnull QueryParamEntity query, boolean format) {
return Mono.just(PagerResult.empty());
}
@Nonnull
@Override
public Flux<ThingEvent> queryEvent(@Nonnull String eventId, @Nonnull QueryParamEntity query, boolean format) {
return Flux.empty();
}
@Override
public Mono<Void> registerMetadata(ThingMetadata metadata) {
return Mono.empty();
}
@Override
public Mono<Void> reloadMetadata(ThingMetadata metadata) {
return Mono.empty();
}
}

View File

@ -0,0 +1,41 @@
package org.jetlinks.community.things.data;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.hswebframework.web.validator.ValidatorUtils;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.springframework.util.StringUtils;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class PropertyAggregation {
@Schema(description = "属性ID")
@NotBlank
private String property; //要聚合对字段
@Schema(description = "别名,默认和property一致")
private String alias; //别名
@Schema(description = "聚合方式,支持(count,sum,max,min,avg)", type = "string")
@NotNull
private Aggregation agg; //聚合函数
public String getAlias() {
if (StringUtils.isEmpty(alias)) {
return property;
}
return alias;
}
public void validate() {
ValidatorUtils.tryValidate(this);
}
}

View File

@ -0,0 +1,72 @@
package org.jetlinks.community.things.data;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.Generated;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.community.timeseries.TimeSeriesData;
import java.util.HashMap;
import java.util.Map;
@Generated
@Getter
@Setter
public class ThingEvent extends HashMap<String, Object> {
private String thingId;
public ThingEvent() {
}
public ThingEvent(Map<String, Object> map, String thingIdProperty) {
super(map);
this.thingId = (String) map.get(thingIdProperty);
}
public ThingEvent(TimeSeriesData data, String thingIdProperty) {
this(data.getData(), thingIdProperty);
putIfAbsent(ThingsDataConstants.COLUMN_TIMESTAMP, data.getTimestamp());
}
public static ThingEvent of(TimeSeriesData data, String thingIdProperty) {
return new ThingEvent(data,thingIdProperty);
}
@Override
@Hidden
@JsonIgnore
public boolean isEmpty() {
return super.isEmpty();
}
public long getTimestamp() {
return containsKey(ThingsDataConstants.COLUMN_TIMESTAMP) ? (long) get(ThingsDataConstants.COLUMN_TIMESTAMP) : 0;
}
@SuppressWarnings("all")
public ThingEvent putFormat(EventMetadata metadata) {
if (metadata != null) {
DataType type = metadata.getType();
if (type instanceof ObjectType) {
Map<String, Object> val = (Map<String, Object>) type.format(this);
val.forEach((k, v) -> put(k + "_format", v));
} else {
put("value_format", type.format(get("value")));
}
} else {
Object value = get("value");
if (value instanceof Map) {
((Map) value).forEach((k, v) -> put(k + "_format", v));
} else {
put("value_format", get("value"));
}
}
return this;
}
}

View File

@ -0,0 +1,126 @@
package org.jetlinks.community.things.data;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.AllArgsConstructor;
import lombok.Generated;
import lombok.Getter;
import org.hswebframework.web.dict.I18nEnumDict;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.MessageType;
import java.util.*;
import java.util.stream.Collectors;
@AllArgsConstructor
@Getter
@Generated
public enum ThingLogType implements I18nEnumDict<String> {
event("事件上报"),
readProperty("读取属性"),
writeProperty("修改属性"),
writePropertyReply("修改属性回复"),
reportProperty("属性上报"),
readPropertyReply("读取属性回复"),
child("子设备消息"),
childReply("子设备消息回复"),
functionInvoke("调用功能"),
functionReply("调用功能回复"),
register("设备注册"),
unregister("设备注销"),
readFirmware("读取固件信息"),
readFirmwareReply("读取固件信息回复"),
reportFirmware("上报固件信息"),
pullFirmware("拉取固件信息"),
pullFirmwareReply("拉取固件信息回复"),
upgradeFirmware("推送固件信息"),
upgradeFirmwareReply("推送固件信息回复"),
upgradeFirmwareProgress("固件更新进度"),
log("日志"),
tag("标签更新"),
offline("离线"),
online("上线"),
other("其它"),
direct("透传"),
acknowledge("应答"),
metadata("上报物模型"),
stateCheck("状态检查"),
stateCheckReply("状态检查回复"),
//状态检查
disconnect("断开连接"),
disconnectReply("断开连接回复");
@JSONField(serialize = false)
private final String text;
@Override
public String getValue() {
return name();
}
private final static Map<MessageType, ThingLogType> typeMapping = new EnumMap<>(MessageType.class);
public final static List<String> nameList;
static {
nameList = Collections.unmodifiableList(
Arrays.stream(values())
.map(ThingLogType::name)
.collect(Collectors.toList())
);
typeMapping.put(MessageType.EVENT, event);
typeMapping.put(MessageType.ONLINE, online);
typeMapping.put(MessageType.OFFLINE, offline);
typeMapping.put(MessageType.CHILD, child);
typeMapping.put(MessageType.CHILD_REPLY, childReply);
typeMapping.put(MessageType.LOG, log);
typeMapping.put(MessageType.UPDATE_TAG, tag);
typeMapping.put(MessageType.REPORT_PROPERTY, reportProperty);
typeMapping.put(MessageType.READ_PROPERTY, readProperty);
typeMapping.put(MessageType.READ_PROPERTY_REPLY, readPropertyReply);
typeMapping.put(MessageType.INVOKE_FUNCTION, functionInvoke);
typeMapping.put(MessageType.INVOKE_FUNCTION_REPLY, functionReply);
typeMapping.put(MessageType.WRITE_PROPERTY, writeProperty);
typeMapping.put(MessageType.WRITE_PROPERTY_REPLY, writePropertyReply);
typeMapping.put(MessageType.REGISTER, register);
typeMapping.put(MessageType.UN_REGISTER, unregister);
typeMapping.put(MessageType.READ_FIRMWARE, readFirmware);
typeMapping.put(MessageType.READ_FIRMWARE_REPLY, readFirmwareReply);
typeMapping.put(MessageType.REPORT_FIRMWARE, reportFirmware);
typeMapping.put(MessageType.REQUEST_FIRMWARE, pullFirmware);
typeMapping.put(MessageType.REQUEST_FIRMWARE_REPLY, pullFirmwareReply);
typeMapping.put(MessageType.UPGRADE_FIRMWARE, upgradeFirmware);
typeMapping.put(MessageType.UPGRADE_FIRMWARE_REPLY, upgradeFirmwareReply);
typeMapping.put(MessageType.UPGRADE_FIRMWARE_PROGRESS, upgradeFirmwareProgress);
typeMapping.put(MessageType.ACKNOWLEDGE, acknowledge);
typeMapping.put(MessageType.DERIVED_METADATA, metadata);
typeMapping.put(MessageType.STATE_CHECK, stateCheck);
typeMapping.put(MessageType.STATE_CHECK_REPLY, stateCheckReply);
typeMapping.put(MessageType.DISCONNECT, disconnect);
typeMapping.put(MessageType.DISCONNECT_REPLY, disconnectReply);
typeMapping.put(MessageType.DIRECT, direct);
}
@Generated
public static ThingLogType of(Message message) {
return Optional.ofNullable(typeMapping.get(message.getMessageType())).orElse(ThingLogType.other);
}
// @Override
// public Object getWriteJSONObject() {
// return getValue();
// }
}

View File

@ -0,0 +1,38 @@
package org.jetlinks.community.things.data;
import lombok.Generated;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
import java.io.Serializable;
@Getter
@Setter
@Generated
public class ThingMessageLog implements Serializable {
private String id;
private String thingId;
private long createTime;
private long timestamp;
private ThingLogType type;
private String content;
public static ThingMessageLog of(TimeSeriesData data, String thingIdProperty) {
ThingMessageLog log = data.as(ThingMessageLog.class);
log.thingId = data.getString(thingIdProperty, log.thingId);
return log;
}
@Override
public String toString() {
return ObjectMappers.toJsonString(this);
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.community.things.data;
import lombok.Getter;
import java.util.HashMap;
import java.util.Map;
@Getter
public class ThingProperties extends HashMap<String, Object> {
private final String thingId;
public ThingProperties(Map<String, Object> data,String thingIdProperty) {
super(data);
this.thingId = (String) data.get(thingIdProperty);
}
}

View File

@ -0,0 +1,182 @@
package org.jetlinks.community.things.data;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Generated;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.UnitSupported;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.metadata.unit.ValueUnit;
import org.jetlinks.core.things.ThingProperty;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.springframework.util.StringUtils;
import javax.annotation.Nullable;
import java.util.Optional;
import java.util.function.Function;
@Getter
@Setter
@Generated
public class ThingPropertyDetail implements ThingProperty {
@Schema(description = "ID")
private String id;
@Schema(description = "物实例ID")
private String thingId;
@Schema(description = "属性ID")
private String property;
@Schema(description = "状态")
private String state;
@Schema(description = "属性值")
private Object value;
@Schema(description = "数字值")
private Object numberValue;
@Schema(description = "格式化后的值")
private Object formatValue;
@Schema(description = "属性名")
private String propertyName;
@Schema(description = "类型")
private String type;
@Schema(description = "单位")
private String unit;
@Schema(description = "时间戳")
private long timestamp;
@Schema(description = "创建时间")
private long createTime;
@Schema(description = "格式化后的时间")
private String formatTime;
public ThingPropertyDetail property(String property) {
this.property = property;
return this;
}
public ThingPropertyDetail thingId(String thingId) {
this.thingId = thingId;
return this;
}
public ThingPropertyDetail timestamp(long timestamp) {
this.timestamp = timestamp;
return this;
}
public ThingPropertyDetail formatTime(String formatTime) {
this.formatTime = formatTime;
return this;
}
public ThingPropertyDetail withProperty(PropertyMetadata metadata) {
if (metadata != null) {
setProperty(metadata.getId());
setPropertyName(metadata.getName());
DataType type = metadata.getValueType();
Object value = this.getValue();
try {
if (type instanceof NumberType) {
NumberType<?> numberType = ((NumberType<?>) type);
Number numberValue = NumberType
.convertScaleNumber(value,
numberType.getScale(),
numberType.getRound(),
Function.identity());
if (numberValue != null) {
this.setValue(value = numberValue);
}
this.setNumberValue(numberValue);
} else if (type instanceof Converter) {
value = ((Converter<?>) type).convert(value);
this.setValue(value);
}
this.setFormatValue(type.format(value));
} catch (Exception ignore) {
}
if (type instanceof UnitSupported) {
UnitSupported unitSupported = (UnitSupported) type;
this.setUnit(Optional.ofNullable(unitSupported.getUnit())
.map(ValueUnit::getSymbol)
.orElse(null));
}
this.setType(type.getType());
}
return this;
}
public static ThingPropertyDetail of(TimeSeriesData data,
Object value,
PropertyMetadata metadata) {
ThingPropertyDetail deviceProperty = data.as(ThingPropertyDetail.class);
deviceProperty.setCreateTime(data.getLong("createTime", data.getTimestamp()));
deviceProperty.setTimestamp(data.getTimestamp());
deviceProperty.setValue(value);
return deviceProperty.withProperty(metadata);
}
public static ThingPropertyDetail of(Object value,
PropertyMetadata metadata) {
ThingPropertyDetail property = new ThingPropertyDetail();
property.setTimestamp(System.currentTimeMillis());
property.setCreateTime(property.getTimestamp());
property.setValue(value);
return property.withProperty(metadata);
}
public static ThingPropertyDetail of(AggregationData data,
PropertyMetadata metadata) {
ThingPropertyDetail property = data.as(ThingPropertyDetail.class);
return property.withProperty(metadata);
}
@Nullable
public static ThingPropertyDetail of(TimeSeriesData timeSeriesData,
PropertyMetadata metadata) {
if (metadata == null) {
return null;
}
ThingPropertyDetail property = timeSeriesData.as(ThingPropertyDetail.class);
property.setTimestamp(timeSeriesData.getTimestamp());
return property
.withProperty(metadata)
.generateId();
}
public ThingPropertyDetail generateId() {
if (!StringUtils.hasText(id)) {
setId(DigestUtils.md5Hex(String.join("", thingId, property, String.valueOf(timestamp))));
}
return this;
}
@Override
public String toString() {
return timestamp + " : " + property + " = " + value;
}
}

View File

@ -0,0 +1,64 @@
package org.jetlinks.community.things.data;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.metadata.PropertyMetadata;
public interface ThingsDataConstants {
String COLUMN_ID = "id";
String COLUMN_THING_ID = "thingId";
String COLUMN_TEMPLATE_ID = "templateId";
String COLUMN_PROPERTY_ID = "property";
String COLUMN_PROPERTY_TYPE = "type";
String COLUMN_PROPERTY_VALUE = "value";
String COLUMN_PROPERTY_NUMBER_VALUE = "numberValue";
String COLUMN_PROPERTY_TIME_VALUE = "timeValue";
String COLUMN_PROPERTY_GEO_VALUE = "geoValue";
String COLUMN_PROPERTY_ARRAY_VALUE = "arrayValue";
String COLUMN_PROPERTY_OBJECT_VALUE = "objectValue";
String COLUMN_EVENT_ID = "event";
String COLUMN_EVENT_VALUE = "value";
String COLUMN_TIMESTAMP = "timestamp";
String COLUMN_CREATE_TIME = "createTime";
String COLUMN_MESSAGE_ID = "messageId";
String COLUMN_LOG_TYPE = "type";
String COLUMN_LOG_CONTENT = "content";
ConfigKey<String> storePolicyConfigKey = ConfigKey.of("storePolicy");
String propertyStorageType = "storageType";
String propertyStorageTypeJson = "json-string";
String propertyStorageTypeIgnore = "ignore";
/**
* 判断属性是否使用json字符串来存储
*
* @param metadata 属性物模型
* @return 是否使用json字符串存储
*/
static boolean propertyIsJsonStringStorage(PropertyMetadata metadata) {
return metadata
.getExpand(propertyStorageType)
.map(propertyStorageTypeJson::equals)
.orElse(false);
}
/**
* 判断属性是否忽略存储
*
* @param metadata 属性物模型
* @return 属性是否忽略存储
*/
static boolean propertyIsIgnoreStorage(PropertyMetadata metadata) {
return metadata
.getExpand(propertyStorageType)
.map(propertyStorageTypeIgnore::equals)
.orElse(false);
}
}

View File

@ -0,0 +1,18 @@
package org.jetlinks.community.things.data;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
public interface ThingsDataContext {
void customMetricBuilder(String thingType, MetricBuilder metricBuilder);
void customSettings(String thingType, DataSettings settings);
void setDefaultPolicy(String policy);
void setDefaultSettings(DataSettings settings);
void addPolicy(ThingsDataRepositoryStrategy policy);
}

View File

@ -0,0 +1,8 @@
package org.jetlinks.community.things.data;
public interface ThingsDataCustomizer {
void custom(ThingsDataContext context);
}

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.things.data;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
public class ThingsDataRepositoryStrategies {
private final static Map<String, ThingsDataRepositoryStrategy> strategyMap = new ConcurrentHashMap<>();
static void register(ThingsDataRepositoryStrategy strategy) {
strategyMap.put(strategy.getId(), strategy);
}
public static List<ThingsDataRepositoryStrategy> getAll() {
List<ThingsDataRepositoryStrategy> strategies = new ArrayList<>(strategyMap.values());
strategies.sort(Comparator.comparingLong(ThingsDataRepositoryStrategy::getOrder));
return strategies;
}
public static Optional<ThingsDataRepositoryStrategy> getStrategy(String id) {
return Optional.ofNullable(strategyMap.get(id));
}
}

View File

@ -0,0 +1,50 @@
package org.jetlinks.community.things.data;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.jetlinks.core.metadata.Feature;
import org.jetlinks.community.things.data.operations.*;
import org.springframework.core.Ordered;
import reactor.core.publisher.Flux;
public interface ThingsDataRepositoryStrategy extends Ordered {
String getId();
String getName();
SaveOperations opsForSave(OperationsContext context);
ThingOperations opsForThing(String thingType, String templateId, String thingId, OperationsContext context);
TemplateOperations opsForTemplate(String thingType, String templateId, OperationsContext context);
@Override
default int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
default Flux<Feature> getFeatures() {
return Flux.empty();
}
@EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
@Getter
@AllArgsConstructor
class OperationsContext {
public static final OperationsContext DEFAULT = new OperationsContext(MetricBuilder.DEFAULT, new DataSettings());
private final MetricBuilder metricBuilder;
private final DataSettings settings;
public OperationsContext settings(DataSettings settings) {
return new OperationsContext(metricBuilder, settings);
}
public OperationsContext metricBuilder(MetricBuilder metricBuilder) {
return new OperationsContext(metricBuilder, settings);
}
}
}

View File

@ -0,0 +1,22 @@
package org.jetlinks.community.things.data;
import org.jetlinks.core.things.ThingProperty;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
public interface ThingsDataWriter {
@Nonnull
Mono<Void> updateProperty(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull ThingProperty property);
@Nonnull
Mono<Void> updateProperty(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull String property,
long timestamp,
@Nonnull Object value,
String state);
}

View File

@ -0,0 +1,151 @@
package org.jetlinks.community.things.data.operations;
import lombok.AllArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.community.ConfigMetadataConstants;
import org.jetlinks.community.things.data.ThingsDataConstants;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.Function3;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@AllArgsConstructor
public abstract class AbstractDDLOperations implements DDLOperations {
protected final String thingType;
protected final String templateId;
protected final String thingId;
protected final DataSettings settings;
protected final MetricBuilder metricBuilder;
protected List<PropertyMetadata> createBasicColumns() {
return Arrays
.asList(
SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_ID, "ID", StringType.GLOBAL),
SimplePropertyMetadata.of(metricBuilder.getThingIdProperty(), "物ID", StringType.GLOBAL),
SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_MESSAGE_ID, "消息ID", StringType.GLOBAL),
SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_CREATE_TIME, "创建时间", DateTimeType.GLOBAL),
SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_TIMESTAMP, "数据时间", DateTimeType.GLOBAL)
);
}
protected abstract List<PropertyMetadata> createPropertyProperties(List<PropertyMetadata> propertyMetadata);
protected List<PropertyMetadata> createLogProperties() {
List<PropertyMetadata> metadata = new ArrayList<>(createBasicColumns());
{
metadata.add(SimplePropertyMetadata.of(
ThingsDataConstants.COLUMN_LOG_TYPE,
"日志类型",
StringType.GLOBAL
));
}
{
metadata.add(SimplePropertyMetadata.of(
ThingsDataConstants.COLUMN_LOG_CONTENT,
"日志内容",
new StringType().expand(ConfigMetadataConstants.maxLength, 4096L)
));
}
return metadata;
}
protected List<PropertyMetadata> createEventProperties(EventMetadata event) {
List<PropertyMetadata> metadata = new ArrayList<>(
createBasicColumns()
);
DataType type = event.getType();
if (type instanceof ObjectType) {
if (CollectionUtils.isNotEmpty(((ObjectType) type).getProperties())) {
metadata.addAll(((ObjectType) type).getProperties());
}
} else {
metadata.add(
SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_EVENT_VALUE, "事件数据", type)
);
}
return metadata;
}
protected List<PropertyMetadata> createEventProperties() {
List<PropertyMetadata> metadata = new ArrayList<>(
createBasicColumns()
);
metadata.add(
SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_EVENT_VALUE, "事件数据", StringType.GLOBAL)
);
return metadata;
}
private Mono<Void> doWith(ThingMetadata metadata,
Function3<MetricType, String, List<PropertyMetadata>, Mono<Void>> handler) {
Mono<Void> properties = handler.apply(
MetricType.properties,
metricBuilder.createPropertyMetric(thingType, templateId, thingId),
createPropertyProperties(metadata.getProperties()));
Mono<Void> log = handler.apply(
MetricType.log,
metricBuilder.createLogMetric(thingType, templateId, thingId),
createLogProperties());
Mono<Void> event;
if (settings.getEvent().eventIsAllInOne()) {
event = handler.apply(MetricType.event,
metricBuilder.createEventAllInOneMetric(thingType, templateId, thingId),
createEventProperties());
} else {
event = Flux
.fromIterable(metadata.getEvents())
.flatMap(e -> handler.apply(MetricType.event,
metricBuilder.createEventMetric(thingType, templateId, thingId, e.getId()),
createEventProperties(e)))
.then();
}
return Flux
.concat(properties, log, event)
.then();
}
@Override
public final Mono<Void> registerMetadata(ThingMetadata metadata) {
return doWith(metadata, this::register);
}
@Override
public final Mono<Void> reloadMetadata(ThingMetadata metadata) {
return doWith(metadata, this::reload);
}
protected abstract Mono<Void> register(MetricType metricType, String metric, List<PropertyMetadata> properties);
protected abstract Mono<Void> reload(MetricType metricType, String metric, List<PropertyMetadata> properties);
public enum MetricType {
properties,
log,
event
}
}

View File

@ -0,0 +1,257 @@
package org.jetlinks.community.things.data.operations;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.Thing;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingTemplate;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.*;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@AllArgsConstructor
public abstract class AbstractQueryOperations implements QueryOperations {
protected final String thingType;
protected final String thingTemplateId;
protected final String thingId;
protected final MetricBuilder metricBuilder;
protected final DataSettings settings;
protected final ThingsRegistry registry;
protected abstract Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query);
protected abstract <T> Mono<PagerResult<T>> doQueryPage(String metric,
Query<?, QueryParamEntity> query,
Function<TimeSeriesData, T> mapper);
protected abstract Flux<AggregationData> doAggregation(String metric, AggregationRequest request, AggregationContext context);
protected Mono<ThingMetadata> getMetadata() {
//指定了物实例
if (StringUtils.hasText(thingId)) {
return registry
.getThing(thingType, thingId)
.flatMap(Thing::getMetadata);
}
return registry
.getTemplate(thingType, thingTemplateId)
.flatMap(ThingTemplate::getMetadata);
}
protected void applyQuery(Query<?, ? extends QueryParam> query) {
//默认时间戳倒序
if (CollectionUtils.isEmpty(query.getParam().getSorts())) {
query.orderByDesc(ThingsDataConstants.COLUMN_TIMESTAMP);
}
//指定物实例追加条件: where thingId = ?
if (StringUtils.hasText(thingId)) {
query.and(metricBuilder.getThingIdProperty(), thingId);
}
}
protected abstract Flux<ThingPropertyDetail> queryProperty(@Nonnull QueryParamEntity param,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties);
protected abstract Mono<PagerResult<ThingPropertyDetail>> queryPropertyPage(@Nonnull QueryParamEntity param,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties);
protected abstract Flux<ThingPropertyDetail> queryEachProperty(@Nonnull String metric,
@Nonnull Query<?, QueryParamEntity> query,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties);
@Nonnull
@Override
public final Flux<ThingPropertyDetail> queryEachProperty(@Nonnull QueryParamEntity param,
@Nonnull String... property) {
return this
.getMetadata()
.flatMapMany(metadata -> {
Map<String, PropertyMetadata> properties = getProperties(metadata, property);
if (properties.isEmpty()) {
return Mono.empty();
}
if (properties.size() == 1) {
return queryProperty(param, metadata, properties);
}
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(this::applyQuery);
return queryEachProperty(metric, query, metadata, properties);
});
}
@Nonnull
@Override
public final Flux<ThingPropertyDetail> queryProperty(@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return this
.getMetadata()
.flatMapMany(metadata -> {
Map<String, PropertyMetadata> properties = getProperties(metadata, property);
if (properties.isEmpty()) {
return Mono.empty();
}
return queryProperty(query.clone(), metadata, properties);
});
}
@Nonnull
@Override
public final Mono<PagerResult<ThingPropertyDetail>> queryPropertyPage(@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return this
.getMetadata()
.flatMap(metadata -> {
Map<String, PropertyMetadata> properties = getProperties(metadata, property);
if (properties.isEmpty()) {
return Mono.empty();
}
return queryPropertyPage(query.clone(), metadata, properties);
})
.defaultIfEmpty(PagerResult.of(0, new ArrayList<>(), query));
}
private Map<String, PropertyMetadata> getProperties(ThingMetadata metadata, String... property) {
if (property.length == 0) {
return metadata
.getProperties()
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity()));
}
Set<String> propertiesFilter = property.length == 1
? Collections.singleton(property[0])
: new HashSet<>(Arrays.asList(property));
return metadata
.getProperties()
.stream()
.filter(p -> propertiesFilter.contains(p.getId()))
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity()));
}
@Nonnull
@Override
public final Flux<AggregationData> aggregationProperties(@Nonnull AggregationRequest request,
@Nonnull PropertyAggregation... properties) {
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
AggregationRequest aggRequest = request.copy();
aggRequest.getFilter().toNestQuery(this::applyQuery);
return getMetadata()
.flatMapMany(metadata -> doAggregation(metric, aggRequest, new AggregationContext(metadata, properties)));
}
@Override
public Flux<ThingMessageLog> queryMessageLog(@Nonnull QueryParamEntity param) {
String metric = metricBuilder.createLogMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(this::applyQuery);
return doQuery(metric, query)
.map(data -> ThingMessageLog.of(data, metricBuilder.getThingIdProperty()));
}
@Override
public Mono<PagerResult<ThingMessageLog>> queryMessageLogPage(@Nonnull QueryParamEntity param) {
String metric = metricBuilder.createLogMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(this::applyQuery);
return doQueryPage(metric, query, data -> ThingMessageLog.of(data, metricBuilder.getThingIdProperty()));
}
@Nonnull
@Override
public Mono<PagerResult<ThingEvent>> queryEventPage(@Nonnull String eventId,
@Nonnull QueryParamEntity param,
boolean format) {
Query<?, QueryParamEntity> query = param.toNestQuery();
String metric;
if (settings.getEvent().eventIsAllInOne()) {
metric = metricBuilder.createEventAllInOneMetric(thingType, thingTemplateId, thingId);
query.and(ThingsDataConstants.COLUMN_EVENT_ID, eventId);
} else {
metric = metricBuilder.createEventMetric(thingType, thingTemplateId, thingId, eventId);
}
if (format) {
return getMetadata()
.mapNotNull(metadata -> metadata.getEventOrNull(eventId))
.flatMap(metadata -> doQueryPage(metric, query, data -> ThingEvent
.of(data, metricBuilder.getThingIdProperty())
.putFormat(metadata)))
.defaultIfEmpty(PagerResult.of(0, new ArrayList<>(), param));
}
return doQueryPage(metric, query, data -> ThingEvent.of(data, metricBuilder.getThingIdProperty()))
.defaultIfEmpty(PagerResult.of(0, new ArrayList<>(), param));
}
@Nonnull
@Override
public final Flux<ThingEvent> queryEvent(@Nonnull String eventId, @Nonnull QueryParamEntity param, boolean format) {
Query<?, QueryParamEntity> query = param.toNestQuery();
String metric;
if (settings.getEvent().eventIsAllInOne()) {
metric = metricBuilder.createEventAllInOneMetric(thingType, thingTemplateId, thingId);
query.and(ThingsDataConstants.COLUMN_EVENT_ID, eventId);
} else {
metric = metricBuilder.createEventMetric(thingType, thingTemplateId, thingId, eventId);
}
if (format) {
return getMetadata()
.mapNotNull(metadata -> metadata.getEventOrNull(eventId))
.flatMapMany(metadata -> doQuery(metric, query).map(data -> ThingEvent
.of(data, metricBuilder.getThingIdProperty())
.putFormat(metadata)));
}
return doQuery(metric, query)
.map(data -> ThingEvent.of(data, metricBuilder.getThingIdProperty()));
}
@Getter
protected static class AggregationContext {
private final Map<String, String> propertyAlias;
private final Map<String, PropertyAggregation> aliasToProperty;
private final PropertyAggregation[] properties;
private final ThingMetadata metadata;
public AggregationContext(ThingMetadata metadata, PropertyAggregation... properties) {
this.metadata = metadata;
this.properties = properties;
propertyAlias = Arrays
.stream(properties)
.collect(Collectors.toMap(PropertyAggregation::getAlias,
PropertyAggregation::getProperty));
aliasToProperty = Arrays
.stream(properties)
.collect(Collectors.toMap(PropertyAggregation::getAlias,
Function.identity()));
}
}
}

View File

@ -0,0 +1,265 @@
package org.jetlinks.community.things.data.operations;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.web.id.IDGenerator;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.core.message.DeviceLogMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.message.event.ThingEventMessage;
import org.jetlinks.core.message.property.PropertyMessage;
import org.jetlinks.core.message.property.ThingReportPropertyMessage;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.UnknownType;
import org.jetlinks.core.things.Thing;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingTemplate;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.core.utils.TimestampUtils;
import org.jetlinks.community.things.ThingConstants;
import org.jetlinks.community.things.data.ThingLogType;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.*;
import static org.jetlinks.community.things.data.ThingsDataConstants.*;
@Slf4j
@AllArgsConstructor
public abstract class AbstractSaveOperations implements SaveOperations {
protected final ThingsRegistry registry;
protected final MetricBuilder metricBuilder;
protected final DataSettings settings;
@Override
public final Mono<Void> save(ThingMessage thingMessage) {
return this
.convertMessageToTimeSeriesData(thingMessage)
.flatMap(tp2 -> this.doSave(tp2.getT1(), tp2.getT2()))
.then();
}
@Override
public final Mono<Void> save(Collection<? extends ThingMessage> thingMessage) {
return save(Flux.fromIterable(thingMessage));
}
@Override
public final Mono<Void> save(Publisher<? extends ThingMessage> thingMessage) {
return Flux
.from(thingMessage)
.flatMap(this::convertMessageToTimeSeriesData)
.groupBy(Tuple2::getT1)
.flatMap(group -> this.doSave(group.key(), group.map(Tuple2::getT2)))
.then();
}
protected Map<String, Object> createLogData(ThingMessage message) {
Map<String, Object> data = Maps.newHashMapWithExpectedSize(8);
data.put(COLUMN_ID, IDGenerator.SNOW_FLAKE_STRING.generate());
data.put(metricBuilder.getThingIdProperty(), message.getThingId());
data.put(COLUMN_TIMESTAMP, message.getTimestamp());
data.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
data.put(COLUMN_MESSAGE_ID, message.getMessageId());
data.put(COLUMN_LOG_TYPE, ThingLogType.of(message).name());
String log;
if (message instanceof DeviceLogMessage) {
log = ((DeviceLogMessage) message).getLog();
} else {
log = ObjectMappers.toJsonString(message.toJson());
}
data.put(COLUMN_LOG_CONTENT, log);
return data;
}
protected String getTemplateIdFromMessage(ThingMessage message) {
String templateId = message.getHeader(Headers.productId).orElse(null);
if (templateId == null) {
templateId = message.getHeader(ThingConstants.templateId).orElse(null);
}
return templateId == null ? "null" : templateId;
}
protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(ThingMessage message) {
boolean ignoreStorage = message.getHeaderOrDefault(Headers.ignoreStorage);
boolean ignoreLog = message.getHeaderOrDefault(Headers.ignoreLog);
if (ignoreStorage && ignoreLog) {
return Flux.empty();
}
String templateId = getTemplateIdFromMessage(message);
List<Publisher<Tuple2<String, TimeSeriesData>>> all = new ArrayList<>(2);
//没有忽略数据存储
if (!ignoreStorage) {
//事件上报
if (message instanceof ThingEventMessage) {
all.add(convertEventMessageToTimeSeriesData(templateId, ((ThingEventMessage) message)));
}
//属性相关消息
else if (message instanceof PropertyMessage) {
//配置了只保存属性上报
if (!settings.getProperty().isOnlySaveReport()
|| (message instanceof ThingReportPropertyMessage)) {
PropertyMessage propertyMessage = ((PropertyMessage) message);
Map<String, Object> properties = propertyMessage.getProperties();
if (MapUtils.isNotEmpty(properties)) {
//属性源时间
Map<String, Long> propertiesTimes = propertyMessage.getPropertySourceTimes();
if (propertiesTimes == null) {
propertiesTimes = Collections.emptyMap();
}
all.add(convertProperties(templateId, message, properties, propertiesTimes));
}
}
}
}
//配置了记录日志,并且消息头里没有标记忽略日志
if (settings.getLogFilter().match(message.getMessageType()) && !ignoreLog) {
all.add(createDeviceMessageLog(templateId, message));
}
return Flux.merge(all);
}
private Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String templateId, ThingEventMessage message) {
return registry
.getTemplate(message.getThingType(), templateId)
.flatMap(thing -> {
//配置了所有事件存储在同一个表中时,这时支持设备自定义事件物模型,直接获取设备的物模型
if (settings.getEvent().eventIsAllInOne()) {
return thing.getMetadata();
}
//获取设备产品的物模型,为什么不直接获取模版?因为后期可能支持多版本.
return registry
.getThing(message.getThingType(), message.getThingId())
.flatMap(Thing::getTemplate)
.flatMap(ThingTemplate::getMetadata);
})
.<TimeSeriesData>handle((metadata, sink) -> {
if (settings.getEvent().shouldIgnoreUndefined()
&& metadata.getEventOrNull(message.getEvent()) == null) {
log.warn("{}[{}] event [{}] metadata undefined", message.getThingType(), message.getThingId(), message.getEvent());
return;
}
Map<String, Object> data = createEventData(message, metadata);
sink.next(TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data));
})
.map(data -> Tuples.of(createEventMetric(message.getThingType(), templateId, message.getThingId(), message.getEvent()), data));
}
private String createEventMetric(String thingType,
String thingTemplateId,
String thingId,
String eventId) {
return settings.getEvent().eventIsAllInOne()
? metricBuilder.createEventAllInOneMetric(thingType, thingTemplateId, thingId)
: metricBuilder.createEventMetric(thingType, thingTemplateId, thingId, eventId);
}
protected Object convertValue(Object value, DataType type) {
if (type instanceof Converter) {
return ((Converter<?>) type).convert(value);
}
return value;
}
protected Map<String, Object> createEventData(ThingEventMessage message, ThingMetadata metadata) {
Object value = message.getData();
DataType dataType = metadata
.getEvent(message.getEvent())
.map(EventMetadata::getType)
.orElseGet(UnknownType::new);
Object tempValue = convertValue(value, dataType);
Map<String, Object> data;
//使用json字符存储数据
if (settings.getEvent().isUsingJsonString()) {
data = Maps.newHashMapWithExpectedSize(16);
data.put(COLUMN_EVENT_VALUE, tempValue instanceof String ? tempValue : ObjectMappers.toJsonString(tempValue));
} else {
if (tempValue instanceof Map) {
@SuppressWarnings("all")
Map<String, Object> mapValue = ((Map) tempValue);
int size = mapValue.size();
data = Maps.newHashMapWithExpectedSize(size);
data.putAll(mapValue);
//严格模式,只记录物模型中记录的字段
if (settings.isStrict()) {
if (dataType instanceof ObjectType) {
Set<String> nonexistent = new HashSet<>(data.keySet());
ObjectType objType = ((ObjectType) dataType);
for (PropertyMetadata property : objType.getProperties()) {
nonexistent.remove(property.getId());
}
nonexistent.forEach(data::remove);
}
}
} else {
data = Maps.newHashMapWithExpectedSize(16);
data.put(COLUMN_EVENT_VALUE, tempValue);
}
}
//所有数据都存储在一个表里时,给表添加一个event值
if (settings.getEvent().eventIsAllInOne()) {
data.put(COLUMN_EVENT_ID, message.getEvent());
}
data.put(COLUMN_ID, createEventDataId(message));
data.put(metricBuilder.getThingIdProperty(), message.getThingId());
data.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
data.put(COLUMN_TIMESTAMP, message.getTimestamp());
return data;
}
protected String createEventDataId(ThingMessage message) {
return DigestUtils
.md5Hex(StringBuilderUtils.buildString(message, (msg, builder) -> builder
.append(msg.getThingId())
.append('-')
.append(msg.getTimestamp())));
}
private Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String templateId,
ThingMessage message) {
return Mono.just(Tuples.of(
metricBuilder.createLogMetric(message.getThingType(), templateId, message.getThingId()),
TimeSeriesData.of(message.getTimestamp(), createLogData(message))));
}
protected abstract Flux<Tuple2<String, TimeSeriesData>> convertProperties(String templateId,
ThingMessage message,
Map<String, Object> properties,
Map<String, Long> propertySourceTimes);
protected abstract Mono<Void> doSave(String metric, TimeSeriesData data);
protected abstract Mono<Void> doSave(String metric, Flux<TimeSeriesData> data);
@Override
public Flux<Feature> getFeatures() {
if (settings.getEvent().eventIsAllInOne()) {
return Flux.empty();
} else {
//事件不支持新增以及修改
return Flux.just(MetadataFeature.eventNotInsertable,
MetadataFeature.eventNotModifiable
);
}
}
}

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.things.data.operations;
import org.jetlinks.core.metadata.PropertyMetadata;
import java.util.ArrayList;
import java.util.List;
public abstract class ColumnModeDDLOperationsBase extends AbstractDDLOperations{
public ColumnModeDDLOperationsBase(String thingType,
String templateId,
String thingId,
DataSettings settings,
MetricBuilder metricBuilder) {
super(thingType, templateId, thingId, settings, metricBuilder);
}
@Override
protected List<PropertyMetadata> createPropertyProperties(List<PropertyMetadata> propertyMetadata) {
List<PropertyMetadata> props = new ArrayList<>(createBasicColumns());
props.addAll(propertyMetadata);
return props;
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.community.things.data.operations;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.things.data.ThingProperties;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
public interface ColumnModeQueryOperations extends QueryOperations {
@Nonnull
Flux<ThingProperties> queryAllProperties(@Nonnull QueryParamEntity query);
@Nonnull
Mono<PagerResult<ThingProperties>> queryAllPropertiesPage(@Nonnull QueryParamEntity query);
}

View File

@ -0,0 +1,128 @@
package org.jetlinks.community.things.data.operations;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.ThingProperties;
import org.jetlinks.community.things.data.ThingPropertyDetail;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.AggregationData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.function.Function;
/**
* 列式模式查询操作基础类,实现列式模式下通用查询相关操作.
* <p>
* 列式模式表示: 属性相关消息
*
* @author zhouhao
* @since 2.0
*/
public abstract class ColumnModeQueryOperationsBase extends AbstractQueryOperations implements ColumnModeQueryOperations {
public ColumnModeQueryOperationsBase(String thingType,
String thingTemplateId,
String thingId,
MetricBuilder metricBuilder,
DataSettings settings,
ThingsRegistry registry) {
super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
}
@Override
protected final Flux<ThingPropertyDetail> queryProperty(@Nonnull QueryParamEntity param,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
return queryProperty(metric, param.toNestQuery(this::applyQuery), metadata, properties);
}
protected Flux<ThingPropertyDetail> queryProperty(@Nonnull String metric,
@Nonnull Query<?, QueryParamEntity> query,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
return this
.doQuery(metric, query)
.flatMap(data -> Flux
.create(sink -> {
for (Map.Entry<String, PropertyMetadata> entry : properties.entrySet()) {
data
.get(entry.getKey())
.ifPresent(value -> sink
.next(ThingPropertyDetail
.of(value, entry.getValue())
.thingId(data.getString(metricBuilder.getThingIdProperty(), null))
.generateId()
));
}
sink.complete();
}));
}
protected Flux<ThingPropertyDetail> queryEachProperty(@Nonnull String metric,
@Nonnull Query<?, QueryParamEntity> query,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
return queryProperty(metric, query, metadata, properties);
}
@Override
protected Mono<PagerResult<ThingPropertyDetail>> queryPropertyPage(@Nonnull QueryParamEntity param,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
if (properties.size() > 1) {
//列式模式不支持同时查询多个属性
return Mono.error(new UnsupportedOperationException("error.unsupported_query_multi_property"));
}
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(this::applyQuery);
String property = properties.keySet().iterator().next();
query.notNull(property);
return this
.doQueryPage(metric,
query,
data -> ThingPropertyDetail
.of(data.get(property).orElse(null), properties.get(property))
.thingId(data.getString(metricBuilder.getThingIdProperty(), null))
.generateId()
);
}
@Nonnull
@Override
public Mono<PagerResult<ThingProperties>> queryAllPropertiesPage(@Nonnull QueryParamEntity param) {
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(this::applyQuery);
return doQueryPage(metric, query, data -> new ThingProperties(data.getData(), metricBuilder.getThingIdProperty()));
}
@Nonnull
@Override
public Flux<ThingProperties> queryAllProperties(@Nonnull QueryParamEntity param) {
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(this::applyQuery);
return doQuery(metric, query)
.map(data -> new ThingProperties(data.getData(), metricBuilder.getThingIdProperty()));
}
@Override
protected abstract Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query);
@Override
protected abstract <T> Mono<PagerResult<T>> doQueryPage(String metric, Query<?, QueryParamEntity> query, Function<TimeSeriesData, T> mapper);
@Override
protected abstract Flux<AggregationData> doAggregation(String metric, AggregationRequest request, AggregationContext context);
}

View File

@ -0,0 +1,137 @@
package org.jetlinks.community.things.data.operations;
import com.google.common.collect.Maps;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.Feature;
import org.jetlinks.core.metadata.MetadataFeature;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.core.utils.TimestampUtils;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.Map;
import static org.jetlinks.community.things.data.ThingsDataConstants.*;
public abstract class ColumnModeSaveOperationsBase extends AbstractSaveOperations {
public ColumnModeSaveOperationsBase(ThingsRegistry registry, MetricBuilder metricBuilder, DataSettings settings) {
super(registry, metricBuilder, settings);
}
protected String createPropertyDataId(ThingMessage message) {
return DigestUtils.md5Hex(
StringBuilderUtils
.buildString(message, (m, builder) -> {
builder
.append(m.getThingId())
.append('-')
.append(m.getTimestamp());
})
);
}
@Override
protected Flux<Tuple2<String, TimeSeriesData>> convertProperties(String templateId,
ThingMessage message,
Map<String, Object> properties,
Map<String, Long> propertySourceTimes) {
if (MapUtils.isEmpty(properties)) {
return Flux.empty();
}
return this
.registry
.getThing(message.getThingType(), message.getThingId())
.flatMapMany(device -> device
.getMetadata()
.mapNotNull(metadata -> {
int size = properties.size();
String id = createPropertyDataId(message);
Map<String, Object> data = Maps.newLinkedHashMapWithExpectedSize(size);
//转换属性数据
for (Map.Entry<String, Object> entry : properties.entrySet()) {
PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(entry.getKey());
//没有配置物模型或者忽略了存储
if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {
continue;
}
Object value = convertPropertyValue(entry.getValue(), propertyMetadata);
if (null != value) {
data.put(entry.getKey(), value);
}
}
//没有属性值,可能全部都配置了不存储
if (data.isEmpty()) {
return null;
}
data.put(metricBuilder.getThingIdProperty(), message.getThingId());
data.put(COLUMN_TIMESTAMP, TimestampUtils.toMillis(message.getTimestamp()));
data.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
data.put(COLUMN_ID, id);
return Tuples.of(metricBuilder.createPropertyMetric(message.getThingType(), templateId, message.getThingId()),
TimeSeriesData.of(message.getTimestamp(), handlePropertiesData(metadata, data)));
}));
}
protected Map<String, Object> handlePropertiesData(ThingMetadata metadata, Map<String, Object> properties) {
return properties;
}
protected Object convertPropertyValue(Object value, PropertyMetadata metadata) {
if (value == null || metadata == null) {
return value;
}
//使用json字符串来存储
if (propertyIsJsonStringStorage(metadata)) {
return value instanceof String ? String.valueOf(value) : ObjectMappers.toJsonString(value);
}
//数字类型直接返回
if (metadata.getValueType() instanceof NumberType && value instanceof Number) {
return convertNumberValue(((NumberType<?>) metadata.getValueType()), ((Number) value));
}
if (metadata.getValueType() instanceof Converter) {
return ((Converter<?>) metadata.getValueType()).convert(value);
}
return value;
}
protected boolean propertyIsJsonStringStorage(PropertyMetadata metadata) {
return ThingsDataConstants.propertyIsJsonStringStorage(metadata);
}
protected Object convertNumberValue(NumberType<?> type, Number value) {
return value;
}
@Override
protected abstract Mono<Void> doSave(String metric, TimeSeriesData data);
@Override
protected abstract Mono<Void> doSave(String metric, Flux<TimeSeriesData> data);
@Override
public Flux<Feature> getFeatures() {
return Flux.concat(
super.getFeatures(),
Flux.just(
MetadataFeature.propertyNotModifiable,
MetadataFeature.propertyNotInsertable
)
);
}
}

View File

@ -0,0 +1,12 @@
package org.jetlinks.community.things.data.operations;
import org.jetlinks.core.things.ThingMetadata;
import reactor.core.publisher.Mono;
public interface DDLOperations {
Mono<Void> registerMetadata(ThingMetadata metadata);
Mono<Void> reloadMetadata(ThingMetadata metadata);
}

View File

@ -0,0 +1,53 @@
package org.jetlinks.community.things.data.operations;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.utils.MessageTypeMatcher;
@Getter
@Setter
public class DataSettings {
private boolean strict = true;
private Log logFilter = new Log();
private Event event = new Event();
private Property property = new Property();
@Getter
@Setter
public static class Log extends MessageTypeMatcher {
}
@Getter
@Setter
public static class Property {
//是否只保存属性上报消息
private boolean onlySaveReport = false;
}
@Getter
@Setter
public static class Event {
public static final Event DEFAULT = new Event();
//使用JSON字符来存储事件数据
private boolean usingJsonString;
//相同模版的事件数据使用同一个表来存储
private boolean allInOne;
//忽略未定义物模型的事件
private boolean ignoreUndefined = true;
public boolean eventIsAllInOne() {
return usingJsonString && allInOne;
}
public boolean shouldIgnoreUndefined() {
return ignoreUndefined || !eventIsAllInOne();
}
}
}

View File

@ -0,0 +1,39 @@
package org.jetlinks.community.things.data.operations;
import org.jetlinks.community.things.data.ThingsDataConstants;
public interface MetricBuilder {
MetricBuilder DEFAULT = new MetricBuilder() {
};
default String getThingIdProperty() {
return ThingsDataConstants.COLUMN_THING_ID;
}
default String createLogMetric(String thingType,
String thingTemplateId,
String thingId) {
return thingType + "_log_" + thingTemplateId;
}
default String createPropertyMetric(String thingType,
String thingTemplateId,
String thingId) {
return thingType + "_properties_" + thingTemplateId;
}
default String createEventAllInOneMetric(String thingType,
String thingTemplateId,
String thingId) {
return thingType + "_event_" + thingTemplateId + "_events";
}
default String createEventMetric(String thingType,
String thingTemplateId,
String thingId,
String eventId) {
return thingType + "_event_" + thingTemplateId + "_" + eventId;
}
}

View File

@ -0,0 +1,120 @@
package org.jetlinks.community.things.data.operations;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.Wrapper;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.community.things.data.*;
import org.jetlinks.community.timeseries.query.AggregationData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
/**
* 查询操作,用于查询物相关数据
*
* @author zhouhao
* @since 2.0
*/
public interface QueryOperations extends Wrapper {
/**
* 按条件查询每一个属性,通常用于同时查询多个属性值.
*
* <p>
* 根据{@link QueryParamEntity#isPaging()}决定是否进行分页,如果要查询全部数据,需要设置{@link QueryParamEntity#setPaging(boolean)}为false.
*
* @param query 查询条件
* @param property 指定要查询的属性,不指定则查询全部属性.
* @return 属性数据
*/
@Nonnull
Flux<ThingPropertyDetail> queryEachProperty(@Nonnull QueryParamEntity query,
@Nonnull String... property);
/**
* 查询属性数据,并返回属性数据列表,一条数据表示一个属性值.
* <p>
* 根据{@link QueryParamEntity#isPaging()}决定是否进行分页,如果要查询全部数据,需要设置{@link QueryParamEntity#setPaging(boolean)}为false.
*
* @param query 查询条件
* @param property 指定要查询的属性,不指定则查询全部属性.
* @return 属性数据
*/
@Nonnull
Flux<ThingPropertyDetail> queryProperty(@Nonnull QueryParamEntity query,
@Nonnull String... property);
/**
* 分页查询属性数据,通常用于查询单个属性的历史列表.
* <p>
* 如果指定多个属性,不同的存储策略实现可能返回结果不同.
*
* @param query 查询条件
* @param property 指定要查询的属性,不指定则查询全部属性.
* @return 属性数据
*/
@Nonnull
Mono<PagerResult<ThingPropertyDetail>> queryPropertyPage(@Nonnull QueryParamEntity query,
@Nonnull String... property);
/**
* 聚合查询属性数据
*
* @param request 聚合请求
* @param properties 属性聚合方式
* @return 聚合查询结果
*/
@Nonnull
Flux<AggregationData> aggregationProperties(@Nonnull AggregationRequest request,
@Nonnull PropertyAggregation... properties);
/**
* 查询设备日志,不返回分页结果
*
* @param query 查询条件
* @return 查询结果
*/
Flux<ThingMessageLog> queryMessageLog(@Nonnull QueryParamEntity query);
/**
* 分页查询设备日志
*
* @param query 查询条件
* @return 查询结果
*/
Mono<PagerResult<ThingMessageLog>> queryMessageLogPage(@Nonnull QueryParamEntity query);
/**
* 分页查询事件数据
*
* @param eventId 事件ID
* @param query 查询条件
* @param format 是否对数据进行格式化. {@link ThingEvent#putFormat(EventMetadata)}
* @return 分页查询结果
*/
@Nonnull
Mono<PagerResult<ThingEvent>> queryEventPage(@Nonnull String eventId,
@Nonnull QueryParamEntity query,
boolean format);
/**
* 查询事件数据,不返回分页结果.
* <p>
* 根据{@link QueryParamEntity#isPaging()}决定是否进行分页,如果要查询全部数据,需要设置{@link QueryParamEntity#setPaging(boolean)}为false.
*
* @param eventId 事件ID
* @param query 查询条件
* @param format 是否对数据进行格式化. {@link ThingEvent#putFormat(EventMetadata)}
* @return 事件数据
*/
@Nonnull
Flux<ThingEvent> queryEvent(@Nonnull String eventId,
@Nonnull QueryParamEntity query,
boolean format);
}

View File

@ -0,0 +1,38 @@
package org.jetlinks.community.things.data.operations;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.*;
import org.jetlinks.community.things.data.ThingsDataConstants;
import java.util.ArrayList;
import java.util.List;
public abstract class RowModeDDLOperationsBase extends AbstractDDLOperations{
public RowModeDDLOperationsBase(String thingType,
String templateId,
String thingId,
DataSettings settings,
MetricBuilder metricBuilder) {
super(thingType, templateId, thingId, settings, metricBuilder);
}
@Override
protected List<PropertyMetadata> createPropertyProperties(List<PropertyMetadata> propertyMetadata) {
List<PropertyMetadata> props = new ArrayList<>(createBasicColumns());
props.add(SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_PROPERTY_ID, "属性ID", StringType.GLOBAL));
props.add(SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_PROPERTY_NUMBER_VALUE, "数字值", DoubleType.GLOBAL));
props.add(SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_PROPERTY_GEO_VALUE, "地理位置值", GeoType.GLOBAL));
props.add(SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_PROPERTY_ARRAY_VALUE, "数组值", new ArrayType()));
props.add(SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_PROPERTY_OBJECT_VALUE, "对象值", new ObjectType()));
props.add(SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_PROPERTY_TIME_VALUE, "时间值", DateTimeType.GLOBAL));
props.add(SimplePropertyMetadata.of(ThingsDataConstants.COLUMN_PROPERTY_VALUE, "原始值", StringType.GLOBAL));
return props;
}
}

View File

@ -0,0 +1,93 @@
package org.jetlinks.community.things.data.operations;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.ThingPropertyDetail;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.AggregationData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.function.Function;
public abstract class RowModeQueryOperationsBase extends AbstractQueryOperations {
public RowModeQueryOperationsBase(String thingType,
String thingTemplateId,
String thingId,
MetricBuilder metricBuilder,
DataSettings settings,
ThingsRegistry registry) {
super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
}
@Override
protected final Flux<ThingPropertyDetail> queryProperty(@Nonnull QueryParamEntity param,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(q -> {
applyQuery(q);
// property in ('a','b','c')
q.in(ThingsDataConstants.COLUMN_PROPERTY_ID, properties.keySet());
});
return this
.doQuery(metric, query)
.mapNotNull(data -> ThingPropertyDetail
.of(data, properties.get(data.getString(ThingsDataConstants.COLUMN_PROPERTY_ID, null))))
;
}
protected Flux<ThingPropertyDetail> queryEachProperty(@Nonnull String metric,
@Nonnull Query<?, QueryParamEntity> query,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
return Flux
.fromIterable(properties.entrySet())
.flatMap(e -> this
.doQuery(metric, query
.getParam()
.clone()
.toQuery()
.and(ThingsDataConstants.COLUMN_PROPERTY_ID, e.getKey()))
.mapNotNull(data -> ThingPropertyDetail.of(data, properties.get(data.getString(ThingsDataConstants.COLUMN_PROPERTY_ID, null)))),
16);
}
@Override
protected final Mono<PagerResult<ThingPropertyDetail>> queryPropertyPage(@Nonnull QueryParamEntity param,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
String metric = metricBuilder.createPropertyMetric(thingType, thingTemplateId, thingId);
Query<?, QueryParamEntity> query = param.toNestQuery(q -> {
applyQuery(q);
// property in ('a','b','c')
q.in(ThingsDataConstants.COLUMN_PROPERTY_ID, properties.keySet());
});
return this
.doQueryPage(metric,
query,
data -> ThingPropertyDetail
.of(data, properties.get(data.getString(ThingsDataConstants.COLUMN_PROPERTY_ID, null))));
}
@Override
protected abstract Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query);
@Override
protected abstract <T> Mono<PagerResult<T>> doQueryPage(String metric,
Query<?, QueryParamEntity> query,
Function<TimeSeriesData, T> mapper);
@Override
protected abstract Flux<AggregationData> doAggregation(String metric, AggregationRequest request, AggregationContext context);
}

View File

@ -0,0 +1,193 @@
package org.jetlinks.community.things.data.operations;
import com.google.common.collect.Maps;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.id.IDGenerator;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.*;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.core.utils.TimestampUtils;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.jetlinks.community.things.data.ThingsDataConstants.*;
public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations {
public RowModeSaveOperationsBase(ThingsRegistry registry, MetricBuilder metricBuilder, DataSettings settings) {
super(registry, metricBuilder, settings);
}
@Override
protected final Flux<Tuple2<String, TimeSeriesData>> convertProperties(
String templateId,
ThingMessage message,
Map<String, Object> properties,
Map<String, Long> propertySourceTimes) {
if (MapUtils.isEmpty(properties)) {
return Flux.empty();
}
return this
.registry
.getThing(message.getThingType(), message.getThingId())
.flatMapMany(device -> device
.getMetadata()
.flatMapIterable(metadata -> createPropertyTsData(templateId, metadata, message, properties, propertySourceTimes)));
}
private List<Tuple2<String, TimeSeriesData>> createPropertyTsData(String templateId,
ThingMetadata metadata,
ThingMessage message,
Map<String, Object> properties,
Map<String, Long> propertySourceTimes) {
List<Tuple2<String, TimeSeriesData>> data = new ArrayList<>(properties.size());
String metric = metricBuilder.createPropertyMetric(message.getThingType(), templateId, message.getThingId());
for (Map.Entry<String, Object> entry : properties.entrySet()) {
String property = entry.getKey();
Object value = entry.getValue();
//忽略存在没有的属性和忽略存储的属性
PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(property);
if (value == null || propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {
continue;
}
long timestamp = propertySourceTimes.getOrDefault(property, message.getTimestamp());
String dataId = createPropertyDataId(property, message, timestamp);
data.add(
Tuples.of(
metric,
TimeSeriesData.of(timestamp, this
.createRowPropertyData(dataId,
TimestampUtils.toMillis(timestamp),
message,
propertyMetadata,
value))
)
);
}
return data;
}
protected boolean propertyIsIgnoreStorage(PropertyMetadata metadata) {
return ThingsDataConstants.propertyIsIgnoreStorage(metadata);
}
protected boolean useTimestampId(ThingMessage message) {
return message.getHeaderOrDefault(Headers.useTimestampAsId);
}
protected String createPropertyDataId(String property, ThingMessage message, long timestamp) {
if (!useTimestampId(message)) {
return IDGenerator.SNOW_FLAKE_STRING.generate();
}
return DigestUtils.md5Hex(
StringBuilderUtils
.buildString(property, message, timestamp, (p, m, ts, builder) -> {
builder
.append(m.getThingId())
.append('-')
.append(p)
.append('-')
.append(ts);
})
);
}
protected Map<String, Object> createRowPropertyData(String id,
long timestamp,
ThingMessage message,
PropertyMetadata property,
Object value) {
Map<String, Object> propertyData = Maps.newLinkedHashMapWithExpectedSize(16);
propertyData.put(COLUMN_ID, id);
propertyData.put(metricBuilder.getThingIdProperty(), message.getThingId());
propertyData.put(COLUMN_TIMESTAMP, timestamp);
propertyData.put(COLUMN_PROPERTY_ID, property.getId());
propertyData.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
fillRowPropertyValue(propertyData, property, value);
return propertyData;
}
protected Number convertNumberValue(Number number) {
//转换数字值
return number;
}
protected void fillRowPropertyValue(Map<String, Object> target, PropertyMetadata property, Object value) {
if (value == null) {
return;
}
DataType type = property.getValueType();
target.put(COLUMN_PROPERTY_TYPE, type.getId());
String convertedValue;
if (type instanceof NumberType) {
NumberType<?> numberType = (NumberType<?>) type;
Number number = value instanceof Number
? ((Number) value)
: numberType.convertNumber(value);
if (number == null) {
throw new BusinessException("error.cannot_convert", 500, value, type.getId());
}
convertedValue = String.valueOf(number);
target.put(COLUMN_PROPERTY_NUMBER_VALUE, convertNumberValue(number));
} else if (type instanceof DateTimeType) {
DateTimeType dateTimeType = (DateTimeType) type;
convertedValue = String.valueOf(value);
target.put(COLUMN_PROPERTY_TIME_VALUE, dateTimeType.convert(convertedValue));
} else if (propertyIsJsonStringStorage(property)) {
//使用json字符来存储
convertedValue = value instanceof String
? String.valueOf(value)
: ObjectMappers.toJsonString(value);
} else if (type instanceof ObjectType) {
ObjectType objectType = (ObjectType) type;
Object val = objectType.convert(value);
convertedValue = ObjectMappers.toJsonString(val);
target.put(COLUMN_PROPERTY_OBJECT_VALUE, val);
} else if (type instanceof ArrayType) {
ArrayType objectType = (ArrayType) type;
Object val = objectType.convert(value);
convertedValue = ObjectMappers.toJsonString(val);
target.put(COLUMN_PROPERTY_ARRAY_VALUE, val);
} else if (type instanceof GeoType) {
GeoType geoType = (GeoType) type;
GeoPoint val = geoType.convert(value);
convertedValue = String.valueOf(val);
target.put(COLUMN_PROPERTY_GEO_VALUE, val);
} else {
convertedValue = String.valueOf(value);
}
target.put(COLUMN_PROPERTY_VALUE, convertedValue);
}
boolean propertyIsJsonStringStorage(PropertyMetadata metadata) {
return ThingsDataConstants.propertyIsJsonStringStorage(metadata);
}
@Override
protected abstract Mono<Void> doSave(String metric, TimeSeriesData data);
@Override
protected abstract Mono<Void> doSave(String metric, Flux<TimeSeriesData> data);
}

View File

@ -0,0 +1,22 @@
package org.jetlinks.community.things.data.operations;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.metadata.Feature;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
public interface SaveOperations {
Mono<Void> save(ThingMessage thingMessage);
Mono<Void> save(Collection<? extends ThingMessage> thingMessage);
Mono<Void> save(Publisher<? extends ThingMessage> thingMessage);
default Flux<Feature> getFeatures(){
return Flux.empty();
}
}

View File

@ -0,0 +1,9 @@
package org.jetlinks.community.things.data.operations;
public interface TemplateOperations {
QueryOperations forQuery();
DDLOperations forDDL();
}

View File

@ -0,0 +1,8 @@
package org.jetlinks.community.things.data.operations;
public interface ThingOperations {
QueryOperations forQuery();
DDLOperations forDDL();
}