diff --git a/jetlinks-components/notify-component/notify-core/pom.xml b/jetlinks-components/notify-component/notify-core/pom.xml index eb47d0c8..da176f5a 100644 --- a/jetlinks-components/notify-component/notify-core/pom.xml +++ b/jetlinks-components/notify-component/notify-core/pom.xml @@ -35,6 +35,12 @@ common-component ${project.version} + + org.jetlinks.community + gateway-component + ${project.version} + compile + diff --git a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/DefaultNotifierManager.java b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/DefaultNotifierManager.java index c1228bee..c51d2078 100644 --- a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/DefaultNotifierManager.java +++ b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/DefaultNotifierManager.java @@ -1,6 +1,7 @@ package org.jetlinks.community.notify; import lombok.extern.slf4j.Slf4j; +import org.jetlinks.community.gateway.MessageGateway; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.stereotype.Component; @@ -21,8 +22,11 @@ public class DefaultNotifierManager implements NotifierManager, BeanPostProcesso private NotifyConfigManager configManager; - public DefaultNotifierManager(NotifyConfigManager manager) { + private MessageGateway messageGateway; + + public DefaultNotifierManager(NotifyConfigManager manager, MessageGateway messageGateway) { this.configManager = manager; + this.messageGateway = messageGateway; } protected Mono getProperties(NotifyType notifyType, @@ -42,6 +46,8 @@ public class DefaultNotifierManager implements NotifierManager, BeanPostProcesso .flatMap(map -> Mono.justOrEmpty(map.get(properties.getProvider()))) .switchIfEmpty(Mono.error(new UnsupportedOperationException("不支持的服务商:" + properties.getProvider()))) .flatMap(notifierProvider -> notifierProvider.createNotifier(properties)) + //转成代理,把通知事件发送到消息网关中. + .map(notifier -> new NotifierEventDispatcher<>(messageGateway, notifier)) .flatMap(notifier -> Mono.justOrEmpty(notifiers.put(properties.getId(), notifier)) .flatMap(Notifier::close)//如果存在旧的通知器则关掉之 .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))//忽略异常 diff --git a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/Notifier.java b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/Notifier.java index 36313437..a95066b1 100644 --- a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/Notifier.java +++ b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/Notifier.java @@ -17,6 +17,11 @@ import java.util.function.Consumer; */ public interface Notifier { + /** + * @return 通知器ID + */ + String getNotifierId(); + /** * 获取通知类型,如: 语音通知 * @@ -73,4 +78,7 @@ public interface Notifier { @Nonnull Mono close(); + default > R unwrap(Class type) { + return type.cast(this); + } } diff --git a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierEventDispatcher.java b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierEventDispatcher.java new file mode 100644 index 00000000..f52470c8 --- /dev/null +++ b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierEventDispatcher.java @@ -0,0 +1,27 @@ +package org.jetlinks.community.notify; + +import org.jetlinks.community.gateway.MessageGateway; +import org.jetlinks.community.notify.event.NotifierEvent; +import org.jetlinks.community.notify.template.Template; +import reactor.core.publisher.Mono; + +public class NotifierEventDispatcher extends NotifierProxy { + + private final MessageGateway gateway; + + public NotifierEventDispatcher(MessageGateway gateway, Notifier target) { + super(target); + this.gateway = gateway; + } + + @Override + protected Mono onEvent(NotifierEvent event) { + // /notify/{notifierId}/success + + return gateway + .publish(String.join("/", "/notify", event.getNotifierId(), event.isSuccess() ? "success" : "error"), event.toSerializable()) + .then(); + } + + +} diff --git a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierProxy.java b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierProxy.java new file mode 100644 index 00000000..82a85f7e --- /dev/null +++ b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/NotifierProxy.java @@ -0,0 +1,106 @@ +package org.jetlinks.community.notify; + +import lombok.AllArgsConstructor; +import org.jetlinks.core.Values; +import org.jetlinks.community.notify.event.NotifierEvent; +import org.jetlinks.community.notify.template.Template; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; + +@AllArgsConstructor +public abstract class NotifierProxy implements Notifier { + + private final Notifier target; + + @Override + public String getNotifierId() { + return target.getNotifierId(); + } + + @Nonnull + @Override + public NotifyType getType() { + return target.getType(); + } + + @Nonnull + @Override + public Provider getProvider() { + return target.getProvider(); + } + + @Override + public > R unwrap(Class type) { + return target.unwrap(type); + } + + @Nonnull + @Override + public Mono send(@Nonnull String templateId, Values context) { + return target + .send(templateId, context) + .switchIfEmpty(Mono.defer(() -> onSuccess(templateId, context))) + .onErrorResume(err -> onError(templateId, context, err).then(Mono.error(err))); + } + + @Nonnull + @Override + public Mono send(@Nonnull T template, @Nonnull Values context) { + return target.send(template, context) + .switchIfEmpty(Mono.defer(() -> onSuccess(template, context))) + .onErrorResume(err -> onError(template, context, err).then(Mono.error(err))); + } + + protected Mono onError(T template, Values ctx, Throwable error) { + return onEvent(NotifierEvent.builder() + .cause(error) + .context(ctx.getAllValues()) + .notifierId(getNotifierId()) + .notifyType(getType()) + .provider(getProvider()) + .template(template) + .build()); + } + + protected Mono onError(String templateId, Values ctx, Throwable error) { + return onEvent(NotifierEvent.builder() + .cause(error) + .context(ctx.getAllValues()) + .notifierId(getNotifierId()) + .notifyType(getType()) + .provider(getProvider()) + .templateId(templateId) + .build()); + } + + protected Mono onSuccess(String templateId, Values ctx) { + return onEvent(NotifierEvent.builder() + .success(true) + .context(ctx.getAllValues()) + .notifierId(getNotifierId()) + .notifyType(getType()) + .provider(getProvider()) + .templateId(templateId) + .build()); + } + + protected Mono onSuccess(T template, Values ctx) { + return onEvent(NotifierEvent.builder() + .success(true) + .context(ctx.getAllValues()) + .notifierId(getNotifierId()) + .notifyType(getType()) + .provider(getProvider()) + .template(template) + .build()); + } + + protected abstract Mono onEvent(NotifierEvent event); + + @Nonnull + @Override + public Mono close() { + return target.close(); + } +} diff --git a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/event/NotifierEvent.java b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/event/NotifierEvent.java new file mode 100644 index 00000000..e7bc2144 --- /dev/null +++ b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/event/NotifierEvent.java @@ -0,0 +1,56 @@ +package org.jetlinks.community.notify.event; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.utils.StringUtils; +import org.jetlinks.community.notify.NotifyType; +import org.jetlinks.community.notify.Provider; +import org.jetlinks.community.notify.template.Template; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; + +@Getter +@Setter +@Builder +public class NotifierEvent { + + private boolean success; + + @Nullable + private Throwable cause; + + @Nonnull + private String notifierId; + + @Nonnull + private NotifyType notifyType; + + @Nonnull + private Provider provider; + + @Nullable + private String templateId; + + @Nullable + private Template template; + + @Nonnull + private Map context; + + public SerializableNotifierEvent toSerializable() { + return SerializableNotifierEvent.builder() + .success(success) + .notifierId(notifierId) + .notifyType(notifyType.getId()) + .provider(provider.getId()) + .templateId(templateId) + .template(template) + .context(context) + .cause(cause != null ? StringUtils.throwable2String(cause) : "") + .errorType(cause != null ? cause.getClass().getName() : null) + .build(); + } +} diff --git a/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/event/SerializableNotifierEvent.java b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/event/SerializableNotifierEvent.java new file mode 100644 index 00000000..1df7df8a --- /dev/null +++ b/jetlinks-components/notify-component/notify-core/src/main/java/org/jetlinks/community/notify/event/SerializableNotifierEvent.java @@ -0,0 +1,42 @@ +package org.jetlinks.community.notify.event; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.community.notify.template.Template; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; + +@Getter +@Setter +@Builder +public class SerializableNotifierEvent { + + private boolean success; + + @Nullable + private String errorType; + + @Nullable + private String cause; + + @Nonnull + private String notifierId; + + @Nonnull + private String notifyType; + + @Nonnull + private String provider; + + @Nullable + private String templateId; + + @Nullable + private Template template; + + @Nonnull + private Map context; +} diff --git a/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifier.java b/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifier.java index c0885034..9fc796fb 100644 --- a/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifier.java +++ b/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifier.java @@ -1,5 +1,6 @@ package org.jetlinks.community.notify.dingtalk; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.exception.BusinessException; import org.jetlinks.core.Values; @@ -22,24 +23,27 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j public class DingTalkNotifier extends AbstractNotifier { - private AtomicReference accessToken = new AtomicReference<>(); + private final AtomicReference accessToken = new AtomicReference<>(); private long refreshTokenTime; - private long tokenTimeOut = Duration.ofSeconds(7000).toMillis(); + private final long tokenTimeOut = Duration.ofSeconds(7000).toMillis(); - private WebClient client; + private final WebClient client; private static final String tokenApi = "https://oapi.dingtalk.com/gettoken"; private static final String notify = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2"; - private DingTalkProperties properties; + private final DingTalkProperties properties; + @Getter + private final String notifierId; - public DingTalkNotifier(WebClient client, DingTalkProperties properties, TemplateManager templateManager) { + public DingTalkNotifier(String id,WebClient client, DingTalkProperties properties, TemplateManager templateManager) { super(templateManager); this.client = client; this.properties = properties; + this.notifierId = id; } @Nonnull diff --git a/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierProvider.java b/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierProvider.java index 0cf71647..c265708f 100644 --- a/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierProvider.java +++ b/jetlinks-components/notify-component/notify-dingtalk/src/main/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierProvider.java @@ -30,15 +30,15 @@ public class DingTalkNotifierProvider implements NotifierProvider, TemplateProvi } public static final DefaultConfigMetadata notifierConfig = new DefaultConfigMetadata("通知配置", "") - .add("appKey", "appKey", "", new StringType().expand(ConfigMetadataConstants.required.value(true))) - .add("appSecret", "appSecret", "", new StringType()); + .add("appKey", "appKey", "", new StringType().expand(ConfigMetadataConstants.required.value(true))) + .add("appSecret", "appSecret", "", new StringType()); public static final DefaultConfigMetadata templateConfig = new DefaultConfigMetadata("模版配置", "") - .add("agentId", "应用ID", "", new StringType().expand(ConfigMetadataConstants.required.value(true))) - .add("userIdList", "收信人ID", "与部门ID不能同时为空", new StringType()) - .add("departmentIdList", "收信部门ID", "与收信人ID不能同时为空", new StringType()) - .add("toAllUser", "全部用户", "推送到全部用户", new BooleanType()) - .add("message", "内容", "最大不超过500字", new StringType().expand(ConfigMetadataConstants.maxLength.value(500L))); + .add("agentId", "应用ID", "", new StringType().expand(ConfigMetadataConstants.required.value(true))) + .add("userIdList", "收信人ID", "与部门ID不能同时为空", new StringType()) + .add("departmentIdList", "收信部门ID", "与收信人ID不能同时为空", new StringType()) + .add("toAllUser", "全部用户", "推送到全部用户", new BooleanType()) + .add("message", "内容", "最大不超过500字", new StringType().expand(ConfigMetadataConstants.maxLength.value(500L))); @Nonnull @Override @@ -64,7 +64,7 @@ public class DingTalkNotifierProvider implements NotifierProvider, TemplateProvi public Mono createNotifier(@Nonnull NotifierProperties properties) { return Mono.defer(() -> { DingTalkProperties dingTalkProperties = FastBeanCopier.copy(properties.getConfiguration(), new DingTalkProperties()); - return Mono.just(new DingTalkNotifier(client, ValidatorUtils.tryValidate(dingTalkProperties), templateManager)); + return Mono.just(new DingTalkNotifier(properties.getId(), client, ValidatorUtils.tryValidate(dingTalkProperties), templateManager)); }); } diff --git a/jetlinks-components/notify-component/notify-dingtalk/src/test/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierTest.java b/jetlinks-components/notify-component/notify-dingtalk/src/test/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierTest.java index 31951101..a105ca44 100644 --- a/jetlinks-components/notify-component/notify-dingtalk/src/test/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierTest.java +++ b/jetlinks-components/notify-component/notify-dingtalk/src/test/java/org/jetlinks/community/notify/dingtalk/DingTalkNotifierTest.java @@ -27,7 +27,7 @@ class DingTalkNotifierTest { messageTemplate.setMessage("test"+System.currentTimeMillis()); messageTemplate.setUserIdList("0458215455697857"); - DingTalkNotifier notifier=new DingTalkNotifier( + DingTalkNotifier notifier=new DingTalkNotifier("test", WebClient.builder().build(),properties,null ); diff --git a/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java b/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java index b46ee1cf..06a2f57d 100644 --- a/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java +++ b/jetlinks-components/notify-component/notify-email/src/main/java/org/jetlinks/community/notify/email/embedded/DefaultEmailNotifier.java @@ -59,10 +59,15 @@ public class DefaultEmailNotifier extends AbstractNotifier { @Setter private String sender; + @Getter + private final String notifierId; + + public static Scheduler scheduler = Schedulers.newElastic("email-notifier"); public DefaultEmailNotifier(NotifierProperties properties, TemplateManager templateManager) { super(templateManager); + notifierId = properties.getId(); DefaultEmailProperties emailProperties = new JSONObject(properties.getConfiguration()) .toJavaObject(DefaultEmailProperties.class); diff --git a/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/Hy2046SmsSenderProvider.java b/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/Hy2046SmsSenderProvider.java index 85049c6b..a468c3e7 100644 --- a/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/Hy2046SmsSenderProvider.java +++ b/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/Hy2046SmsSenderProvider.java @@ -1,6 +1,7 @@ package org.jetlinks.community.notify.sms.provider; import com.alibaba.fastjson.JSON; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.jetlinks.core.Values; @@ -84,7 +85,7 @@ public class Hy2046SmsSenderProvider implements NotifierProvider, TemplateProvid Assert.hasText(userId, "短信配置错误,缺少userId"); Assert.hasText(username, "短信配置错误,缺少username"); Assert.hasText(password, "短信配置错误,缺少password"); - return Mono.just(new Hy2046SmsSender(userId, username, password)); + return Mono.just(new Hy2046SmsSender(properties.getId(),userId, username, password)); }); } @@ -103,10 +104,13 @@ public class Hy2046SmsSenderProvider implements NotifierProvider, TemplateProvid String userId; String username; String password; + @Getter + private final String notifierId; - public Hy2046SmsSender(String userId, String username, String password) { + public Hy2046SmsSender(String id,String userId, String username, String password) { super(templateManager); this.userId = userId; + this.notifierId = id; this.username = username; this.password = password; } diff --git a/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/TestSmsProvider.java b/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/TestSmsProvider.java index 483d5662..48348e57 100644 --- a/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/TestSmsProvider.java +++ b/jetlinks-components/notify-component/notify-sms/src/main/java/org/jetlinks/community/notify/sms/provider/TestSmsProvider.java @@ -69,6 +69,11 @@ public class TestSmsProvider extends AbstractNotifier impl return Mono.just(this); } + @Override + public String getNotifierId() { + return "test-sms-notify"; + } + @Override public String getId() { return "test"; diff --git a/jetlinks-components/notify-component/notify-voice/src/main/java/org/jetlinks/community/notify/voice/aliyun/AliyunVoiceNotifier.java b/jetlinks-components/notify-component/notify-voice/src/main/java/org/jetlinks/community/notify/voice/aliyun/AliyunVoiceNotifier.java index aabd5a86..ccb4769c 100644 --- a/jetlinks-components/notify-component/notify-voice/src/main/java/org/jetlinks/community/notify/voice/aliyun/AliyunVoiceNotifier.java +++ b/jetlinks-components/notify-component/notify-voice/src/main/java/org/jetlinks/community/notify/voice/aliyun/AliyunVoiceNotifier.java @@ -9,6 +9,7 @@ import com.aliyuncs.IAcsClient; import com.aliyuncs.http.MethodType; import com.aliyuncs.profile.DefaultProfile; import com.aliyuncs.profile.IClientProfile; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.exception.BusinessException; import org.hswebframework.web.logger.ReactiveLogger; @@ -25,14 +26,17 @@ import java.util.Objects; @Slf4j public class AliyunVoiceNotifier extends AbstractNotifier { - private IAcsClient client; + private final IAcsClient client; private String domain = "dyvmsapi.aliyuncs.com"; private String regionId = "cn-hangzhou"; - private int connectTimeout = 1000; - private int readTimeout = 5000; + private final int connectTimeout = 1000; + private final int readTimeout = 5000; + @Getter + private String notifierId; public AliyunVoiceNotifier(NotifierProperties profile, TemplateManager templateManager) { super(templateManager); + this.notifierId = profile.getId(); Map config = profile.getConfiguration(); DefaultProfile defaultProfile = DefaultProfile.getProfile( this.regionId = (String) Objects.requireNonNull(config.get("regionId"), "regionId不能为空"), diff --git a/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WechatNotifierProvider.java b/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WechatNotifierProvider.java index 2a246beb..35cff8b9 100644 --- a/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WechatNotifierProvider.java +++ b/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WechatNotifierProvider.java @@ -1,6 +1,7 @@ package org.jetlinks.community.notify.wechat; import com.alibaba.fastjson.JSON; +import lombok.Getter; import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.validator.ValidatorUtils; import org.jetlinks.core.metadata.ConfigMetadata; @@ -23,7 +24,6 @@ public class WechatNotifierProvider implements NotifierProvider, TemplateProvide private WebClient client = WebClient.create(); private final TemplateManager templateManager; - public WechatNotifierProvider(TemplateManager templateManager) { this.templateManager = templateManager; } @@ -61,7 +61,7 @@ public class WechatNotifierProvider implements NotifierProvider, TemplateProvide public Mono createNotifier(@Nonnull NotifierProperties properties) { return Mono.defer(() -> { WechatCorpProperties wechatCorpProperties = FastBeanCopier.copy(properties.getConfiguration(), new WechatCorpProperties()); - return Mono.just(new WeixinCorpNotifier(client, ValidatorUtils.tryValidate(wechatCorpProperties), templateManager)); + return Mono.just(new WeixinCorpNotifier(properties.getId(),client, ValidatorUtils.tryValidate(wechatCorpProperties), templateManager)); }); } diff --git a/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifier.java b/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifier.java index 6f90a186..dcb90924 100644 --- a/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifier.java +++ b/jetlinks-components/notify-component/notify-wechat/src/main/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifier.java @@ -1,5 +1,6 @@ package org.jetlinks.community.notify.wechat; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.exception.BusinessException; import org.jetlinks.core.Values; @@ -22,24 +23,27 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j public class WeixinCorpNotifier extends AbstractNotifier { - private AtomicReference accessToken = new AtomicReference<>(); + private final AtomicReference accessToken = new AtomicReference<>(); private long refreshTokenTime; - private long tokenTimeOut = Duration.ofSeconds(7000).toMillis(); + private final long tokenTimeOut = Duration.ofSeconds(7000).toMillis(); - private WebClient client; + private final WebClient client; private static final String tokenApi = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"; private static final String notify = "https://qyapi.weixin.qq.com/cgi-bin/message/send"; - private WechatCorpProperties properties; + private final WechatCorpProperties properties; + @Getter + private final String notifierId; - public WeixinCorpNotifier(WebClient client, WechatCorpProperties properties, TemplateManager templateManager) { + public WeixinCorpNotifier(String id,WebClient client, WechatCorpProperties properties, TemplateManager templateManager) { super(templateManager); this.client = client; this.properties = properties; + this.notifierId = id; } @Nonnull diff --git a/jetlinks-components/notify-component/notify-wechat/src/test/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifierTest.java b/jetlinks-components/notify-component/notify-wechat/src/test/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifierTest.java index cd142e37..eca830ab 100644 --- a/jetlinks-components/notify-component/notify-wechat/src/test/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifierTest.java +++ b/jetlinks-components/notify-component/notify-wechat/src/test/java/org/jetlinks/community/notify/wechat/WeixinCorpNotifierTest.java @@ -26,7 +26,7 @@ class WeixinCorpNotifierTest { messageTemplate.setMessage("test"+System.currentTimeMillis()); messageTemplate.setToUser("userId"); - WeixinCorpNotifier notifier=new WeixinCorpNotifier( + WeixinCorpNotifier notifier=new WeixinCorpNotifier("test", WebClient.builder().build(),properties,null ); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceStateInfo.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceStateInfo.java new file mode 100644 index 00000000..8f5c61aa --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceStateInfo.java @@ -0,0 +1,17 @@ +package org.jetlinks.community.device.entity; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.jetlinks.community.device.enums.DeviceState; + +@Getter +@Setter +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +public class DeviceStateInfo { + private String deviceId; + + private DeviceState state; +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceBatchOperationSubscriptionProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceBatchOperationSubscriptionProvider.java new file mode 100644 index 00000000..63c803e7 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceBatchOperationSubscriptionProvider.java @@ -0,0 +1,81 @@ +package org.jetlinks.community.device.message; + +import com.alibaba.fastjson.JSON; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.community.device.entity.DeviceInstanceEntity; +import org.jetlinks.community.device.service.LocalDeviceInstanceService; +import org.jetlinks.community.gateway.external.SubscribeRequest; +import org.jetlinks.community.gateway.external.SubscriptionProvider; +import org.jetlinks.supports.utils.MqttTopicUtils; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import java.util.Map; + +@Component +public class DeviceBatchOperationSubscriptionProvider implements SubscriptionProvider { + + private final LocalDeviceInstanceService instanceService; + + public DeviceBatchOperationSubscriptionProvider(LocalDeviceInstanceService instanceService) { + this.instanceService = instanceService; + } + + @Override + public String id() { + return "device-batch-operator"; + } + + @Override + public String name() { + return "设备批量操作"; + } + + @Override + public String[] getTopicPattern() { + return new String[]{ + "/device-batch/*" + }; + } + + @Override + public Flux subscribe(SubscribeRequest request) { + String topic = request.getTopic(); + + QueryParamEntity queryParamEntity = request.getString("queryJson") + .map(json -> JSON.parseObject(json, QueryParamEntity.class)) + .orElseGet(QueryParamEntity::new); + + Map var = MqttTopicUtils.getPathVariables("/device-batch/{type}", topic); + String type = var.get("type"); + + switch (type) { + case "state-sync": + return handleStateSync(queryParamEntity); + case "deploy": + return handleDeploy(queryParamEntity); + + default: + return Flux.error(new IllegalArgumentException("不支持的类型:" + type)); + } + + } + + private Flux handleDeploy(QueryParamEntity queryParamEntity) { + + return instanceService + .query(queryParamEntity.noPaging().includes("id")) + .as(instanceService::deploy); + } + + private Flux handleStateSync(QueryParamEntity queryParamEntity) { + + return instanceService.query(queryParamEntity.noPaging().includes("id")) + .map(DeviceInstanceEntity::getId) + .buffer(200) + .publishOn(Schedulers.single()) + .concatMap(flux -> instanceService.syncStateBatch(Flux.just(flux), true)) + .flatMap(Flux::fromIterable); + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java index f1cc06e5..aee73050 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/LocalDeviceInstanceService.java @@ -352,7 +352,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService Mono.justOrEmpty(DeviceMessageUtils.convert(message)) .map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2)) .publishOn(Schedulers.parallel()) - .concatMap(list -> syncStateBatch(Flux.just(list), false).reduce(Math::addExact)) + .concatMap(list -> syncStateBatch(Flux.just(list), false).map(List::size)) .onErrorContinue((err, obj) -> log.error(err.getMessage(), err)) .subscribe((i) -> log.info("同步设备状态成功:{}", i)); } @@ -364,7 +364,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService syncStateBatch(Flux> batch, boolean force) { + public Flux> syncStateBatch(Flux> batch, boolean force) { return batch .concatMap(list -> Flux.fromIterable(list) @@ -384,6 +384,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService { + List deviceId=group.getValue().stream().map(Tuple3::getT2).collect(Collectors.toList()); DeviceState state = DeviceState.of(group.getKey()); return Mono.zip( //批量修改设备状态 @@ -391,10 +392,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudServiceDeviceStateInfo.of(id,state)).collect(Collectors.toList())); })); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java index 581e5147..1fa165a7 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/web/DeviceInstanceController.java @@ -168,7 +168,7 @@ public class DeviceInstanceController implements .map(DeviceInstanceEntity::getId) .buffer(200) .publishOn(Schedulers.single()) - .concatMap(flux -> service.syncStateBatch(Flux.just(flux), true)) + .concatMap(flux -> service.syncStateBatch(Flux.just(flux), true).map(List::size)) .defaultIfEmpty(0); } diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyHistoryEntity.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyHistoryEntity.java new file mode 100644 index 00000000..9e6beffb --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyHistoryEntity.java @@ -0,0 +1,89 @@ +package org.jetlinks.community.notify.manager.entity; + +import com.alibaba.fastjson.JSON; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType; +import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue; +import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec; +import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec; +import org.hswebframework.web.api.crud.entity.GenericEntity; +import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.crud.generator.Generators; +import org.jetlinks.community.notify.event.SerializableNotifierEvent; +import org.jetlinks.community.notify.manager.enums.NotifyState; + +import javax.persistence.Column; +import javax.persistence.Index; +import javax.persistence.Table; +import java.sql.JDBCType; +import java.util.Date; +import java.util.Map; + +@Table(name = "notify_history", indexes = { + @Index(name = "idx_nt_his_notifier_id", columnList = "notifier_id") +}) +@Getter +@Setter +public class NotifyHistoryEntity extends GenericEntity { + + private static final long serialVersionUID = -6849794470754667710L; + + @Column(length = 32, nullable = false, updatable = false) + private String notifierId; + + @Column(nullable = false) + @DefaultValue("success") + @EnumCodec + @ColumnType(javaType = String.class) + private NotifyState state; + + @Column(length = 1024) + private String errorType; + + @Column + @ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class) + private String errorStack; + + @Column(length = 32, nullable = false) + @DefaultValue("-") + private String templateId; + + @Column + @ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class) + private String template; + + @Column + @ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class) + @JsonCodec + private Map context; + + @Column(length = 32, nullable = false) + private String provider; + + @Column(length = 32, nullable = false) + private String notifyType; + + @Column + @DefaultValue(generator = Generators.CURRENT_TIME) + private Date notifyTime; + + @Column + @DefaultValue("0") + private Integer retryTimes; + + public static NotifyHistoryEntity of(SerializableNotifierEvent event) { + NotifyHistoryEntity entity = FastBeanCopier.copy(event, new NotifyHistoryEntity()); + if (null != event.getTemplate()) { + entity.setTemplate(JSON.toJSONString(event.getTemplate())); + } + if (event.isSuccess()) { + entity.setState(NotifyState.success); + } else { + entity.setErrorStack(event.getCause()); + entity.setState(NotifyState.error); + } + return entity; + } + +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyState.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyState.java new file mode 100644 index 00000000..36b32506 --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyState.java @@ -0,0 +1,20 @@ +package org.jetlinks.community.notify.manager.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.hswebframework.web.dict.EnumDict; + +@Getter +@AllArgsConstructor +public enum NotifyState implements EnumDict { + + success("成功"), + error("失败"); + + private final String text; + + @Override + public String getValue() { + return name(); + } +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifyHistoryService.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifyHistoryService.java new file mode 100644 index 00000000..b5de315b --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifyHistoryService.java @@ -0,0 +1,22 @@ +package org.jetlinks.community.notify.manager.service; + +import org.hswebframework.web.crud.service.GenericReactiveCrudService; +import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.community.notify.event.SerializableNotifierEvent; +import org.jetlinks.community.notify.manager.entity.NotifyHistoryEntity; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; +import reactor.core.publisher.Mono; + +@Service +public class NotifyHistoryService extends GenericReactiveCrudService { + + + @Subscribe("/notify/**") + @Transactional(propagation = Propagation.NEVER) + public Mono handleNotify(SerializableNotifierEvent event) { + return insert(Mono.just(NotifyHistoryEntity.of(event))).then(); + } + +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/web/NotifierHistoryController.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/web/NotifierHistoryController.java new file mode 100644 index 00000000..820d90e2 --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/web/NotifierHistoryController.java @@ -0,0 +1,25 @@ +package org.jetlinks.community.notify.manager.web; + +import org.hswebframework.web.authorization.annotation.Resource; +import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController; +import org.jetlinks.community.notify.manager.entity.NotifyHistoryEntity; +import org.jetlinks.community.notify.manager.service.NotifyHistoryService; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/notify/history") +@Resource(id = "notifier", name = "通知管理") +public class NotifierHistoryController implements ReactiveServiceQueryController { + + private final NotifyHistoryService historyService; + + public NotifierHistoryController(NotifyHistoryService historyService) { + this.historyService = historyService; + } + + @Override + public NotifyHistoryService getService() { + return historyService; + } +} diff --git a/jetlinks-standalone/src/main/resources/application-embedded.yml b/jetlinks-standalone/src/main/resources/application-embedded.yml index 29e4bc73..dc2c966a 100644 --- a/jetlinks-standalone/src/main/resources/application-embedded.yml +++ b/jetlinks-standalone/src/main/resources/application-embedded.yml @@ -5,10 +5,10 @@ spring: embedded: enabled: true # 使用内置的redis,不建议在生产环境中使用. host: 127.0.0.1 - port: 6379 + port: 6370 data-path: ./data/redis host: 127.0.0.1 - port: 6379 + port: 6370 lettuce: pool: max-active: 1024 @@ -26,11 +26,11 @@ elasticsearch: embedded: enabled: true # 为true时使用内嵌的elasticsearch,不建议在生产环境中使用 data-path: ./data/elasticsearch - port: 9200 + port: 9201 host: 0.0.0.0 client: host: localhost - port: 9200 + port: 9201 max-conn-total: 128 connect-timeout: 5000 socket-timeout: 5000