From 563f9a328e53d2a102d422f0d0aaf386b631d0ab Mon Sep 17 00:00:00 2001 From: Zhang Ji <125540670@qq.com> Date: Fri, 30 Jun 2023 11:43:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(=E8=AE=BE=E5=A4=87):=20=E5=AF=BC=E5=85=A5?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E6=95=B0=E6=8D=AE=EF=BC=8C=E5=B9=B6=E6=8F=90?= =?UTF-8?q?=E4=BE=9B=E6=97=A5=E5=BF=97=E4=B8=8B=E8=BD=BD=20(#326)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(基础模块): 增加通用导入工具 * feat(设备): 导入设备数据,并提供日志下载 --- .../community/PropertyMetadataConstants.java | 4 +- jetlinks-components/io-component/pom.xml | 6 + .../community/io/excel/AbstractImporter.java | 114 +++++++ .../community/io/excel/ExcelUtils.java | 306 ++++++++++++++++++ .../community/io/excel/ImportHelper.java | 254 +++++++++++++++ .../io/excel/annotation/ExcelHeader.java | 67 ++++ .../io/excel/converter/ArrayConverter.java | 55 ++++ .../excel/converter/ConverterExcelOption.java | 15 + .../io/excel/converter/DateConverter.java | 74 +++++ .../io/excel/converter/EnumConverter.java | 45 +++ .../io/excel/converter/StringConverter.java | 18 ++ .../community/io/file/ClusterFileManager.java | 98 +++++- .../io/file/FileManagerConfiguration.java | 6 +- .../community/io/file/FileOption.java | 27 +- .../community/io/file/FileProperties.java | 8 + .../community/io/utils/FileUtils.java | 100 +++++- .../response/ImportDeviceInstanceResult.java | 10 +- .../device/web/DeviceInstanceController.java | 106 +++++- .../device/web/excel/DeviceExcelImporter.java | 80 +++++ .../device/web/excel/DeviceExcelInfo.java | 64 +++- 20 files changed, 1417 insertions(+), 40 deletions(-) create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/AbstractImporter.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ExcelUtils.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportHelper.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/annotation/ExcelHeader.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ArrayConverter.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ConverterExcelOption.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/DateConverter.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/EnumConverter.java create mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/StringConverter.java create mode 100644 jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelImporter.java diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java index 508a9bc0..888ca2b4 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java @@ -29,8 +29,8 @@ public interface PropertyMetadataConstants { static boolean isManual(DeviceMessage message) { return message - .getHeader(PropertyMetadataConstants.Source.headerKey) - .map(PropertyMetadataConstants.Source.manual::equals) + .getHeader(Source.headerKey) + .map(Source.manual::equals) .orElse(false); } diff --git a/jetlinks-components/io-component/pom.xml b/jetlinks-components/io-component/pom.xml index 1a7cf4f2..e7a95cfd 100644 --- a/jetlinks-components/io-component/pom.xml +++ b/jetlinks-components/io-component/pom.xml @@ -65,5 +65,11 @@ ${jetlinks.version} + + org.jetlinks.community + common-component + ${project.version} + + \ No newline at end of file diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/AbstractImporter.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/AbstractImporter.java new file mode 100644 index 00000000..ff768848 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/AbstractImporter.java @@ -0,0 +1,114 @@ +package org.jetlinks.community.io.excel; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.community.io.file.FileInfo; +import org.jetlinks.community.io.file.FileManager; +import org.jetlinks.community.io.file.FileOption; +import org.jetlinks.community.io.utils.FileUtils; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.InputStream; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** + * 抽象数据导入服务,导入数据并返回实时数据结果,导入结束后返回导入结果文件. + * + * @param 数据类型 + * @author zhouhao + * @see FileManager + * @since 2.1 + */ +@AllArgsConstructor +public abstract class AbstractImporter { + + private final FileManager fileManager; + + private final WebClient client; + + protected abstract Mono handleData(Flux data); + + protected abstract T newInstance(); + + protected void customImport(ImportHelper helper) { + + } + + public Flux> doImport(String fileUrl) { + String format = FileUtils.getExtension(fileUrl); + + ImportHelper importHelper = new ImportHelper<>(this::newInstance, this::handleData); + + customImport(importHelper); + + return this + .getInputStream(fileUrl) + .flatMapMany(stream -> importHelper + .doImport(stream, format, ImportResult::of, + buf -> fileManager + .saveFile(getResultFileName(fileUrl, format), buf, FileOption.tempFile) + .map(ImportResult::of))); + } + + protected String getResultFileName(String sourceFileUrl, String format) { + + return "导入结果_" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss")) + "." + format; + } + + public enum ImportResultType { + //数据 + data, + //详情文件 + detailFile + } + + @Getter + @Setter + public static class ImportResult { + @Schema(description = "导入结果类型") + private ImportResultType type; + + @Schema(description = "行号,从数据的第一行为0开始") + private long row; + + @Schema(description = "数据") + private T data; + + @Schema(description = "是否成功") + private boolean success; + + @Schema(description = "错误消息") + private String message; + + @Schema(description = "导入结果详情文件地址") + private String detailFile; + + public static ImportResult of(ImportHelper.Importing importing) { + ImportResult result = new ImportResult<>(); + result.type = ImportResultType.data; + result.row = importing.getRow(); + result.success = importing.isSuccess(); + result.message = importing.getErrorMessage(); + result.data = importing.getTarget(); + return result; + } + + public static ImportResult of(FileInfo fileInfo) { + ImportResult result = new ImportResult<>(); + result.type = ImportResultType.detailFile; + result.detailFile = fileInfo.getAccessUrl(); + return result; + } + + } + + @SuppressWarnings("all") + protected Mono getInputStream(String fileUrl) { + return FileUtils.readInputStream(client, fileUrl); + } +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ExcelUtils.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ExcelUtils.java new file mode 100644 index 00000000..15595469 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ExcelUtils.java @@ -0,0 +1,306 @@ +package org.jetlinks.community.io.excel; + +import com.alibaba.excel.annotation.ExcelIgnore; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBufAllocator; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.SneakyThrows; +import org.hswebframework.ezorm.rdb.utils.PropertiesUtils; +import org.hswebframework.reactor.excel.CellDataType; +import org.hswebframework.reactor.excel.ExcelHeader; +import org.hswebframework.reactor.excel.ExcelOption; +import org.hswebframework.reactor.excel.ReactorExcel; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.community.io.excel.converter.ArrayConverter; +import org.jetlinks.community.io.excel.converter.ConverterExcelOption; +import org.jetlinks.community.io.excel.converter.DateConverter; +import org.jetlinks.community.io.excel.converter.EnumConverter; +import org.jetlinks.community.io.excel.converter.StringConverter; +import org.jetlinks.core.metadata.Jsonable; +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.function.Function3; + +import java.beans.PropertyDescriptor; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * @since 2.1 + */ +public class ExcelUtils { + + public static Flux write(Class header, + Flux dataStream, + String format, + ExcelOption... opts) { + return write(getHeadersForWrite(header), dataStream, format, opts); + } + + public static Flux write(List headers, + Flux dataStream, + String format, + ExcelOption... opts) { + return ReactorExcel + .writeFor(format) + .justWrite() + .sheet(sheet -> { + Map headerMapping = Maps.newHashMapWithExpectedSize(headers.size()); + //header 映射 + sheet.headers(headers + .stream() + .filter(head -> headerMapping.put(head.getKey(), head) == null) + .collect(Collectors.toList())); + //数据 + sheet.rows( + dataStream.map(data -> { + Map map = data instanceof Jsonable ? ((Jsonable) data).toJson() : FastBeanCopier.copy(data, new HashMap<>()); + return transformValue(map, headerMapping, ConverterExcelOption::convertForWrite); + }) + ); + }) + .option(opts) + .writeBytes(64 * 1024) + .map(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT)::wrap); + } + + public static Flux read(Supplier supplier, + InputStream inputStream, + String format, + ExcelOption... options) { + return read(supplier, getHeadersForRead(supplier.get().getClass()), inputStream, format, options); + } + + public static Flux read(Supplier supplier, + List headers, + InputStream inputStream, + String format, + ExcelOption... options) { + Map keyAndHeader = Maps.newHashMapWithExpectedSize(headers.size()); + Map textAndHeader = headers + .stream() + .peek(header -> keyAndHeader.put(header.getKey(), header)) + .collect(Collectors.toMap(ExcelHeader::getText, ExcelHeader::getKey, (a, b) -> a)); + + return ReactorExcel + .>readFor(format, HashMap::new) + .justReadByHeader() + .headers(textAndHeader) + .wrapper(ReactorExcel.mapWrapper()) + .readAndClose(inputStream, options) + .map(map -> { + + map = transformValue(map, keyAndHeader, ConverterExcelOption::convertForRead); + + T data = supplier.get(); + + if (data instanceof Jsonable) { + ((Jsonable) data).fromJson(new JSONObject(map)); + } else { + FastBeanCopier.copy(map, data); + } + return data; + }); + } + + static Map transformValue(Map source, + Map headers, + Function3 converter) { + return Maps.transformEntries(source, (key, val) -> { + ExcelHeader header = headers.get(key); + if (header != null) { + List options = header + .options() + .getOptions(ConverterExcelOption.class); + for (ConverterExcelOption option : options) { + val = converter.apply(option, val, header); + } + } + return val; + }); + } + + public static List getHeadersForWrite(Class clazz) { + return headersCache + .computeIfAbsent(clazz, ExcelUtils::parseHeader0) + .stream() + .filter(ExtExcelHeader::forWrite) + .collect(Collectors.toList()); + } + + public static List getHeadersForRead(Class clazz) { + return headersCache + .computeIfAbsent(clazz, ExcelUtils::parseHeader0) + .stream() + .filter(ExtExcelHeader::forRead) + .collect(Collectors.toList()); + } + + private static CellDataType convertCellType(Class clazz) { + + if (Number.class.isAssignableFrom(clazz)) { + return CellDataType.NUMBER; + } + if (clazz == Date.class || clazz == LocalDate.class || clazz == LocalDateTime.class) { + return CellDataType.DATE_TIME; + } + return CellDataType.STRING; + } + + + private static ConverterExcelOption createConverter(Class type, org.jetlinks.community.io.excel.annotation.ExcelHeader header) { + if (type.isEnum()) { + return new EnumConverter((Class) type); + } + if (header.dataType() == CellDataType.DATE_TIME + || Date.class.isAssignableFrom(type) + || LocalDate.class.isAssignableFrom(type) + || LocalDateTime.class.isAssignableFrom(type)) { + String format = header.format(); + if (!StringUtils.hasText(format)) { + format = "yyyy/MM/dd HH:mm:ss"; + } + return new DateConverter(format, type); + } + + if (type == String.class) { + return StringConverter.INSTANCE; + } + // TODO: 2023/6/19 更多类型转换 + + return null; + } + + @SuppressWarnings("all") + private static ConverterExcelOption createConverter(Field field, org.jetlinks.community.io.excel.annotation.ExcelHeader header) { + + if (field.getType().isArray()) { + Class elementType = field.getType().getComponentType(); + return new ArrayConverter(true, elementType, createConverter(elementType, header)); + } + + if (List.class.isAssignableFrom(field.getType())) { + Class elementType = ResolvableType + .forField(field) + .getGeneric(0) + .toClass(); + + return new ArrayConverter(false, elementType, createConverter(elementType, header)); + } + + return createConverter(field.getType(), header); + } + + @SneakyThrows + private static List parseHeader0(Class clazz) { + List headers = new ArrayList<>(); + Map sortIndex = new LinkedHashMap<>(); + + int index = 0; + for (PropertyDescriptor descriptor : PropertiesUtils.getDescriptors(clazz)) { + Field field = PropertiesUtils.getPropertyField(clazz, descriptor.getName()).orElse(null); + if (field == null) { + continue; + } + if (field.getAnnotation(ExcelIgnore.class) != null) { + continue; + } + index++; + + org.jetlinks.community.io.excel.annotation.ExcelHeader header = field + .getAnnotation(org.jetlinks.community.io.excel.annotation.ExcelHeader.class); + + if (header == null) { + header = descriptor + .getReadMethod() + .getAnnotation(org.jetlinks.community.io.excel.annotation.ExcelHeader.class); + } + if (header == null || header.ignore()) { + continue; + } + CellDataType cellType = header.dataType() == CellDataType.AUTO ? convertCellType(field.getType()) : header.dataType(); + + List option = new ArrayList<>(); + if (header.converter() != ConverterExcelOption.class) { + option.add(header.converter().getConstructor().newInstance()); + } else { + ConverterExcelOption excelOption = createConverter(field, header); + if (null != excelOption) { + option.add(excelOption); + } + } + + for (Class aClass : header.options()) { + option.add(aClass.getConstructor().newInstance()); + } + + String[] headerTexts = header.value(); + if (headerTexts.length == 0) { + Schema schema = field.getAnnotation(Schema.class); + if (schema != null) { + headerTexts = new String[]{schema.description()}; + } + } + + if (headerTexts.length == 0) { + headerTexts = new String[]{field.getName()}; + } + + for (String headerText : headerTexts) { + ExtExcelHeader excelHeader = new ExtExcelHeader(field.getName(), headerText, cellType, header); + + excelHeader.options().merge(option); + + if (header.order() != Integer.MAX_VALUE) { + sortIndex.put(excelHeader, header.order()); + } else { + sortIndex.put(excelHeader, index++); + } + headers.add(excelHeader); + } + + } + headers.sort(Comparator.comparingInt(h -> sortIndex.getOrDefault(h, Integer.MAX_VALUE))); + + return Collections.unmodifiableList(headers); + } + + private static final Map, List> headersCache = new ConcurrentHashMap<>(); + + static class ExtExcelHeader extends ExcelHeader { + private final org.jetlinks.community.io.excel.annotation.ExcelHeader annotation; + + public ExtExcelHeader(String key, + String text, + CellDataType type, + org.jetlinks.community.io.excel.annotation.ExcelHeader annotation) { + super(key, text, type); + this.annotation = annotation; + } + + public boolean forRead() { + return !annotation.ignoreRead(); + } + + public boolean forWrite() { + return !annotation.ignoreWrite(); + } + } +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportHelper.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportHelper.java new file mode 100644 index 00000000..09a37406 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportHelper.java @@ -0,0 +1,254 @@ +package org.jetlinks.community.io.excel; + +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Collections2; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.commons.collections4.CollectionUtils; +import org.hswebframework.reactor.excel.CellDataType; +import org.hswebframework.reactor.excel.ExcelHeader; +import org.hswebframework.web.api.crud.entity.Entity; +import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.i18n.LocaleUtils; +import org.hswebframework.web.validator.CreateGroup; +import org.hswebframework.web.validator.ValidatorUtils; +import org.jetlinks.core.metadata.Jsonable; +import org.springframework.core.io.buffer.DataBuffer; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.InputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * @since 2.1 + */ +public class ImportHelper { + + /** + * 实体构造器 + */ + private final Supplier instanceSupplier; + + /** + * 数据处理器,应当支持事务和幂等. + */ + private final Function, Mono> handler; + + + /** + * 批量处理缓冲区大小 + */ + private int bufferSize = 200; + + /** + * 当批量处理失败时,是否回退为单条数据处理. + */ + private boolean fallbackSingle; + + /** + * 自定义表头信息 + */ + private final List customHeaders = new ArrayList<>(); + + private Consumer afterRead = t -> { + if (t instanceof Entity) { + ((Entity) t).tryValidate(CreateGroup.class); + } else { + ValidatorUtils.tryValidate(t, CreateGroup.class); + } + }; + + public ImportHelper(Supplier supplier, Function, Mono> handler) { + this.instanceSupplier = supplier; + this.handler = handler; + } + + public ImportHelper addHeader(String key, String text) { + return addHeader(new ExcelHeader(key, text, CellDataType.STRING)); + } + + public ImportHelper addHeader(ExcelHeader header) { + customHeaders.add(header); + return this; + } + + public ImportHelper addHeaders(Collection header) { + customHeaders.addAll(header); + return this; + } + + public ImportHelper fallbackSingle(boolean fallbackSingle) { + this.fallbackSingle = fallbackSingle; + return this; + } + + public ImportHelper bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public ImportHelper afterReadValidate(Class... group) { + return afterRead(t -> { + if (t instanceof Entity) { + ((Entity) t).tryValidate(group); + } else { + ValidatorUtils.tryValidate(t, group); + } + }); + } + + public ImportHelper afterRead(Consumer afterRead) { + this.afterRead = afterRead; + return this; + } + + private List createHeaders() { + List headers = new ArrayList<>( + ExcelUtils.getHeadersForRead(getInstanceType()) + ); + headers.addAll(customHeaders); + return headers; + } + + public Flux doImport(InputStream inputStream, + String format, + Function, R> resultMapper, + Function, Mono> infoWriter) { + Flux> cache = doImport(inputStream, format) + .replay() + .refCount(1, Duration.ofMillis(100)) + .as(LocaleUtils::transform); + + List headers = createHeaders(); + headers.add( + new ExcelHeader( + "$_result", LocaleUtils.resolveMessage("import.header.result", "导入结果"), CellDataType.STRING + ) + ); + return Flux.merge( + cache.mapNotNull(resultMapper), + ExcelUtils + .write(headers, cache + .map(importing -> { + Map map = new LinkedHashMap<>(importing.getSource()); + if (importing.isSuccess()) { + map.put("$_result", LocaleUtils.resolveMessage( + "import.result.success", "成功")); + } else { + String errorMessage = importing.getErrorMessage(); + map.put("$_result", LocaleUtils.resolveMessage( + "import.result.error", "失败:" + errorMessage, errorMessage)); + } + return map; + }), format) + .as(infoWriter) + ); + } + + @SuppressWarnings("all") + protected Class getInstanceType() { + return (Class) instanceSupplier.get().getClass(); + } + + public Flux> doImport(InputStream inputStream, String format) { + + return ExcelUtils + .>read(LinkedHashMap::new, + createHeaders(), + inputStream, + format) + .index(this::createImporting) + .buffer(bufferSize) + .concatMap(buffer -> this + .doImport(Collections2.filter(buffer, Importing::isSuccess)) + .thenMany(Flux.fromIterable(buffer))); + } + + private Mono doImport(Collection> buffer) { + if (CollectionUtils.isEmpty(buffer)) { + return Mono.empty(); + } + + Mono batchHandler = Flux + .fromIterable(buffer) + .map(Importing::getTarget) + .as(handler); + + //错误发生时回退到单个处理 + if (fallbackSingle && buffer.size() > 1) { + return batchHandler + .onErrorResume(err -> Flux + .fromIterable(buffer) + .flatMap(importing -> handler + .apply(Flux.just(importing.target)) + .onErrorResume(e -> { + importing.error(e); + return Mono.empty(); + })) + .then()); + } + return batchHandler + .onErrorResume(err -> { + for (Importing importing : buffer) { + importing.batchError = true; + importing.error(err); + } + return Mono.empty(); + }); + } + + private Importing createImporting(long index, Map data) { + T instance = instanceSupplier.get(); + Importing importing = new Importing<>(index, data, instance); + try { + if (instance instanceof Entity) { + ((Entity) instance).copyFrom(data); + } else if (instance instanceof Jsonable) { + ((Jsonable) instance).fromJson(new JSONObject(data)); + } else { + FastBeanCopier.copy(data, instance); + } + if (afterRead != null) { + afterRead.accept(instance); + } + } catch (Throwable e) { + importing.error(e); + } + return importing; + } + + + @RequiredArgsConstructor + @Getter + public static class Importing { + private final long row; + private final Map source; + private final T target; + private boolean batchError; + private boolean success = true; + + @Getter(AccessLevel.PRIVATE) + private transient Throwable error; + + public String getErrorMessage() { + //todo 更多异常信息判断 + return error == null ? null : error.getLocalizedMessage(); + } + + void error(Throwable error) { + this.success = false; + this.error = error; + } + } + +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/annotation/ExcelHeader.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/annotation/ExcelHeader.java new file mode 100644 index 00000000..18da6ab7 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/annotation/ExcelHeader.java @@ -0,0 +1,67 @@ +package org.jetlinks.community.io.excel.annotation; + +import io.swagger.v3.oas.annotations.media.Schema; +import org.hswebframework.reactor.excel.CellDataType; +import org.hswebframework.reactor.excel.ExcelOption; +import org.jetlinks.community.io.excel.converter.ConverterExcelOption; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.lang.reflect.Field; + +@Target({ElementType.FIELD,ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ExcelHeader { + + /** + * @return excel表头 + * @see Schema#description() + * @see Field#getName() + */ + String[] value() default {}; + + /** + * @return 忽略导入导出 + */ + boolean ignore() default false; + + /** + * @return 仅忽略导出 + */ + boolean ignoreWrite() default false; + + /** + * @return 仅忽略导入 + */ + boolean ignoreRead() default false; + + /** + * @return 导出时的顺序 + */ + int order() default Integer.MAX_VALUE; + + /** + * @return 单元格数据类型 + */ + CellDataType dataType() default CellDataType.AUTO; + + /** + * @return 单元格格式 + */ + String format() default ""; + + /** + * @return 自定义数据转换器 + */ + Class converter() default ConverterExcelOption.class; + + /** + * @return 自定义其他选型配置 + */ + Class[] options() default {}; + +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ArrayConverter.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ArrayConverter.java new file mode 100644 index 00000000..534f0a2a --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ArrayConverter.java @@ -0,0 +1,55 @@ +package org.jetlinks.community.io.excel.converter; + +import lombok.AllArgsConstructor; +import org.hswebframework.reactor.excel.ExcelHeader; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.community.utils.ConverterUtils; + +import java.util.List; + +/** + * @since 2.1 + */ +@AllArgsConstructor +public class ArrayConverter implements ConverterExcelOption{ + + private boolean array; + + private Class elementType; + + private ConverterExcelOption converter; + + + @Override + public Object convertForWrite(Object val, ExcelHeader header) { + return String.join(",", + ConverterUtils.convertToList(val, v -> { + if (converter == null) { + return String.valueOf(v); + } + return String.valueOf(converter.convertForWrite(v, header)); + })); + } + + @Override + public Object convertForRead(Object cell, ExcelHeader header) { + + List list = ConverterUtils + .convertToList(cell, val -> { + if (converter != null) { + val = converter.convertForRead(val, header); + } + if (elementType.isInstance(val)) { + return val; + } + return FastBeanCopier.DEFAULT_CONVERT + .convert(val, elementType, FastBeanCopier.EMPTY_CLASS_ARRAY); + }); + + if (array) { + return list.toArray(); + } + + return list; + } +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ConverterExcelOption.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ConverterExcelOption.java new file mode 100644 index 00000000..1f564000 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/ConverterExcelOption.java @@ -0,0 +1,15 @@ +package org.jetlinks.community.io.excel.converter; + +import org.hswebframework.reactor.excel.ExcelHeader; +import org.hswebframework.reactor.excel.ExcelOption; + +/** + * @since 2.1 + */ +public interface ConverterExcelOption extends ExcelOption { + + Object convertForWrite(Object val, ExcelHeader header); + + Object convertForRead(Object cell, ExcelHeader header); + +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/DateConverter.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/DateConverter.java new file mode 100644 index 00000000..a25c0b39 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/DateConverter.java @@ -0,0 +1,74 @@ +package org.jetlinks.community.io.excel.converter; + +import lombok.AllArgsConstructor; +import org.apache.poi.ss.usermodel.CellStyle; +import org.apache.poi.ss.usermodel.DataFormat; +import org.hswebframework.reactor.excel.ExcelHeader; +import org.hswebframework.reactor.excel.WritableCell; +import org.hswebframework.reactor.excel.poi.options.CellOption; +import org.jetlinks.reactor.ql.utils.CastUtils; +import org.joda.time.DateTime; +import org.joda.time.LocalDateTime; + +import java.time.LocalDate; +import java.time.ZoneId; +import java.util.Date; + +@AllArgsConstructor +public class DateConverter implements ConverterExcelOption, CellOption { + + private final String format; + + private final Class javaType; + + @Override + public Object convertForWrite(Object val, ExcelHeader header) { + return new DateTime(CastUtils.castDate(val)).toString(format); + } + + @Override + public Object convertForRead(Object val, ExcelHeader header) { + + if (null == val) { + return null; + } + + if (javaType.isInstance(val)) { + return val; + } + Date date = CastUtils.castDate(val); + if (javaType == Long.class || javaType == long.class) { + return date.getTime(); + } + if (javaType == LocalDateTime.class) { + return java.time.LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()); + } + if (javaType == LocalDate.class) { + return java.time.LocalDateTime + .ofInstant(date.toInstant(), ZoneId.systemDefault()) + .toLocalDate(); + } + + return date; + } + + @Override + public void cell(org.apache.poi.ss.usermodel.Cell poiCell, WritableCell cell) { + CellStyle style = poiCell.getCellStyle(); + if (style == null) { + style = poiCell.getRow() + .getSheet() + .getWorkbook() + .createCellStyle(); + poiCell.setCellStyle(style); + } + DataFormat dataFormat = poiCell + .getRow() + .getSheet() + .getWorkbook() + .createDataFormat(); + + style.setDataFormat(dataFormat.getFormat(format)); + + } +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/EnumConverter.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/EnumConverter.java new file mode 100644 index 00000000..2ee2de63 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/EnumConverter.java @@ -0,0 +1,45 @@ +package org.jetlinks.community.io.excel.converter; + +import lombok.AllArgsConstructor; +import org.hswebframework.reactor.excel.ExcelHeader; +import org.hswebframework.web.dict.EnumDict; +import org.hswebframework.web.i18n.LocaleUtils; + +import java.util.Locale; +import java.util.Objects; + +@AllArgsConstructor +public class EnumConverter implements ConverterExcelOption { + + @SuppressWarnings("all") + private final Class type; + + @Override + public Object convertForWrite(Object val, ExcelHeader header) { + if (val instanceof EnumDict) { + return ((EnumDict) val).getI18nMessage(LocaleUtils.current()); + } + if (val instanceof Enum) { + return ((Enum) val).name(); + } + + return val; + } + + @Override + @SuppressWarnings("all") + public Object convertForRead(Object val, ExcelHeader header) { + if (val == null) { + return null; + } + if (EnumDict.class.isAssignableFrom(type)) { + Locale locale = LocaleUtils.current(); + return EnumDict + .find((Class) type, e -> { + return e.eq(val) || Objects.equals(val, e.getI18nMessage(locale)); + }) + .orElse(null); + } + return Enum.valueOf(type, String.valueOf(val)); + } +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/StringConverter.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/StringConverter.java new file mode 100644 index 00000000..e5707df9 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/converter/StringConverter.java @@ -0,0 +1,18 @@ +package org.jetlinks.community.io.excel.converter; + +import org.hswebframework.reactor.excel.ExcelHeader; + +public class StringConverter implements ConverterExcelOption { + public static final StringConverter INSTANCE = new StringConverter(); + + @Override + public Object convertForWrite(Object val, ExcelHeader header) { + return val == null ? null : String.valueOf(val); + } + + @Override + public Object convertForRead(Object cell, ExcelHeader header) { + + return cell == null ? null : String.valueOf(cell); + } +} diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/ClusterFileManager.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/ClusterFileManager.java index 8264dc38..122c1b52 100644 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/ClusterFileManager.java +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/ClusterFileManager.java @@ -9,12 +9,16 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.crud.events.EntityDeletedEvent; import org.hswebframework.web.exception.BusinessException; import org.hswebframework.web.exception.NotFoundException; import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.community.config.ConfigManager; import org.jetlinks.core.rpc.RpcManager; +import org.springframework.context.event.EventListener; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.buffer.*; import org.springframework.http.codec.multipart.FilePart; @@ -27,14 +31,32 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.security.MessageDigest; +import java.time.Duration; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.Objects; import java.util.function.Function; - +@Slf4j public class ClusterFileManager implements FileManager { + /** + *
{@code
+     * system:
+     *   config:
+     *     scopes:
+     *       - id: paths
+     *         name: 访问路径配置
+     *         public-access: true
+     *         properties:
+     *           - key: base-path
+     *             name: 接口根路径
+     *             default-value: ${api.base-path}
+     * }
+ */ + public static final String API_PATH_CONFIG_NAME = "paths"; + public static final String API_PATH_CONFIG_KEY = "base-path"; + private final FileProperties properties; private final NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); @@ -43,19 +65,50 @@ public class ClusterFileManager implements FileManager { private final RpcManager rpcManager; + private final ConfigManager configManager; + public ClusterFileManager(RpcManager rpcManager, FileProperties properties, - ReactiveRepository repository) { + ReactiveRepository repository, + ConfigManager configManager) { new File(properties.getStorageBasePath()).mkdirs(); this.properties = properties; this.rpcManager = rpcManager; this.repository = repository; + this.configManager = configManager; rpcManager.registerService(new ServiceImpl()); + if (!properties.getTempFilePeriod().isZero()) { + Duration duration = Duration.ofHours(1); + if (duration.toMillis() > properties.getTempFilePeriod().toMillis()) { + duration = properties.getTempFilePeriod(); + } + Flux.interval(duration) + .onBackpressureDrop() + .concatMap(ignore -> repository + .createDelete() + .where(FileEntity::getServerNodeId, rpcManager.currentServerId()) + .lte(FileEntity::getCreateTime, + System.currentTimeMillis() - properties.getTempFilePeriod().toMillis()) + .and(FileEntity::getOptions, "in$any", FileOption.tempFile) + .execute() + .onErrorResume(err -> { + log.warn("delete temp file error", err); + return Mono.empty(); + })) + .subscribe(); + } + + } + + private Mono getApiBasePath() { + return configManager + .getProperties(API_PATH_CONFIG_NAME) + .mapNotNull(val -> val.getString(API_PATH_CONFIG_KEY, null)); } @Override public Mono saveFile(FilePart filePart, FileOption... options) { - return saveFile(filePart.filename(), filePart.content()); + return saveFile(filePart.filename(), filePart.content(), options); } private DataBuffer updateDigest(MessageDigest digest, DataBuffer dataBuffer) { @@ -84,9 +137,9 @@ public class ClusterFileManager implements FileManager { .map(buffer -> updateDigest(md5, updateDigest(sha256, buffer))) .as(buf -> DataBufferUtils .write(buf, path, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE_NEW, - StandardOpenOption.TRUNCATE_EXISTING)) + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW, + StandardOpenOption.TRUNCATE_EXISTING)) .then(Mono.defer(() -> { File savedFile = Paths.get(storageBasePath, storagePath).toFile(); if (!savedFile.exists()) { @@ -101,7 +154,12 @@ public class ClusterFileManager implements FileManager { FileEntity entity = FileEntity.of(fileInfo, storagePath, serverNodeId); return repository .insert(entity) - .then(Mono.fromSupplier(entity::toInfo)); + .then(Mono.defer(() -> { + FileInfo response = entity.toInfo(); + return this + .getApiBasePath().doOnNext(response::withBasePath) + .thenReturn(response); + })); })); } @@ -138,9 +196,9 @@ public class ClusterFileManager implements FileManager { private Flux readFile(String filePath, long position) { return DataBufferUtils .read(new FileSystemResource(Paths.get(properties.getStorageBasePath(), filePath)), - position, - bufferFactory, - (int) properties.getReadBufferSize().toBytes()) + position, + bufferFactory, + (int) properties.getReadBufferSize().toBytes()) .onErrorMap(NoSuchFileException.class, e -> new NotFoundException()); } @@ -180,13 +238,27 @@ public class ClusterFileManager implements FileManager { .findById(id) .switchIfEmpty(Mono.error(NotFoundException::new)) .flatMapMany(file -> { - DefaultReaderContext context = new DefaultReaderContext(file.toInfo(), 0); - return beforeRead - .apply(context) + FileInfo fileInfo = file.toInfo(); + DefaultReaderContext context = new DefaultReaderContext(fileInfo, 0); + + return getApiBasePath() + .doOnNext(fileInfo::withBasePath) + .then(Mono.defer(() -> beforeRead.apply(context))) .thenMany(Flux.defer(() -> readFile(file, context.position))); }); } + @EventListener + public void handleDeleteEvent(EntityDeletedEvent event) { + for (FileEntity fileEntity : event.getEntity()) { + File file = Paths.get(properties.getStorageBasePath(), fileEntity.getStoragePath()).toFile(); + if (file.exists()) { + log.debug("delete file: {}", file.getAbsolutePath()); + file.delete(); + } + } + } + @AllArgsConstructor private static class DefaultReaderContext implements ReaderContext { private final FileInfo info; diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileManagerConfiguration.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileManagerConfiguration.java index 44a47c7b..f8917dbb 100644 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileManagerConfiguration.java +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileManagerConfiguration.java @@ -2,6 +2,7 @@ package org.jetlinks.community.io.file; import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; import org.hswebframework.web.crud.annotation.EnableEasyormRepository; +import org.jetlinks.community.config.ConfigManager; import org.jetlinks.core.rpc.RpcManager; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -16,8 +17,9 @@ public class FileManagerConfiguration { @Bean public FileManager fileManager(RpcManager rpcManager, FileProperties properties, - ReactiveRepository repository){ - return new ClusterFileManager(rpcManager,properties,repository); + ReactiveRepository repository, + ConfigManager configManager){ + return new ClusterFileManager(rpcManager,properties,repository,configManager); } } diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileOption.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileOption.java index ee0a470b..9b1e6741 100644 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileOption.java +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileOption.java @@ -1,6 +1,31 @@ package org.jetlinks.community.io.file; +import org.springframework.util.StringUtils; + public enum FileOption { - publicAccess + /** + * 公开访问 + */ + publicAccess, + + /** + * 临时文件,将会被定时删除 + */ + tempFile; + + + public static FileOption[] parse(String str) { + if (!StringUtils.hasText(str)) { + return new FileOption[0]; + } + + String[] arr = str.split(","); + FileOption[] options = new FileOption[arr.length]; + + for (int i = 0; i < arr.length; i++) { + options[i] = FileOption.valueOf(arr[i]); + } + return options; + } } diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileProperties.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileProperties.java index 2f4f4be8..41a6af3c 100644 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileProperties.java +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/FileProperties.java @@ -5,6 +5,8 @@ import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.util.unit.DataSize; +import java.time.Duration; + @Getter @Setter @ConfigurationProperties("file.manager") @@ -14,4 +16,10 @@ public class FileProperties { private DataSize readBufferSize = DataSize.ofKilobytes(64); + private String accessBaseUrl; + + /** + * 临时文件保存有效期,0为一直有效 + */ + private Duration tempFilePeriod = Duration.ZERO; } diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/utils/FileUtils.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/utils/FileUtils.java index a0ecae49..aefcb108 100644 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/utils/FileUtils.java +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/utils/FileUtils.java @@ -1,21 +1,111 @@ package org.jetlinks.community.io.utils; +import io.netty.buffer.ByteBufAllocator; import lombok.SneakyThrows; import org.apache.commons.io.FilenameUtils; +import org.jetlinks.core.message.codec.http.HttpUtils; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.MediaType; +import org.springframework.util.StringUtils; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; -import java.net.URLDecoder; +import java.io.FileInputStream; +import java.io.InputStream; public class FileUtils { - @SneakyThrows public static String getExtension(String url) { - url = URLDecoder.decode(url, "utf8"); + url = HttpUtils.urlDecode(url); if (url.contains("?")) { - url = url.substring(0,url.lastIndexOf("?")); + url = url.substring(0, url.lastIndexOf("?")); } if (url.contains("#")) { - url = url.substring(0,url.lastIndexOf("#")); + url = url.substring(0, url.lastIndexOf("#")); } return FilenameUtils.getExtension(url); } + + public static String getFileName(String url) { + url = HttpUtils.urlDecode(url); + if (url.contains("?")) { + url = url.substring(0, url.lastIndexOf("?")); + } + if (url.contains("#")) { + url = url.substring(0, url.lastIndexOf("#")); + } + return url.substring(url.lastIndexOf("/") + 1); + } + + public static MediaType getMediaTypeByName(String name) { + return getMediaTypeByExtension(FilenameUtils.getExtension(name)); + } + + public static MediaType getMediaTypeByExtension(String extension) { + if (!StringUtils.hasText(extension)) { + return MediaType.APPLICATION_OCTET_STREAM; + } + switch (extension.toLowerCase()) { + case "jpg": + case "jpeg": + return MediaType.IMAGE_JPEG; + case "png": + return MediaType.IMAGE_PNG; + case "gif": + return MediaType.IMAGE_GIF; + case "mp4": + return MediaType.parseMediaType("video/mp4"); + case "flv": + return MediaType.parseMediaType("video/x-flv"); + case "text": + case "txt": + return MediaType.TEXT_PLAIN; + case "js": + return MediaType.APPLICATION_JSON; + default: + return MediaType.APPLICATION_OCTET_STREAM; + } + } + + public static Mono dataBufferToInputStream(Flux dataBufferFlux) { + NettyDataBufferFactory factory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); + + return DataBufferUtils + .join(dataBufferFlux + .map(buffer -> { + if (buffer instanceof NettyDataBuffer) { + return buffer; + } + try { + return factory.wrap(buffer.asByteBuffer()); + } finally { + DataBufferUtils.release(buffer); + } + })) + .map(buffer -> buffer.asInputStream(true)); + + } + + public static Mono readInputStream(WebClient client, + String fileUrl) { + return Mono.defer(() -> { + if (fileUrl.startsWith("http")) { + return client + .get() + .uri(fileUrl) + .accept(MediaType.APPLICATION_OCTET_STREAM) + .exchangeToMono(clientResponse -> clientResponse.bodyToMono(Resource.class)) + .flatMap(resource -> Mono.fromCallable(resource::getInputStream)); + } else { + return Mono.fromCallable(() -> new FileInputStream(fileUrl)); + } + }); + + } + } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/ImportDeviceInstanceResult.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/ImportDeviceInstanceResult.java index 4851b4a1..923f5907 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/ImportDeviceInstanceResult.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/response/ImportDeviceInstanceResult.java @@ -1,6 +1,7 @@ package org.jetlinks.community.device.response; import lombok.AllArgsConstructor; +import lombok.Generated; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -18,14 +19,19 @@ public class ImportDeviceInstanceResult { private String message; + private String detailFile; + + @Generated public static ImportDeviceInstanceResult success(SaveResult result) { - return new ImportDeviceInstanceResult(result, true, null); + return new ImportDeviceInstanceResult(result, true, null, null); } + @Generated public static ImportDeviceInstanceResult error(String message) { - return new ImportDeviceInstanceResult(null, false, message); + return new ImportDeviceInstanceResult(null, false, message, null); } + @Generated public static ImportDeviceInstanceResult error(Throwable message) { return error(message.getMessage()); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java index 81150fd3..8ec51cee 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java @@ -25,7 +25,6 @@ import org.hswebframework.web.exception.NotFoundException; import org.hswebframework.web.exception.ValidationException; import org.hswebframework.web.i18n.LocaleUtils; import org.hswebframework.web.id.IDGenerator; -import org.jetlinks.community.PropertyMetric; import org.jetlinks.community.device.entity.*; import org.jetlinks.community.device.enums.DeviceState; import org.jetlinks.community.device.response.DeviceDeployResult; @@ -36,12 +35,15 @@ import org.jetlinks.community.device.service.DeviceConfigMetadataManager; import org.jetlinks.community.device.service.LocalDeviceInstanceService; import org.jetlinks.community.device.service.LocalDeviceProductService; import org.jetlinks.community.device.service.data.DeviceDataService; +import org.jetlinks.community.device.web.excel.DeviceExcelImporter; import org.jetlinks.community.device.web.excel.DeviceExcelInfo; import org.jetlinks.community.device.web.excel.DeviceWrapper; import org.jetlinks.community.device.web.excel.PropertyMetadataExcelInfo; import org.jetlinks.community.device.web.excel.PropertyMetadataWrapper; import org.jetlinks.community.device.web.request.AggRequest; +import org.jetlinks.community.io.excel.AbstractImporter; import org.jetlinks.community.io.excel.ImportExportService; +import org.jetlinks.community.io.file.FileManager; import org.jetlinks.community.io.utils.FileUtils; import org.jetlinks.community.relation.RelationObjectProvider; import org.jetlinks.community.relation.service.RelationService; @@ -65,11 +67,14 @@ import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.transaction.reactive.TransactionalOperator; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; import reactor.util.function.Tuple2; import reactor.util.function.Tuple4; import reactor.util.function.Tuples; @@ -112,6 +117,11 @@ public class DeviceInstanceController implements private final RelationService relationService; + private final TransactionalOperator transactionalOperator; + + private final FileManager fileManager; + + private final WebClient webClient; @SuppressWarnings("all") public DeviceInstanceController(LocalDeviceInstanceService service, @@ -121,7 +131,10 @@ public class DeviceInstanceController implements ReactiveRepository tagRepository, DeviceDataService deviceDataService, DeviceConfigMetadataManager metadataManager, - RelationService relationService) { + RelationService relationService, + TransactionalOperator transactionalOperator, + FileManager fileManager, + WebClient.Builder builder) { this.service = service; this.registry = registry; this.productService = productService; @@ -130,6 +143,9 @@ public class DeviceInstanceController implements this.deviceDataService = deviceDataService; this.metadataManager = metadataManager; this.relationService = relationService; + this.transactionalOperator = transactionalOperator; + this.fileManager = fileManager; + this.webClient = builder.build(); } @@ -521,8 +537,10 @@ public class DeviceInstanceController implements @SaveAction @Operation(summary = "导入设备数据") public Flux doBatchImportByProduct(@PathVariable @Parameter(description = "产品ID") String productId, + @RequestParam(defaultValue = "false") @Parameter(description = "自动启用") boolean autoDeploy, @RequestParam(required = false) @Parameter(description = "文件地址,支持csv,xlsx文件格式") String fileUrl, - @RequestParam(required = false) @Parameter(description = "文件Id") String fileId) { + @RequestParam(required = false) @Parameter(description = "文件Id") String fileId, + @RequestParam(defaultValue = "32") @Parameter int speed) { return Authentication .currentReactive() .flatMapMany(auth -> { @@ -549,20 +567,80 @@ public class DeviceInstanceController implements } return Tuples.of(entity, info.getTags()); }) - .buffer(100)//每100条数据保存一次 - .publishOn(Schedulers.single()) - .concatMap(buffer -> - Mono.zip( - service.save(Flux.fromIterable(buffer).map(Tuple2::getT1)), - tagRepository - .save(Flux.fromIterable(buffer).flatMapIterable(Tuple2::getT2)) - .defaultIfEmpty(SaveResult.of(0, 0)) - )) - .map(res -> ImportDeviceInstanceResult.success(res.getT1())) - .onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err))); + .as(flux -> handleImportDevice(flux, autoDeploy, speed)); }); } + @GetMapping(value = "/{productId}/import/_withlog", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @SaveAction + @Operation(summary = "导入设备数据,并提供日志下载") + public Flux doBatchImportByProductWithLog( + @PathVariable @Parameter(description = "产品ID") String productId, + @RequestParam(defaultValue = "false") @Parameter(description = "自动启用") boolean autoDeploy, + @RequestParam @Parameter(description = "文件地址,支持csv,xlsx文件格式") String fileUrl, + @RequestParam(defaultValue = "32") @Parameter int speed + ) { + return Authentication + .currentReactive() + .flatMapMany(auth -> this + .getDeviceProductDetail(productId) + .map(tp4 -> new DeviceExcelImporter(fileManager, webClient, tp4.getT1(), tp4.getT4(), auth)) + .flatMapMany(importer -> importer + .doImport(fileUrl) + .groupBy( + result -> result.isSuccess() && result.getType() == AbstractImporter.ImportResultType.data, + Integer.MAX_VALUE + ) + .flatMap(group -> { + // 处理导入成功的设备 + if (group.key()) { + return group + .map(result -> Tuples.of(result.getData().getDevice(), result.getData().getTags())) + .as(flux -> handleImportDevice(flux, autoDeploy, speed)); + } + // 返回错误信息和导入结果详情文件地址 + return group + .map(result -> { + ImportDeviceInstanceResult response = new ImportDeviceInstanceResult(); + response.setSuccess(result.isSuccess()); + if (StringUtils.hasText(result.getMessage())) { + response.setMessage(String.format("第%d行:%s", result.getRow(), result.getMessage())); + } + response.setDetailFile(result.getDetailFile()); + return response; + }); + }) + ) + ); + } + + private Flux handleImportDevice(Flux>> flux, + boolean autoDeploy, + int speed) { + return flux + .buffer(100)//每100条数据保存一次 + .map(Flux::fromIterable) + .flatMap(buffer -> Mono + .zip(buffer + .map(Tuple2::getT1) + .as(service::save) + .flatMap(res -> { + if (autoDeploy) { + return service + .deploy(buffer.map(Tuple2::getT1)) + .then(Mono.just(res)); + } + return Mono.just(res); + }), + tagRepository + .save(buffer.flatMapIterable(Tuple2::getT2)) + .defaultIfEmpty(SaveResult.of(0, 0))) + .as(transactionalOperator::transactional), + Math.min(speed, Queues.XS_BUFFER_SIZE)) + .map(res -> ImportDeviceInstanceResult.success(res.getT1())) + .onErrorResume(err -> Mono.just(ImportDeviceInstanceResult.error(err))); + } + //获取导出模版 @GetMapping("/{productId}/template.{format}") @QueryAction diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelImporter.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelImporter.java new file mode 100644 index 00000000..a43b89e6 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelImporter.java @@ -0,0 +1,80 @@ +package org.jetlinks.community.device.web.excel; + +import lombok.Getter; +import org.hswebframework.web.authorization.Authentication; +import org.hswebframework.web.validator.ValidatorUtils; +import org.jetlinks.community.device.entity.DeviceProductEntity; +import org.jetlinks.community.io.excel.AbstractImporter; +import org.jetlinks.community.io.excel.ImportHelper; +import org.jetlinks.community.io.file.FileManager; +import org.jetlinks.core.metadata.ConfigPropertyMetadata; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 设备导入. + * + * @author zhangji 2023/6/28 + * @since 2.1 + */ +public class DeviceExcelImporter extends AbstractImporter { + + @Getter + private final DeviceProductEntity product; + + private final Map tagMapping = new HashMap<>(); + + private final Map configMapping = new HashMap<>(); + + private final Authentication auth; + + public DeviceExcelImporter(FileManager fileManager, + WebClient client, + DeviceProductEntity product, + List configs, + Authentication auth) { + super(fileManager, client); + this.product = product; + this.auth = auth; + List tags = product.parseMetadata().getTags(); + for (PropertyMetadata tag : tags) { + tagMapping.put(tag.getName(), tag); + } + for (ConfigPropertyMetadata config : configs) { + configMapping.put(config.getName(), config); + } + } + + @Override + protected Mono handleData(Flux data) { + return data + .doOnNext(ValidatorUtils::tryValidate) + .map(deviceExcelInfo -> deviceExcelInfo.initDeviceInstance(product, auth)) + .then(); + } + + @Override + protected DeviceExcelInfo newInstance() { + DeviceExcelInfo deviceExcelInfo = new DeviceExcelInfo(); + deviceExcelInfo.setTagMapping(tagMapping); + deviceExcelInfo.setConfigMapping(configMapping); + return deviceExcelInfo; + } + + @Override + protected void customImport(ImportHelper helper) { + helper.fallbackSingle(true); + for (PropertyMetadata tag : tagMapping.values()) { + helper.addHeader(tag.getId(), tag.getName()); + } + for (ConfigPropertyMetadata config : configMapping.values()) { + helper.addHeader(config.getProperty(), config.getName()); + } + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelInfo.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelInfo.java index fb457581..35868bd2 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelInfo.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/excel/DeviceExcelInfo.java @@ -1,12 +1,18 @@ package org.jetlinks.community.device.web.excel; +import com.alibaba.fastjson.JSONObject; import lombok.Getter; import lombok.Setter; import org.hswebframework.reactor.excel.CellDataType; import org.hswebframework.reactor.excel.ExcelHeader; +import org.hswebframework.web.authorization.Authentication; import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.validator.ValidatorUtils; +import org.jetlinks.community.device.entity.DeviceInstanceEntity; +import org.jetlinks.community.device.entity.DeviceProductEntity; import org.jetlinks.community.device.entity.DeviceTagEntity; import org.jetlinks.core.metadata.ConfigPropertyMetadata; +import org.jetlinks.core.metadata.Jsonable; import org.jetlinks.core.metadata.PropertyMetadata; import org.springframework.util.StringUtils; @@ -15,11 +21,13 @@ import java.util.*; @Getter @Setter -public class DeviceExcelInfo { +public class DeviceExcelInfo implements Jsonable { + @org.jetlinks.community.io.excel.annotation.ExcelHeader(value = "设备ID") @NotBlank(message = "设备ID不能为空") private String id; + @org.jetlinks.community.io.excel.annotation.ExcelHeader(value = "设备名称") @NotBlank(message = "设备名称不能为空") private String name; @@ -27,10 +35,17 @@ public class DeviceExcelInfo { private String productName; + @org.jetlinks.community.io.excel.annotation.ExcelHeader(value = "父设备ID") private String parentId; private List tags = new ArrayList<>(); + private DeviceInstanceEntity device; + + private Map tagMapping; + + private Map configMapping; + private Map configuration = new HashMap<>(); private long rowNumber; @@ -129,4 +144,51 @@ public class DeviceExcelInfo { return mapping; } + + public DeviceExcelInfo initDeviceInstance(DeviceProductEntity product, Authentication auth) { + DeviceInstanceEntity entity = FastBeanCopier.copy(this, new DeviceInstanceEntity()); + + entity.setProductId(product.getId()); + entity.setProductName(product.getName()); + + entity.setCreateTimeNow(); + entity.setCreatorId(auth.getUser().getId()); + entity.setCreatorName(auth.getUser().getName()); + + entity.setModifyTimeNow(); + entity.setModifierId(auth.getUser().getId()); + entity.setModifierName(auth.getUser().getName()); + + ValidatorUtils.tryValidate(entity); + + this.device = entity; + return this; + } + + @Override + public void fromJson(JSONObject json) { + Jsonable.super.fromJson(json); + + for (Map.Entry entry : tagMapping.entrySet()) { + PropertyMetadata maybeTag = entry.getValue(); + if (maybeTag != null) { + tag( + maybeTag.getId(), + entry.getKey(), + Optional.of(json.getString(maybeTag.getId())).orElse(null), + maybeTag.getValueType().getId() + ); + } + } + + for (Map.Entry entry : configMapping.entrySet()) { + ConfigPropertyMetadata maybeConfig = entry.getValue(); + if (maybeConfig != null) { + config( + maybeConfig.getProperty(), + Optional.of(json.getString(maybeConfig.getProperty())).orElse(null) + ); + } + } + } }