diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/DefaultImportExportService.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/DefaultImportExportService.java index da68b51d..9ff62ebb 100644 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/DefaultImportExportService.java +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/DefaultImportExportService.java @@ -1,7 +1,13 @@ package org.jetlinks.community.io.excel; +import org.hswebframework.reactor.excel.converter.RowWrapper; +import org.hswebframework.utils.StringUtils; import org.jetlinks.community.io.excel.easyexcel.ExcelReadDataListener; +import org.jetlinks.community.io.file.FileManager; +import org.jetlinks.community.io.utils.FileUtils; import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.MediaType; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.client.WebClient; @@ -11,6 +17,8 @@ import reactor.core.publisher.Mono; import java.io.FileInputStream; import java.io.InputStream; +import static org.hswebframework.reactor.excel.ReactorExcel.read; + /** * @author bsetfeng * @since 1.0 @@ -20,8 +28,12 @@ public class DefaultImportExportService implements ImportExportService { private WebClient client; - public DefaultImportExportService(WebClient.Builder builder) { + private final FileManager fileManager; + + public DefaultImportExportService(WebClient.Builder builder, + FileManager fileManager) { client = builder.build(); + this.fileManager = fileManager; } public Flux> doImport(Class clazz, String fileUrl) { @@ -34,6 +46,24 @@ public class DefaultImportExportService implements ImportExportService { return ExcelReadDataListener.of(stream, clazz); } + + @Override + public Flux readData(String fileUrl, String fileId, RowWrapper wrapper) { + if (!StringUtils.isNullOrEmpty(fileUrl)) { + return getInputStream(fileUrl) + .flatMapMany(inputStream -> read(inputStream, FileUtils.getExtension(fileUrl), wrapper)); + } else { + + return Mono + .zip(fileManager + .read(fileId) + .as(DataBufferUtils::join) + .map(DataBuffer::asInputStream), + fileManager.getFile(fileId)) + .flatMapMany(t2 -> read(t2.getT1(), t2.getT2().getExtension(), wrapper)); + } + } + public Mono getInputStream(String fileUrl) { return Mono.defer(() -> { diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportExportService.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportExportService.java index 256d023b..a4ef733c 100644 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportExportService.java +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/excel/ImportExportService.java @@ -1,6 +1,7 @@ package org.jetlinks.community.io.excel; +import org.hswebframework.reactor.excel.converter.RowWrapper; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -19,4 +20,8 @@ public interface ImportExportService { Mono getInputStream(String fileUrl); + Flux readData(String fileUrl, String fileId, RowWrapper wrapper); + + + } diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java new file mode 100644 index 00000000..40f19bb9 --- /dev/null +++ b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java @@ -0,0 +1,213 @@ +package org.jetlinks.community.io.file; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import lombok.AllArgsConstructor; +import org.apache.commons.codec.digest.DigestUtils; +import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.exception.BusinessException; +import org.hswebframework.web.id.IDGenerator; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.http.ContentDisposition; +import org.springframework.http.HttpRange; +import org.springframework.http.MediaType; +import org.springframework.http.client.MultipartBodyBuilder; +import org.springframework.http.codec.multipart.FilePart; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.security.MessageDigest; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Objects; +import java.util.function.Function; + + +public class DefaultFileManager implements FileManager { + + private final FileProperties properties; + + private final DataBufferFactory bufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); + + private final ReactiveRepository repository; + + + private final WebClient client; + + public DefaultFileManager(WebClient.Builder builder, + FileProperties properties, + ReactiveRepository repository) { + new File(properties.getStorageBasePath()).mkdirs(); + this.properties = properties; + this.client = builder + .clone() + .filter(this.properties.createWebClientRute()) + .build(); + this.repository = repository; + } + + @Override + public Mono saveFile(FilePart filePart) { + return saveFile(filePart.filename(), filePart.content()); + } + + private DataBuffer updateDigest(MessageDigest digest, DataBuffer dataBuffer) { + dataBuffer = DataBufferUtils.retain(dataBuffer); + digest.update(dataBuffer.asByteBuffer()); + DataBufferUtils.release(dataBuffer); + return dataBuffer; + } + + public Mono saveFileToCluster(String name, Flux stream) { + String serverId = properties.selectServerNode(); + MultipartBodyBuilder builder = new MultipartBodyBuilder(); + builder.asyncPart("file", stream, DataBuffer.class) + .headers(header -> header + .setContentDisposition(ContentDisposition + .builder("form-data") + .name("file") + .filename(name) + .build())) + .contentType(MediaType.APPLICATION_OCTET_STREAM); + return client + .post() + .uri("http://" + serverId + "/file/" +serverId) + .attribute(FileProperties.serverNodeIdAttr, serverId) + .contentType(MediaType.MULTIPART_FORM_DATA) + .body(BodyInserters.fromMultipartData(builder.build())) + .retrieve() + .bodyToMono(FileInfo.class); + } + + public Mono doSaveFile(String name, Flux stream) { + LocalDate now = LocalDate.now(); + FileInfo fileInfo = new FileInfo(); + fileInfo.setId(IDGenerator.MD5.generate()); + fileInfo.withFileName(name); + + String storagePath = now.format(DateTimeFormatter.BASIC_ISO_DATE) + + "/" + fileInfo.getId() + "." + fileInfo.getExtension(); + + MessageDigest md5 = DigestUtils.getMd5Digest(); + MessageDigest sha256 = DigestUtils.getSha256Digest(); + String storageBasePath = properties.getStorageBasePath(); + String serverNodeId = properties.getServerNodeId(); + Path path = Paths.get(storageBasePath, storagePath); + path.toFile().getParentFile().mkdirs(); + return stream + .map(buffer -> updateDigest(md5, updateDigest(sha256, buffer))) + .as(buf -> DataBufferUtils + .write(buf, path, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW, + StandardOpenOption.TRUNCATE_EXISTING)) + .then(Mono.defer(() -> { + File savedFile = Paths.get(storageBasePath, storagePath).toFile(); + if (!savedFile.exists()) { + return Mono.error(new BusinessException("error.file_storage_failed")); + } + fileInfo.setMd5(ByteBufUtil.hexDump(md5.digest())); + fileInfo.setSha256(ByteBufUtil.hexDump(sha256.digest())); + fileInfo.setLength(savedFile.length()); + fileInfo.setCreateTime(System.currentTimeMillis()); + FileEntity entity = FileEntity.of(fileInfo, storagePath, serverNodeId); + return repository + .insert(entity) + .then(Mono.fromSupplier(entity::toInfo)); + })); + } + + @Override + public Mono saveFile(String name, Flux stream) { + if (properties.getClusterRute().isEmpty() + || properties.getClusterRute().containsKey(properties.getServerNodeId())) { + return doSaveFile(name, stream); + } + //配置里集群,但是并不支持本节点,则保存到其他节点 + return saveFileToCluster(name, stream); + } + + @Override + public Mono getFile(String id) { + return repository + .findById(id) + .map(FileEntity::toInfo); + } + + private Flux readFile(String filePath, long position) { + return DataBufferUtils + .read(new FileSystemResource(Paths.get(properties.getStorageBasePath(), filePath)), + position, + bufferFactory, + properties.getReadBufferSize()); + } + + private Flux readFile(FileEntity file, long position) { + if (Objects.equals(file.getServerNodeId(), properties.getServerNodeId())) { + return readFile(file.getStoragePath(), position); + } + return readFromAnotherServer(file, position); + } + + protected Flux readFromAnotherServer(FileEntity file, long position) { + return client + .get() + .uri("http://" + file.getServerNodeId() + "/file/{serverNodeId}/{fileId}", file.getServerNodeId(), file.getId()) + .attribute(FileProperties.serverNodeIdAttr, file.getServerNodeId()) + .headers(header -> header.setRange(Collections.singletonList(HttpRange.createByteRange(position)))) + .retrieve() + .bodyToFlux(DataBuffer.class); + } + + @Override + public Flux read(String id) { + return read(id, 0); + } + + @Override + public Flux read(String id, long position) { + return repository + .findById(id) + .flatMapMany(file -> readFile(file, position)); + } + + @Override + public Flux read(String id, Function> beforeRead) { + return repository + .findById(id) + .flatMapMany(file -> { + DefaultReaderContext context = new DefaultReaderContext(file.toInfo(), 0); + return beforeRead + .apply(context) + .thenMany(Flux.defer(() -> readFile(file, context.position))); + }); + } + + @AllArgsConstructor + private static class DefaultReaderContext implements ReaderContext { + private final FileInfo info; + private long position; + + @Override + public FileInfo info() { + return info; + } + + @Override + public void position(long position) { + this.position = position; + } + } + +} diff --git a/jetlinks-components/network-component/tcp-component/pom.xml b/jetlinks-components/network-component/tcp-component/pom.xml index 3c8290cd..c654146c 100644 --- a/jetlinks-components/network-component/tcp-component/pom.xml +++ b/jetlinks-components/network-component/tcp-component/pom.xml @@ -30,6 +30,12 @@ ${project.version} + + org.apache.commons + commons-text + 1.9 + + \ No newline at end of file diff --git a/jetlinks-components/notify-component/notify-email/pom.xml b/jetlinks-components/notify-component/notify-email/pom.xml index d7fc0ee4..9938fc77 100644 --- a/jetlinks-components/notify-component/notify-email/pom.xml +++ b/jetlinks-components/notify-component/notify-email/pom.xml @@ -44,6 +44,13 @@ 1.14.3 + + org.jetlinks.community + io-component + ${project.version} + + + \ No newline at end of file diff --git a/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java b/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java index 3f92e308..88802b0d 100755 --- a/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java +++ b/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java @@ -11,12 +11,15 @@ import org.hswebframework.web.utils.ExpressionUtils; import org.hswebframework.web.utils.TemplateParser; import org.hswebframework.web.validator.ValidatorUtils; import org.jetlinks.core.Values; +import org.jetlinks.community.io.file.FileManager; import org.jetlinks.community.notify.*; import org.jetlinks.community.notify.email.EmailProvider; import org.jetlinks.community.notify.template.TemplateManager; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.jsoup.nodes.Element; +import org.springframework.core.io.*; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.InputStreamSource; @@ -67,17 +70,23 @@ public class DefaultEmailNotifier extends AbstractNotifier { @Setter private boolean enableFileSystemAttachment = Boolean.getBoolean("email.attach.local-file.enabled"); + private final FileManager fileManager; + public static Scheduler scheduler = Schedulers.elastic(); - public DefaultEmailNotifier(NotifierProperties properties, TemplateManager templateManager) { + public DefaultEmailNotifier(NotifierProperties properties, + TemplateManager templateManager, + FileManager fileManager) { this(properties.getId(), new JSONObject(properties.getConfiguration()).toJavaObject(DefaultEmailProperties.class), - templateManager); + templateManager, + fileManager); } public DefaultEmailNotifier(String id, DefaultEmailProperties properties, - TemplateManager templateManager) { + TemplateManager templateManager, + FileManager fileManager) { super(templateManager); ValidatorUtils.tryValidate(properties); JavaMailSenderImpl mailSender = new JavaMailSenderImpl(); @@ -89,6 +98,7 @@ public class DefaultEmailNotifier extends AbstractNotifier { this.notifierId = id; this.sender = properties.getSender(); this.javaMailSender = mailSender; + this.fileManager = fileManager; } @Nonnull @@ -159,7 +169,7 @@ public class DefaultEmailNotifier extends AbstractNotifier { } - protected Mono convertResource(String resource) { + protected Mono convertResource(String resource) { if (resource.startsWith("http")) { return WebClient .create() @@ -172,12 +182,17 @@ public class DefaultEmailNotifier extends AbstractNotifier { return Mono.just( new ByteArrayResource(Base64.decodeBase64(base64)) ); - } else if (enableFileSystemAttachment) { + } else if (enableFileSystemAttachment && resource.contains("/")) { return Mono.just( new FileSystemResource(resource) ); } else { - throw new UnsupportedOperationException("不支持的文件地址:" + resource); + return fileManager + .read(resource) + .as(DataBufferUtils::join) + .map(dataBuffer -> new ByteArrayResource(dataBuffer.asByteBuffer().array())) + .onErrorResume(e-> Mono.error(()-> new UnsupportedOperationException("不支持的文件地址:" + resource))) + .switchIfEmpty(Mono.error(()-> new UnsupportedOperationException("不支持的文件地址:" + resource))); } } diff --git a/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifierProvider.java b/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifierProvider.java index dc48bca3..1832a492 100644 --- a/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifierProvider.java +++ b/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifierProvider.java @@ -1,6 +1,7 @@ package org.jetlinks.community.notify.email.embedded; import com.alibaba.fastjson.JSON; +import org.jetlinks.community.io.file.FileManager; import org.jetlinks.community.notify.*; import org.jetlinks.community.notify.email.EmailProvider; import org.jetlinks.community.notify.template.TemplateManager; @@ -22,8 +23,12 @@ public class DefaultEmailNotifierProvider implements NotifierProvider, TemplateP private final TemplateManager templateManager; - public DefaultEmailNotifierProvider(TemplateManager templateManager) { + private final FileManager fileManager; + + public DefaultEmailNotifierProvider(TemplateManager templateManager, + FileManager fileManager) { this.templateManager = templateManager; + this.fileManager = fileManager; } @Nonnull @@ -111,7 +116,7 @@ public class DefaultEmailNotifierProvider implements NotifierProvider, TemplateP @Nonnull @Override public Mono createNotifier(@Nonnull NotifierProperties properties) { - return Mono.fromSupplier(() -> new DefaultEmailNotifier(properties, templateManager)); + return Mono.fromSupplier(() -> new DefaultEmailNotifier(properties, templateManager, fileManager)); } @Override diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java index 8a964927..e4241e81 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java @@ -446,7 +446,8 @@ public class DeviceInstanceController implements @SaveAction @Operation(summary = "导入设备数据") public Flux doBatchImportByProduct(@PathVariable @Parameter(description = "产品ID") String productId, - @RequestParam @Parameter(description = "文件地址,支持csv,xlsx文件格式") String fileUrl) { + @RequestParam(required = false) @Parameter(description = "文件地址,支持csv,xlsx文件格式") String fileUrl, + @RequestParam(required = false) @Parameter(description = "文件Id") String fileId) { return Authentication .currentReactive() .flatMapMany(auth -> { @@ -461,8 +462,7 @@ public class DeviceInstanceController implements .getDeviceProductDetail(productId) .map(tp4 -> Tuples.of(new DeviceWrapper(tp4.getT3().getTags(), tp4.getT4()), tp4.getT1())) .flatMapMany(wrapper -> importExportService - .getInputStream(fileUrl) - .flatMapMany(inputStream -> read(inputStream, FileUtils.getExtension(fileUrl), wrapper.getT1())) + .readData(fileUrl, fileId, wrapper.getT1()) .doOnNext(info -> info.setProductName(wrapper.getT2().getName())) ) .map(info -> { diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GatewayDeviceController.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GatewayDeviceController.java index 4a8b0dee..e05d002b 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GatewayDeviceController.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/GatewayDeviceController.java @@ -137,7 +137,14 @@ public class GatewayDeviceController { .execute() .then(registry .getDevice(deviceId) - .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId))) + .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId)) + ).then(registry.getDevice(gatewayId) + .flatMap(gwOperator -> gwOperator.getProtocol() + .map(protocolSupport -> protocolSupport.onChildBind(gwOperator, + Flux.from(registry.getDevice(deviceId))) + ) + ) + ) ) .then(getGatewayInfo(gatewayId)); } @@ -168,9 +175,14 @@ public class GatewayDeviceController { .getDevice(id) .flatMap(operator -> operator.setConfig(DeviceConfigKey.parentGatewayId, gatewayId))) .then() - )) - .then(getGatewayInfo(gatewayId)); - + ).then(registry.getDevice(gatewayId) + .flatMap(gwOperator -> gwOperator.getProtocol() + .map(protocolSupport -> protocolSupport.onChildBind(gwOperator, + Flux.fromIterable(deviceIdList).flatMap(id -> registry.getDevice(id))) + ) + ) + ) + ).then(getGatewayInfo(gatewayId)); } @PostMapping("/{gatewayId}/unbind/{deviceId}") @@ -188,6 +200,13 @@ public class GatewayDeviceController { .flatMap(i -> registry .getDevice(deviceId) .flatMap(operator -> operator.removeConfig(DeviceConfigKey.parentGatewayId.getKey()))) + .then(registry.getDevice(gatewayId) + .flatMap(gwOperator -> gwOperator.getProtocol() + .map(protocolSupport -> protocolSupport.onChildUnbind(gwOperator, + Flux.from(registry.getDevice(deviceId))) + ) + ) + ) .then(getGatewayInfo(gatewayId)); }