diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java index 88e5e54a..9df48de7 100644 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java @@ -7,6 +7,7 @@ import org.hswebframework.web.id.IDGenerator; import org.jetlinks.community.notify.manager.subscriber.Notify; import java.io.Serializable; +import java.util.List; @Getter @Setter @@ -27,10 +28,17 @@ public class Notification implements Serializable { private String message; + + private Object detail; + + private String code; + private String dataId; private long notifyTime; + private List notifyChannels; + public static Notification from(NotifySubscriberEntity entity) { Notification notification = new Notification(); @@ -39,6 +47,7 @@ public class Notification implements Serializable { notification.subscriber = entity.getSubscriber(); notification.topicName = entity.getTopicName(); notification.setTopicProvider(entity.getTopicProvider()); + notification.setNotifyChannels(entity.getNotifyChannels()); return notification; } @@ -49,6 +58,8 @@ public class Notification implements Serializable { target.setMessage(message.getMessage()); target.setDataId(message.getDataId()); target.setNotifyTime(message.getNotifyTime()); + target.setDetail(message.getDetail()); + target.setCode(message.getCode()); return target; } diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java index 731aaa9d..62cc0375 100644 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java @@ -4,12 +4,14 @@ import io.swagger.v3.oas.annotations.Hidden; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Getter; import lombok.Setter; +import lombok.SneakyThrows; import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType; import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue; import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec; import org.hswebframework.web.api.crud.entity.GenericEntity; import org.hswebframework.web.bean.FastBeanCopier; import org.jetlinks.community.notify.manager.enums.NotificationState; +import org.jetlinks.community.utils.ObjectMappers; import javax.persistence.Column; import javax.persistence.Index; @@ -58,6 +60,16 @@ public class NotificationEntity extends GenericEntity { @Schema(description = "通知时间") private Long notifyTime; + @Column(length = 128) + @Schema(description = "通知编码") + private String code; + + @Column + @Schema(description = "详情") + @ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class) + private String detailJson; + + @Column(length = 32) @EnumCodec @DefaultValue("unread") @@ -69,7 +81,15 @@ public class NotificationEntity extends GenericEntity { @Schema(description = "说明") private String description; + @SneakyThrows public static NotificationEntity from(Notification notification) { - return FastBeanCopier.copy(notification, new NotificationEntity()); + NotificationEntity entity = FastBeanCopier.copy(notification, new NotificationEntity()); + Object detail = notification.getDetail(); + + entity.setCode(notification.getCode()); + if (detail != null) { + entity.setDetailJson(ObjectMappers.JSON_MAPPER.writeValueAsString(detail)); + } + return entity; } } diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyChannelEntity.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyChannelEntity.java new file mode 100644 index 00000000..eb3dc492 --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyChannelEntity.java @@ -0,0 +1,87 @@ +package org.jetlinks.community.notify.manager.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType; +import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue; +import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec; +import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec; +import org.hswebframework.web.api.crud.entity.GenericEntity; +import org.hswebframework.web.validator.CreateGroup; +//import org.jetlinks.community.authorize.AuthenticationSpec; +import org.jetlinks.community.notify.manager.enums.NotifyChannelState; +import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider; +import org.jetlinks.community.notify.manager.subscriber.channel.NotifyChannelProvider; + +import javax.persistence.Column; +import javax.persistence.Table; +import javax.validation.constraints.NotBlank; +import java.sql.JDBCType; +import java.util.Map; + +/** + * 通知通道(配置). + * 用于定义哪些权限范围(grant),哪种主题(topicProvider),支持何种方式(channel)进行通知 + *

+ * 比如: 管理员角色的用户可以使用邮件通知,但是普通用户只能使用站内信通知. + * + * @author zhouhao + * @since 2.0 + */ +@Table(name = "notify_channel") +@Getter +@Setter +@Schema(description = "通知通道(配置)") +public class NotifyChannelEntity extends GenericEntity { + + @Column(nullable = false, length = 32) + @NotBlank(groups = CreateGroup.class) + @Schema(description = "名称") + private String name; + +// @Column +// @JsonCodec +// @ColumnType(jdbcType = JDBCType.LONGVARCHAR, javaType = String.class) +// @Schema(description = "权限范围") +// private AuthenticationSpec grant; + + /** + * @see SubscriberProvider#getId() + */ + @Column(nullable = false, length = 32, updatable = false) + @NotBlank(groups = CreateGroup.class) + @Schema(description = "主题提供商标识") + private String topicProvider; + + @Column(nullable = false, length = 32) + @NotBlank(groups = CreateGroup.class) + @Schema(description = "主题提供商名称") + private String topicName; + + /** + * @see NotifyChannelProvider#getId() + */ + @Column(nullable = false, length = 32, updatable = false) + @NotBlank(groups = CreateGroup.class) + @Schema(description = "通知类型") + private String channelProvider; + + /** + * @see NotifyChannelProvider#createChannel(Map) + * @see org.jetlinks.community.notify.manager.subscriber.channel.notifiers.NotifierChannelProvider.NotifyChannelConfig + */ + @Column + @JsonCodec + @ColumnType(jdbcType = JDBCType.LONGVARCHAR, javaType = String.class) + @Schema(description = "通知配置") + private Map channelConfiguration; + + @Column(length = 32) + @EnumCodec + @ColumnType(javaType = String.class) + @DefaultValue("enabled") + @Schema(description = "状态") + private NotifyChannelState state; + +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java index fc1507c3..a35a43e3 100644 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java @@ -12,6 +12,8 @@ import org.jetlinks.community.notify.manager.enums.SubscribeState; import javax.persistence.Column; import javax.persistence.Index; import javax.persistence.Table; +import java.util.List; +import java.util.Locale; import java.util.Map; /** @@ -75,5 +77,22 @@ public class NotifySubscriberEntity extends GenericEntity { @Schema(description = "状态.") private SubscribeState state; + @Column(length = 32) + @Schema(description = "订阅语言") + private String locale; + + /** + * @see NotifyChannelEntity#getId() + */ + @Column(length = 3000) + @Schema(description = "通知方式") + @JsonCodec + @ColumnType(javaType = String.class) + private List notifyChannels; + + + public Locale toLocale() { + return locale == null ? Locale.getDefault() : Locale.forLanguageTag(locale); + } } diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyChannelState.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyChannelState.java new file mode 100644 index 00000000..d158f356 --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyChannelState.java @@ -0,0 +1,19 @@ +package org.jetlinks.community.notify.manager.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.hswebframework.web.dict.EnumDict; + +@Getter +@AllArgsConstructor +public enum NotifyChannelState implements EnumDict { + enabled("正常"), + disabled("禁用"); + + private final String text; + + @Override + public String getValue() { + return name(); + } +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java index 7c0a9ba4..35fd9d10 100644 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java @@ -1,23 +1,28 @@ package org.jetlinks.community.notify.manager.service; + import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.authorization.ReactiveAuthenticationHolder; -import org.hswebframework.web.crud.events.EntityCreatedEvent; -import org.hswebframework.web.crud.events.EntityDeletedEvent; -import org.hswebframework.web.crud.events.EntityModifyEvent; -import org.hswebframework.web.crud.events.EntitySavedEvent; +import org.hswebframework.web.crud.events.*; import org.hswebframework.web.crud.service.GenericReactiveCrudService; +import org.hswebframework.web.exception.BusinessException; +import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.core.cluster.ClusterManager; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.community.gateway.annotation.Subscribe; import org.jetlinks.community.notify.manager.entity.Notification; import org.jetlinks.community.notify.manager.entity.NotifySubscriberEntity; import org.jetlinks.community.notify.manager.enums.SubscribeState; import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider; -import org.jetlinks.core.cluster.ClusterManager; -import org.jetlinks.core.event.EventBus; +import org.jetlinks.community.notify.manager.subscriber.SubscriberProviders; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.CommandLineRunner; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import reactor.core.Disposable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.List; @@ -31,19 +36,20 @@ public class NotifySubscriberService extends GenericReactiveCrudService providers = new ConcurrentHashMap<>(); private final Map subscribers = new ConcurrentHashMap<>(); + private final ApplicationEventPublisher eventPublisher; + public NotifySubscriberService(EventBus eventBus, - ClusterManager clusterManager, - List providers) { + ObjectProvider providers, + ApplicationEventPublisher eventPublisher) { this.eventBus = eventBus; - this.clusterManager = clusterManager; + this.eventPublisher=eventPublisher; for (SubscriberProvider provider : providers) { this.providers.put(provider.getId(), provider); + SubscriberProviders.register(provider); } } @@ -51,69 +57,109 @@ public class NotifySubscriberService extends GenericReactiveCrudServicegetTopic("notification-changed") - .subscribe() - .subscribe(this::handleSubscribe); + protected Mono doNotifyChange(NotifySubscriberEntity entity) { + return eventBus + .publish("/notification-changed", entity) + .then(); + } - protected void doNotifyChange(NotifySubscriberEntity entity) { - clusterManager.getTopic("notification-changed") - .publish(Mono.just(entity)) - .retry(3) - .subscribe(); + @EventListener + public void handleEvent(EntityPrepareCreateEvent entity) { + //填充语言 + entity.async( + LocaleUtils + .currentReactive() + .doOnNext(locale -> { + for (NotifySubscriberEntity subscriber : entity.getEntity()) { + if (subscriber.getLocale() == null) { + subscriber.setLocale(locale.toLanguageTag()); + } + } + }) + ); + } + + @EventListener + public void handleEvent(EntityPrepareSaveEvent entity) { + //填充语言 + entity.async( + LocaleUtils + .currentReactive() + .doOnNext(locale -> { + for (NotifySubscriberEntity subscriber : entity.getEntity()) { + if (subscriber.getLocale() == null) { + subscriber.setLocale(locale.toLanguageTag()); + } + } + }) + ); } @EventListener public void handleEvent(EntityCreatedEvent entity) { - entity.getEntity().forEach(this::doNotifyChange); + entity.async( + Flux.fromIterable(entity.getEntity()) + .flatMap(this::doNotifyChange) + ); } @EventListener public void handleEvent(EntitySavedEvent entity) { - entity.getEntity().forEach(this::doNotifyChange); + entity.async( + Flux.fromIterable(entity.getEntity()) + .flatMap(this::doNotifyChange) + ); } @EventListener public void handleEvent(EntityDeletedEvent entity) { - entity.getEntity().forEach(e -> { - e.setState(SubscribeState.disabled); - doNotifyChange(e); - }); + entity.async( + Flux.fromIterable(entity.getEntity()) + .doOnNext(e -> e.setState(SubscribeState.disabled)) + .flatMap(this::doNotifyChange) + ); + } @EventListener public void handleEvent(EntityModifyEvent entity) { - entity.getAfter().forEach(this::doNotifyChange); + entity.async( + Flux.fromIterable(entity.getAfter()) + .flatMap(this::doNotifyChange) + ); } - private void handleSubscribe(NotifySubscriberEntity entity) { + @Subscribe("/notification-changed") + public void handleSubscribe(NotifySubscriberEntity entity) { //取消订阅 if (entity.getState() == SubscribeState.disabled) { Optional.ofNullable(subscribers.remove(entity.getId())) - .ifPresent(Disposable::dispose); + .ifPresent(Disposable::dispose); log.debug("unsubscribe:{}({}),{}", entity.getTopicProvider(), entity.getTopicName(), entity.getId()); return; } //模版 Notification template = Notification.from(entity); - //转发通知 - String dispatch = template.createTopic(); Disposable old = subscribers .put(entity.getId(), - Mono.zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()), Mono.justOrEmpty(getProvider(entity.getTopicProvider()))) - .flatMap(tp2 -> tp2.getT2().createSubscriber(entity.getId(),tp2.getT1(), entity.getTopicConfig())) - .flatMap(subscriber -> - subscriber - .subscribe() - .map(template::copyWithMessage) - .flatMap(notification -> eventBus.publish(dispatch, notification)) - .onErrorContinue((err, obj) -> log.error(err.getMessage(), err)) - .then()) - .subscribe() + Mono + .zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()), Mono.justOrEmpty(getProvider(entity.getTopicProvider()))) + .flatMap(tp2 -> tp2.getT2().createSubscriber(entity.getId(), tp2.getT1(), entity.getTopicConfig())) + .flatMap(subscriber -> + subscriber + .subscribe(entity.toLocale()) + .map(template::copyWithMessage) + .doOnNext(eventPublisher::publishEvent) + .onErrorResume((err) -> { + log.error(err.getMessage(), err); + return Mono.empty(); + }) + .then()) + .subscribe() ); log.debug("subscribe :{}({})", template.getTopicProvider(), template.getTopicName()); @@ -124,8 +170,9 @@ public class NotifySubscriberService extends GenericReactiveCrudService doSubscribe(NotifySubscriberEntity entity) { - return Mono.justOrEmpty(getProvider(entity.getTopicProvider())) - .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("不支持的主题:" + entity.getTopicProvider()))) + return Mono + .justOrEmpty(getProvider(entity.getTopicProvider())) + .switchIfEmpty(Mono.error(() -> new BusinessException("error.unsupported_topics", 500, entity.getTopicProvider()))) .map(provider -> { entity.setTopicName(provider.getName()); return entity; @@ -133,9 +180,10 @@ public class NotifySubscriberService extends GenericReactiveCrudService { if (StringUtils.isEmpty(entity.getId())) { entity.setId(null); - return save(Mono.just(entity)); + return save(entity); } else { - return createUpdate().set(entity) + return createUpdate() + .set(entity) .where(NotifySubscriberEntity::getId, entity.getId()) .and(NotifySubscriberEntity::getSubscriberType, entity.getSubscriberType()) .and(NotifySubscriberEntity::getSubscriber, entity.getSubscriber()) @@ -147,7 +195,6 @@ public class NotifySubscriberService extends GenericReactiveCrudService subscribe(); + Flux subscribe(Locale locale); + + default Flux subscribe() { + return subscribe(Locale.getDefault()); + } + } diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProviders.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProviders.java new file mode 100644 index 00000000..eb23bbb4 --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProviders.java @@ -0,0 +1,25 @@ +package org.jetlinks.community.notify.manager.subscriber; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +public class SubscriberProviders { + + private final static Map providers = new ConcurrentHashMap<>(); + + + public static void register(SubscriberProvider provider) { + providers.put(provider.getId(), provider); + } + + public static List getProviders() { + return new ArrayList<>(providers.values()); + } + + public static Optional getProvider(String id) { + return Optional.ofNullable(providers.get(id)); + } +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/InsideMailChannelProvider.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/InsideMailChannelProvider.java new file mode 100644 index 00000000..03acafee --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/InsideMailChannelProvider.java @@ -0,0 +1,61 @@ +package org.jetlinks.community.notify.manager.subscriber.channel; + +import lombok.AllArgsConstructor; +import org.jetlinks.community.notify.manager.entity.Notification; +import org.jetlinks.core.event.EventBus; +import org.springframework.core.Ordered; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.util.Map; + +/** + * 站内信通知,通过推送通知信息到事件总线. + *

+ * 由{@link org.jetlinks.community.notify.manager.message.NotificationsPublishProvider}推送到前端. + *

+ * 由{@link org.jetlinks.community.notify.manager.service.NotificationService}写入到数据库. + * + * @author zhouhao + * @since 2.0 + */ +@Component +@AllArgsConstructor +public class InsideMailChannelProvider implements NotifyChannelProvider, NotifyChannel { + public static final String provider = "inside-mail"; + + private final EventBus eventBus; + + @Override + public String getId() { + return "inside-mail"; + } + + @Override + public String getName() { + return "站内信"; + } + + @Override + public Mono createChannel(Map configuration) { + return Mono.just(this); + } + + @Override + public Mono sendNotify(Notification notification) { + //设置了站内信的订阅才推送的事件总线 + return eventBus + .publish(notification.createTopic(), notification) + .then(); + } + + @Override + public void dispose() { + + } + + @Override + public int getOrder() { + return Ordered.HIGHEST_PRECEDENCE; + } +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotificationDispatcher.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotificationDispatcher.java new file mode 100644 index 00000000..d11a277b --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotificationDispatcher.java @@ -0,0 +1,172 @@ +package org.jetlinks.community.notify.manager.subscriber.channel; + +import lombok.extern.slf4j.Slf4j; +import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.crud.events.EntityCreatedEvent; +import org.hswebframework.web.crud.events.EntityDeletedEvent; +import org.hswebframework.web.crud.events.EntityModifyEvent; +import org.hswebframework.web.crud.events.EntitySavedEvent; +import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.community.notify.manager.entity.Notification; +import org.jetlinks.community.notify.manager.entity.NotifyChannelEntity; +import org.jetlinks.community.notify.manager.enums.NotifyChannelState; +import org.jetlinks.core.cache.ReactiveCacheContainer; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.event.Subscription; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 通知订阅转发器,将通知信息转发到对应的订阅通道中 + * + * @author zhouhao + * @since 2.0 + */ +@Component +@Slf4j +public class NotificationDispatcher implements CommandLineRunner { + + private final EventBus eventBus; + + private final ReactiveCacheContainer channels = ReactiveCacheContainer.create(); + + private final Map providers = new HashMap<>(); + + private final ReactiveRepository channelRepository; + + public NotificationDispatcher(EventBus eventBus, + ObjectProvider providers, + ReactiveRepository channelRepository) { + this.eventBus = eventBus; + this.channelRepository = channelRepository; + //默认支持站内信 + this.channels.put(InsideMailChannelProvider.provider, new InsideMailChannelProvider(eventBus)); + + for (NotifyChannelProvider provider : providers) { + this.providers.put(provider.getId(), provider); + } + } + + @EventListener + public void handleNotifications(Notification notification) { + + List channelIdList = notification.getNotifyChannels(); + //默认站内信 + if (channelIdList == null) { + channelIdList = Collections.singletonList(InsideMailChannelProvider.provider); + } + //发送通知 + for (String notifyChannel : channelIdList) { + NotifyChannel dispatcher = channels.getNow(notifyChannel); + if (dispatcher != null) { + dispatcher + .sendNotify(notification) + .subscribe(); + } + } + + } + + @EventListener + public void handleEvent(EntityCreatedEvent event) { + + event.async( + register(event.getEntity()) + ); + } + + @EventListener + public void handleEvent(EntitySavedEvent event) { + + event.async( + register(event.getEntity()) + ); + } + + @EventListener + public void handleEvent(EntityModifyEvent event) { + + event.async( + register(event.getAfter()) + ); + } + + @EventListener + public void handleEvent(EntityDeletedEvent event) { + event.async( + unregister(event.getEntity()) + ); + } + + @Subscribe(value = "/_sys/notify-channel/unregister", features = Subscription.Feature.broker) + public void unregister(NotifyChannelEntity entity) { + channels.remove(entity.getId()); + } + + @Subscribe(value = "/_sys/notify-channel/register", features = Subscription.Feature.broker) + public Mono register(NotifyChannelEntity entity) { + if (entity.getState() == NotifyChannelState.disabled) { + channels.remove(entity.getId()); + } else { + return channels + .compute(entity.getId(), (ignore, old) -> { + if (null != old) { + old.dispose(); + } + return createChannel(entity); + }) + .then(); + } + return Mono.empty(); + } + + private Mono createChannel(NotifyChannelEntity entity) { + NotifyChannelProvider provider = providers.get(entity.getChannelProvider()); + if (null == provider) { + return Mono.empty(); + } + return provider.createChannel(entity.getChannelConfiguration()); + } + + private Mono unregister(List entities) { + for (NotifyChannelEntity entity : entities) { + unregister(entity); + } + return Flux.fromIterable(entities) + .flatMap(e -> eventBus.publish("/_sys/notify-channel/unregister", e)) + .then(); + } + + private Mono register(List entities) { + return Flux.fromIterable(entities) + .flatMap(e -> register(e) + .then(eventBus.publish("/_sys/notify-channel/register", e))) + .then(); + + } + + @Override + public void run(String... args) throws Exception { + channelRepository + .createQuery() + .where(NotifyChannelEntity::getState, NotifyChannelState.enabled) + .fetch() + .flatMap(e -> this + .register(e) + .onErrorResume(er -> { + log.warn("register notify channel error", er); + return Mono.empty(); + })) + .subscribe(); + + } +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannel.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannel.java new file mode 100644 index 00000000..70fdcf71 --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannel.java @@ -0,0 +1,17 @@ +package org.jetlinks.community.notify.manager.subscriber.channel; + +import org.jetlinks.community.notify.manager.entity.Notification; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +/** + * 订阅通知通道,用于发送通知信息 + * + * @author zhouhao + * @since 2.0 + */ +public interface NotifyChannel extends Disposable { + + Mono sendNotify(Notification notification); + +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannelProvider.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannelProvider.java new file mode 100644 index 00000000..477ef1ba --- /dev/null +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannelProvider.java @@ -0,0 +1,16 @@ +package org.jetlinks.community.notify.manager.subscriber.channel; + +import org.springframework.core.Ordered; +import reactor.core.publisher.Mono; + +import java.util.Map; + +public interface NotifyChannelProvider extends Ordered { + + String getId(); + + String getName(); + + Mono createChannel(Map configuration); + +} diff --git a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java index 458bced2..3455dbe5 100755 --- a/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java +++ b/jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java @@ -3,6 +3,7 @@ package org.jetlinks.community.notify.manager.subscriber.providers; import com.alibaba.fastjson.JSONObject; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.authorization.Authentication; import org.hswebframework.web.i18n.LocaleUtils; import org.jetlinks.community.ValueObject; @@ -17,14 +18,20 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.SimplePropertyMetadata; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.utils.FluxUtils; import org.springframework.stereotype.Component; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Duration; +import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; @Component +@Slf4j public class AlarmProvider implements SubscriberProvider { private final EventBus eventBus; @@ -56,11 +63,15 @@ public class AlarmProvider implements SubscriberProvider { String alarmId = configs.getString("alarmConfigId").orElse("*"); String topic = Topics.alarm("*", "*", alarmId); - return Mono.justOrEmpty(()-> createSubscribe(id, new String[]{topic})); + + return Mono.just(locale -> createSubscribe(locale, id, new String[]{topic}) + //有效期内去重,防止同一个用户所在多个部门推送同一个告警 + .as(FluxUtils.distinct(Notify::getDataId, Duration.ofSeconds(10)))); } - private Flux createSubscribe(String id, + private Flux createSubscribe(Locale locale, + String id, String[] topics) { Subscription.Feature[] features = new Subscription.Feature[]{Subscription.Feature.local}; return Flux @@ -70,7 +81,7 @@ public class AlarmProvider implements SubscriberProvider { .map(msg -> { JSONObject json = msg.bodyToJson(); return Notify.of( - getNotifyMessage(json), + getNotifyMessage(locale, json), //告警记录ID json.getString("id"), System.currentTimeMillis(), @@ -80,18 +91,19 @@ public class AlarmProvider implements SubscriberProvider { })); } - private static String getNotifyMessage(JSONObject json) { + + private static String getNotifyMessage(Locale locale, JSONObject json) { String message; TargetType targetType = TargetType.of(json.getString("targetType")); String targetName = json.getString("targetName"); - String alarmName = json.getString("alarmName"); + String alarmName = json.getString("alarmConfigName"); if (targetType == TargetType.other) { message = String.format("[%s]发生告警:[%s]!", targetName, alarmName); } else { message = String.format("%s[%s]发生告警:[%s]!", targetType.getText(), targetName, alarmName); } - return LocaleUtils.resolveMessage("message.alarm.notify." + targetType.name(), message, targetName, alarmName); + return LocaleUtils.resolveMessage("message.alarm.notify." + targetType.name(), locale, message, targetName, alarmName); } @Override