From 5f871ebd1389ad67035eeeb23defcf0f7dfbb2a0 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Tue, 14 Jun 2022 17:09:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../community/io/file/DefaultFileManager.java | 213 ------------------ .../notify-component/notify-email/pom.xml | 1 - 2 files changed, 214 deletions(-) delete mode 100644 jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java diff --git a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java b/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java deleted file mode 100644 index 40f19bb9..00000000 --- a/jetlinks-components/io-component/src/main/java/org/jetlinks/community/io/file/DefaultFileManager.java +++ /dev/null @@ -1,213 +0,0 @@ -package org.jetlinks.community.io.file; - -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; -import lombok.AllArgsConstructor; -import org.apache.commons.codec.digest.DigestUtils; -import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; -import org.hswebframework.web.exception.BusinessException; -import org.hswebframework.web.id.IDGenerator; -import org.springframework.core.io.FileSystemResource; -import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.NettyDataBufferFactory; -import org.springframework.http.ContentDisposition; -import org.springframework.http.HttpRange; -import org.springframework.http.MediaType; -import org.springframework.http.client.MultipartBodyBuilder; -import org.springframework.http.codec.multipart.FilePart; -import org.springframework.web.reactive.function.BodyInserters; -import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.io.File; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.security.MessageDigest; -import java.time.LocalDate; -import java.time.format.DateTimeFormatter; -import java.util.Collections; -import java.util.Objects; -import java.util.function.Function; - - -public class DefaultFileManager implements FileManager { - - private final FileProperties properties; - - private final DataBufferFactory bufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); - - private final ReactiveRepository repository; - - - private final WebClient client; - - public DefaultFileManager(WebClient.Builder builder, - FileProperties properties, - ReactiveRepository repository) { - new File(properties.getStorageBasePath()).mkdirs(); - this.properties = properties; - this.client = builder - .clone() - .filter(this.properties.createWebClientRute()) - .build(); - this.repository = repository; - } - - @Override - public Mono saveFile(FilePart filePart) { - return saveFile(filePart.filename(), filePart.content()); - } - - private DataBuffer updateDigest(MessageDigest digest, DataBuffer dataBuffer) { - dataBuffer = DataBufferUtils.retain(dataBuffer); - digest.update(dataBuffer.asByteBuffer()); - DataBufferUtils.release(dataBuffer); - return dataBuffer; - } - - public Mono saveFileToCluster(String name, Flux stream) { - String serverId = properties.selectServerNode(); - MultipartBodyBuilder builder = new MultipartBodyBuilder(); - builder.asyncPart("file", stream, DataBuffer.class) - .headers(header -> header - .setContentDisposition(ContentDisposition - .builder("form-data") - .name("file") - .filename(name) - .build())) - .contentType(MediaType.APPLICATION_OCTET_STREAM); - return client - .post() - .uri("http://" + serverId + "/file/" +serverId) - .attribute(FileProperties.serverNodeIdAttr, serverId) - .contentType(MediaType.MULTIPART_FORM_DATA) - .body(BodyInserters.fromMultipartData(builder.build())) - .retrieve() - .bodyToMono(FileInfo.class); - } - - public Mono doSaveFile(String name, Flux stream) { - LocalDate now = LocalDate.now(); - FileInfo fileInfo = new FileInfo(); - fileInfo.setId(IDGenerator.MD5.generate()); - fileInfo.withFileName(name); - - String storagePath = now.format(DateTimeFormatter.BASIC_ISO_DATE) - + "/" + fileInfo.getId() + "." + fileInfo.getExtension(); - - MessageDigest md5 = DigestUtils.getMd5Digest(); - MessageDigest sha256 = DigestUtils.getSha256Digest(); - String storageBasePath = properties.getStorageBasePath(); - String serverNodeId = properties.getServerNodeId(); - Path path = Paths.get(storageBasePath, storagePath); - path.toFile().getParentFile().mkdirs(); - return stream - .map(buffer -> updateDigest(md5, updateDigest(sha256, buffer))) - .as(buf -> DataBufferUtils - .write(buf, path, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE_NEW, - StandardOpenOption.TRUNCATE_EXISTING)) - .then(Mono.defer(() -> { - File savedFile = Paths.get(storageBasePath, storagePath).toFile(); - if (!savedFile.exists()) { - return Mono.error(new BusinessException("error.file_storage_failed")); - } - fileInfo.setMd5(ByteBufUtil.hexDump(md5.digest())); - fileInfo.setSha256(ByteBufUtil.hexDump(sha256.digest())); - fileInfo.setLength(savedFile.length()); - fileInfo.setCreateTime(System.currentTimeMillis()); - FileEntity entity = FileEntity.of(fileInfo, storagePath, serverNodeId); - return repository - .insert(entity) - .then(Mono.fromSupplier(entity::toInfo)); - })); - } - - @Override - public Mono saveFile(String name, Flux stream) { - if (properties.getClusterRute().isEmpty() - || properties.getClusterRute().containsKey(properties.getServerNodeId())) { - return doSaveFile(name, stream); - } - //配置里集群,但是并不支持本节点,则保存到其他节点 - return saveFileToCluster(name, stream); - } - - @Override - public Mono getFile(String id) { - return repository - .findById(id) - .map(FileEntity::toInfo); - } - - private Flux readFile(String filePath, long position) { - return DataBufferUtils - .read(new FileSystemResource(Paths.get(properties.getStorageBasePath(), filePath)), - position, - bufferFactory, - properties.getReadBufferSize()); - } - - private Flux readFile(FileEntity file, long position) { - if (Objects.equals(file.getServerNodeId(), properties.getServerNodeId())) { - return readFile(file.getStoragePath(), position); - } - return readFromAnotherServer(file, position); - } - - protected Flux readFromAnotherServer(FileEntity file, long position) { - return client - .get() - .uri("http://" + file.getServerNodeId() + "/file/{serverNodeId}/{fileId}", file.getServerNodeId(), file.getId()) - .attribute(FileProperties.serverNodeIdAttr, file.getServerNodeId()) - .headers(header -> header.setRange(Collections.singletonList(HttpRange.createByteRange(position)))) - .retrieve() - .bodyToFlux(DataBuffer.class); - } - - @Override - public Flux read(String id) { - return read(id, 0); - } - - @Override - public Flux read(String id, long position) { - return repository - .findById(id) - .flatMapMany(file -> readFile(file, position)); - } - - @Override - public Flux read(String id, Function> beforeRead) { - return repository - .findById(id) - .flatMapMany(file -> { - DefaultReaderContext context = new DefaultReaderContext(file.toInfo(), 0); - return beforeRead - .apply(context) - .thenMany(Flux.defer(() -> readFile(file, context.position))); - }); - } - - @AllArgsConstructor - private static class DefaultReaderContext implements ReaderContext { - private final FileInfo info; - private long position; - - @Override - public FileInfo info() { - return info; - } - - @Override - public void position(long position) { - this.position = position; - } - } - -} diff --git a/jetlinks-components/notify-component/notify-email/pom.xml b/jetlinks-components/notify-component/notify-email/pom.xml index 9938fc77..7d75026f 100644 --- a/jetlinks-components/notify-component/notify-email/pom.xml +++ b/jetlinks-components/notify-component/notify-email/pom.xml @@ -50,7 +50,6 @@ ${project.version} - \ No newline at end of file