增加通知记录

This commit is contained in:
zhou-hao 2020-05-25 17:11:26 +08:00
parent 8354071595
commit e8aa94af46
26 changed files with 571 additions and 40 deletions

View File

@ -35,6 +35,12 @@
<artifactId>common-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>gateway-component</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.notify;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.gateway.MessageGateway;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
@ -21,8 +22,11 @@ public class DefaultNotifierManager implements NotifierManager, BeanPostProcesso
private NotifyConfigManager configManager;
public DefaultNotifierManager(NotifyConfigManager manager) {
private MessageGateway messageGateway;
public DefaultNotifierManager(NotifyConfigManager manager, MessageGateway messageGateway) {
this.configManager = manager;
this.messageGateway = messageGateway;
}
protected Mono<NotifierProperties> getProperties(NotifyType notifyType,
@ -42,6 +46,8 @@ public class DefaultNotifierManager implements NotifierManager, BeanPostProcesso
.flatMap(map -> Mono.justOrEmpty(map.get(properties.getProvider())))
.switchIfEmpty(Mono.error(new UnsupportedOperationException("不支持的服务商:" + properties.getProvider())))
.flatMap(notifierProvider -> notifierProvider.createNotifier(properties))
//转成代理,把通知事件发送到消息网关中.
.map(notifier -> new NotifierEventDispatcher<>(messageGateway, notifier))
.flatMap(notifier -> Mono.justOrEmpty(notifiers.put(properties.getId(), notifier))
.flatMap(Notifier::close)//如果存在旧的通知器则关掉之
.onErrorContinue((err, obj) -> log.error(err.getMessage(), err))//忽略异常

View File

@ -17,6 +17,11 @@ import java.util.function.Consumer;
*/
public interface Notifier<T extends Template> {
/**
* @return 通知器ID
*/
String getNotifierId();
/**
* 获取通知类型,: 语音通知
*
@ -73,4 +78,7 @@ public interface Notifier<T extends Template> {
@Nonnull
Mono<Void> close();
default <R extends Notifier<T>> R unwrap(Class<R> type) {
return type.cast(this);
}
}

View File

@ -0,0 +1,27 @@
package org.jetlinks.community.notify;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.notify.event.NotifierEvent;
import org.jetlinks.community.notify.template.Template;
import reactor.core.publisher.Mono;
public class NotifierEventDispatcher<T extends Template> extends NotifierProxy<T> {
private final MessageGateway gateway;
public NotifierEventDispatcher(MessageGateway gateway, Notifier<T> target) {
super(target);
this.gateway = gateway;
}
@Override
protected Mono<Void> onEvent(NotifierEvent event) {
// /notify/{notifierId}/success
return gateway
.publish(String.join("/", "/notify", event.getNotifierId(), event.isSuccess() ? "success" : "error"), event.toSerializable())
.then();
}
}

View File

@ -0,0 +1,106 @@
package org.jetlinks.community.notify;
import lombok.AllArgsConstructor;
import org.jetlinks.core.Values;
import org.jetlinks.community.notify.event.NotifierEvent;
import org.jetlinks.community.notify.template.Template;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
@AllArgsConstructor
public abstract class NotifierProxy<T extends Template> implements Notifier<T> {
private final Notifier<T> target;
@Override
public String getNotifierId() {
return target.getNotifierId();
}
@Nonnull
@Override
public NotifyType getType() {
return target.getType();
}
@Nonnull
@Override
public Provider getProvider() {
return target.getProvider();
}
@Override
public <R extends Notifier<T>> R unwrap(Class<R> type) {
return target.unwrap(type);
}
@Nonnull
@Override
public Mono<Void> send(@Nonnull String templateId, Values context) {
return target
.send(templateId, context)
.switchIfEmpty(Mono.defer(() -> onSuccess(templateId, context)))
.onErrorResume(err -> onError(templateId, context, err).then(Mono.error(err)));
}
@Nonnull
@Override
public Mono<Void> send(@Nonnull T template, @Nonnull Values context) {
return target.send(template, context)
.switchIfEmpty(Mono.defer(() -> onSuccess(template, context)))
.onErrorResume(err -> onError(template, context, err).then(Mono.error(err)));
}
protected Mono<Void> onError(T template, Values ctx, Throwable error) {
return onEvent(NotifierEvent.builder()
.cause(error)
.context(ctx.getAllValues())
.notifierId(getNotifierId())
.notifyType(getType())
.provider(getProvider())
.template(template)
.build());
}
protected Mono<Void> onError(String templateId, Values ctx, Throwable error) {
return onEvent(NotifierEvent.builder()
.cause(error)
.context(ctx.getAllValues())
.notifierId(getNotifierId())
.notifyType(getType())
.provider(getProvider())
.templateId(templateId)
.build());
}
protected Mono<Void> onSuccess(String templateId, Values ctx) {
return onEvent(NotifierEvent.builder()
.success(true)
.context(ctx.getAllValues())
.notifierId(getNotifierId())
.notifyType(getType())
.provider(getProvider())
.templateId(templateId)
.build());
}
protected Mono<Void> onSuccess(T template, Values ctx) {
return onEvent(NotifierEvent.builder()
.success(true)
.context(ctx.getAllValues())
.notifierId(getNotifierId())
.notifyType(getType())
.provider(getProvider())
.template(template)
.build());
}
protected abstract Mono<Void> onEvent(NotifierEvent event);
@Nonnull
@Override
public Mono<Void> close() {
return target.close();
}
}

View File

@ -0,0 +1,56 @@
package org.jetlinks.community.notify.event;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.utils.StringUtils;
import org.jetlinks.community.notify.NotifyType;
import org.jetlinks.community.notify.Provider;
import org.jetlinks.community.notify.template.Template;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
@Getter
@Setter
@Builder
public class NotifierEvent {
private boolean success;
@Nullable
private Throwable cause;
@Nonnull
private String notifierId;
@Nonnull
private NotifyType notifyType;
@Nonnull
private Provider provider;
@Nullable
private String templateId;
@Nullable
private Template template;
@Nonnull
private Map<String, Object> context;
public SerializableNotifierEvent toSerializable() {
return SerializableNotifierEvent.builder()
.success(success)
.notifierId(notifierId)
.notifyType(notifyType.getId())
.provider(provider.getId())
.templateId(templateId)
.template(template)
.context(context)
.cause(cause != null ? StringUtils.throwable2String(cause) : "")
.errorType(cause != null ? cause.getClass().getName() : null)
.build();
}
}

View File

@ -0,0 +1,42 @@
package org.jetlinks.community.notify.event;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.notify.template.Template;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
@Getter
@Setter
@Builder
public class SerializableNotifierEvent {
private boolean success;
@Nullable
private String errorType;
@Nullable
private String cause;
@Nonnull
private String notifierId;
@Nonnull
private String notifyType;
@Nonnull
private String provider;
@Nullable
private String templateId;
@Nullable
private Template template;
@Nonnull
private Map<String,Object> context;
}

View File

@ -1,5 +1,6 @@
package org.jetlinks.community.notify.dingtalk;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.core.Values;
@ -22,24 +23,27 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class DingTalkNotifier extends AbstractNotifier<DingTalkMessageTemplate> {
private AtomicReference<String> accessToken = new AtomicReference<>();
private final AtomicReference<String> accessToken = new AtomicReference<>();
private long refreshTokenTime;
private long tokenTimeOut = Duration.ofSeconds(7000).toMillis();
private final long tokenTimeOut = Duration.ofSeconds(7000).toMillis();
private WebClient client;
private final WebClient client;
private static final String tokenApi = "https://oapi.dingtalk.com/gettoken";
private static final String notify = "https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2";
private DingTalkProperties properties;
private final DingTalkProperties properties;
@Getter
private final String notifierId;
public DingTalkNotifier(WebClient client, DingTalkProperties properties, TemplateManager templateManager) {
public DingTalkNotifier(String id,WebClient client, DingTalkProperties properties, TemplateManager templateManager) {
super(templateManager);
this.client = client;
this.properties = properties;
this.notifierId = id;
}
@Nonnull

View File

@ -30,15 +30,15 @@ public class DingTalkNotifierProvider implements NotifierProvider, TemplateProvi
}
public static final DefaultConfigMetadata notifierConfig = new DefaultConfigMetadata("通知配置", "")
.add("appKey", "appKey", "", new StringType().expand(ConfigMetadataConstants.required.value(true)))
.add("appSecret", "appSecret", "", new StringType());
.add("appKey", "appKey", "", new StringType().expand(ConfigMetadataConstants.required.value(true)))
.add("appSecret", "appSecret", "", new StringType());
public static final DefaultConfigMetadata templateConfig = new DefaultConfigMetadata("模版配置", "")
.add("agentId", "应用ID", "", new StringType().expand(ConfigMetadataConstants.required.value(true)))
.add("userIdList", "收信人ID", "与部门ID不能同时为空", new StringType())
.add("departmentIdList", "收信部门ID", "与收信人ID不能同时为空", new StringType())
.add("toAllUser", "全部用户", "推送到全部用户", new BooleanType())
.add("message", "内容", "最大不超过500字", new StringType().expand(ConfigMetadataConstants.maxLength.value(500L)));
.add("agentId", "应用ID", "", new StringType().expand(ConfigMetadataConstants.required.value(true)))
.add("userIdList", "收信人ID", "与部门ID不能同时为空", new StringType())
.add("departmentIdList", "收信部门ID", "与收信人ID不能同时为空", new StringType())
.add("toAllUser", "全部用户", "推送到全部用户", new BooleanType())
.add("message", "内容", "最大不超过500字", new StringType().expand(ConfigMetadataConstants.maxLength.value(500L)));
@Nonnull
@Override
@ -64,7 +64,7 @@ public class DingTalkNotifierProvider implements NotifierProvider, TemplateProvi
public Mono<DingTalkNotifier> createNotifier(@Nonnull NotifierProperties properties) {
return Mono.defer(() -> {
DingTalkProperties dingTalkProperties = FastBeanCopier.copy(properties.getConfiguration(), new DingTalkProperties());
return Mono.just(new DingTalkNotifier(client, ValidatorUtils.tryValidate(dingTalkProperties), templateManager));
return Mono.just(new DingTalkNotifier(properties.getId(), client, ValidatorUtils.tryValidate(dingTalkProperties), templateManager));
});
}

View File

@ -27,7 +27,7 @@ class DingTalkNotifierTest {
messageTemplate.setMessage("test"+System.currentTimeMillis());
messageTemplate.setUserIdList("0458215455697857");
DingTalkNotifier notifier=new DingTalkNotifier(
DingTalkNotifier notifier=new DingTalkNotifier("test",
WebClient.builder().build(),properties,null
);

View File

@ -59,10 +59,15 @@ public class DefaultEmailNotifier extends AbstractNotifier<EmailTemplate> {
@Setter
private String sender;
@Getter
private final String notifierId;
public static Scheduler scheduler = Schedulers.newElastic("email-notifier");
public DefaultEmailNotifier(NotifierProperties properties, TemplateManager templateManager) {
super(templateManager);
notifierId = properties.getId();
DefaultEmailProperties emailProperties = new JSONObject(properties.getConfiguration())
.toJavaObject(DefaultEmailProperties.class);

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.notify.sms.provider;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.jetlinks.core.Values;
@ -84,7 +85,7 @@ public class Hy2046SmsSenderProvider implements NotifierProvider, TemplateProvid
Assert.hasText(userId, "短信配置错误,缺少userId");
Assert.hasText(username, "短信配置错误,缺少username");
Assert.hasText(password, "短信配置错误,缺少password");
return Mono.just(new Hy2046SmsSender(userId, username, password));
return Mono.just(new Hy2046SmsSender(properties.getId(),userId, username, password));
});
}
@ -103,10 +104,13 @@ public class Hy2046SmsSenderProvider implements NotifierProvider, TemplateProvid
String userId;
String username;
String password;
@Getter
private final String notifierId;
public Hy2046SmsSender(String userId, String username, String password) {
public Hy2046SmsSender(String id,String userId, String username, String password) {
super(templateManager);
this.userId = userId;
this.notifierId = id;
this.username = username;
this.password = password;
}

View File

@ -69,6 +69,11 @@ public class TestSmsProvider extends AbstractNotifier<PlainTextSmsTemplate> impl
return Mono.just(this);
}
@Override
public String getNotifierId() {
return "test-sms-notify";
}
@Override
public String getId() {
return "test";

View File

@ -9,6 +9,7 @@ import com.aliyuncs.IAcsClient;
import com.aliyuncs.http.MethodType;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.logger.ReactiveLogger;
@ -25,14 +26,17 @@ import java.util.Objects;
@Slf4j
public class AliyunVoiceNotifier extends AbstractNotifier<AliyunVoiceTemplate> {
private IAcsClient client;
private final IAcsClient client;
private String domain = "dyvmsapi.aliyuncs.com";
private String regionId = "cn-hangzhou";
private int connectTimeout = 1000;
private int readTimeout = 5000;
private final int connectTimeout = 1000;
private final int readTimeout = 5000;
@Getter
private String notifierId;
public AliyunVoiceNotifier(NotifierProperties profile, TemplateManager templateManager) {
super(templateManager);
this.notifierId = profile.getId();
Map<String, Object> config = profile.getConfiguration();
DefaultProfile defaultProfile = DefaultProfile.getProfile(
this.regionId = (String) Objects.requireNonNull(config.get("regionId"), "regionId不能为空"),

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.notify.wechat;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.validator.ValidatorUtils;
import org.jetlinks.core.metadata.ConfigMetadata;
@ -23,7 +24,6 @@ public class WechatNotifierProvider implements NotifierProvider, TemplateProvide
private WebClient client = WebClient.create();
private final TemplateManager templateManager;
public WechatNotifierProvider(TemplateManager templateManager) {
this.templateManager = templateManager;
}
@ -61,7 +61,7 @@ public class WechatNotifierProvider implements NotifierProvider, TemplateProvide
public Mono<WeixinCorpNotifier> createNotifier(@Nonnull NotifierProperties properties) {
return Mono.defer(() -> {
WechatCorpProperties wechatCorpProperties = FastBeanCopier.copy(properties.getConfiguration(), new WechatCorpProperties());
return Mono.just(new WeixinCorpNotifier(client, ValidatorUtils.tryValidate(wechatCorpProperties), templateManager));
return Mono.just(new WeixinCorpNotifier(properties.getId(),client, ValidatorUtils.tryValidate(wechatCorpProperties), templateManager));
});
}

View File

@ -1,5 +1,6 @@
package org.jetlinks.community.notify.wechat;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.core.Values;
@ -22,24 +23,27 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class WeixinCorpNotifier extends AbstractNotifier<WechatMessageTemplate> {
private AtomicReference<String> accessToken = new AtomicReference<>();
private final AtomicReference<String> accessToken = new AtomicReference<>();
private long refreshTokenTime;
private long tokenTimeOut = Duration.ofSeconds(7000).toMillis();
private final long tokenTimeOut = Duration.ofSeconds(7000).toMillis();
private WebClient client;
private final WebClient client;
private static final String tokenApi = "https://qyapi.weixin.qq.com/cgi-bin/gettoken";
private static final String notify = "https://qyapi.weixin.qq.com/cgi-bin/message/send";
private WechatCorpProperties properties;
private final WechatCorpProperties properties;
@Getter
private final String notifierId;
public WeixinCorpNotifier(WebClient client, WechatCorpProperties properties, TemplateManager templateManager) {
public WeixinCorpNotifier(String id,WebClient client, WechatCorpProperties properties, TemplateManager templateManager) {
super(templateManager);
this.client = client;
this.properties = properties;
this.notifierId = id;
}
@Nonnull

View File

@ -26,7 +26,7 @@ class WeixinCorpNotifierTest {
messageTemplate.setMessage("test"+System.currentTimeMillis());
messageTemplate.setToUser("userId");
WeixinCorpNotifier notifier=new WeixinCorpNotifier(
WeixinCorpNotifier notifier=new WeixinCorpNotifier("test",
WebClient.builder().build(),properties,null
);

View File

@ -0,0 +1,17 @@
package org.jetlinks.community.device.entity;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.jetlinks.community.device.enums.DeviceState;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class DeviceStateInfo {
private String deviceId;
private DeviceState state;
}

View File

@ -0,0 +1,81 @@
package org.jetlinks.community.device.message;
import com.alibaba.fastjson.JSON;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.jetlinks.community.gateway.external.SubscriptionProvider;
import org.jetlinks.supports.utils.MqttTopicUtils;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
@Component
public class DeviceBatchOperationSubscriptionProvider implements SubscriptionProvider {
private final LocalDeviceInstanceService instanceService;
public DeviceBatchOperationSubscriptionProvider(LocalDeviceInstanceService instanceService) {
this.instanceService = instanceService;
}
@Override
public String id() {
return "device-batch-operator";
}
@Override
public String name() {
return "设备批量操作";
}
@Override
public String[] getTopicPattern() {
return new String[]{
"/device-batch/*"
};
}
@Override
public Flux<?> subscribe(SubscribeRequest request) {
String topic = request.getTopic();
QueryParamEntity queryParamEntity = request.getString("queryJson")
.map(json -> JSON.parseObject(json, QueryParamEntity.class))
.orElseGet(QueryParamEntity::new);
Map<String, String> var = MqttTopicUtils.getPathVariables("/device-batch/{type}", topic);
String type = var.get("type");
switch (type) {
case "state-sync":
return handleStateSync(queryParamEntity);
case "deploy":
return handleDeploy(queryParamEntity);
default:
return Flux.error(new IllegalArgumentException("不支持的类型:" + type));
}
}
private Flux<?> handleDeploy(QueryParamEntity queryParamEntity) {
return instanceService
.query(queryParamEntity.noPaging().includes("id"))
.as(instanceService::deploy);
}
private Flux<?> handleStateSync(QueryParamEntity queryParamEntity) {
return instanceService.query(queryParamEntity.noPaging().includes("id"))
.map(DeviceInstanceEntity::getId)
.buffer(200)
.publishOn(Schedulers.single())
.concatMap(flux -> instanceService.syncStateBatch(Flux.just(flux), true))
.flatMap(Flux::fromIterable);
}
}

View File

@ -352,7 +352,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.flatMap(message -> Mono.justOrEmpty(DeviceMessageUtils.convert(message))
.map(DeviceMessage::getDeviceId)), 800, 200, Duration.ofSeconds(2))
.publishOn(Schedulers.parallel())
.concatMap(list -> syncStateBatch(Flux.just(list), false).reduce(Math::addExact))
.concatMap(list -> syncStateBatch(Flux.just(list), false).map(List::size))
.onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
.subscribe((i) -> log.info("同步设备状态成功:{}", i));
}
@ -364,7 +364,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.switchIfEmpty(Mono.error(NotFoundException::new));
}
public Flux<Integer> syncStateBatch(Flux<List<String>> batch, boolean force) {
public Flux<List<DeviceStateInfo>> syncStateBatch(Flux<List<String>> batch, boolean force) {
return batch
.concatMap(list -> Flux.fromIterable(list)
@ -384,6 +384,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.collect(Collectors.groupingBy(Tuple2::getT1))
.flatMapIterable(Map::entrySet)
.flatMap(group -> {
List<String> deviceId=group.getValue().stream().map(Tuple3::getT2).collect(Collectors.toList());
DeviceState state = DeviceState.of(group.getKey());
return Mono.zip(
//批量修改设备状态
@ -391,10 +392,9 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.createUpdate()
.set(DeviceInstanceEntity::getState, state)
.where()
.in(DeviceInstanceEntity::getId, group.getValue().stream().map(Tuple3::getT2).collect(Collectors.toList()))
.in(DeviceInstanceEntity::getId,deviceId)
.execute()
.thenReturn(group.getValue().size())//mysql下可能不会返回更新数量
,
.thenReturn(group.getValue().size()),
//修改子设备状态
Flux.fromIterable(group.getValue())
.filter(Tuple3::getT3)
@ -408,8 +408,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.where()
.in(DeviceInstanceEntity::getParentId, parents)
.execute())
.defaultIfEmpty(0),
Math::addExact);
.defaultIfEmpty(0))
.thenReturn(deviceId.stream().map(id->DeviceStateInfo.of(id,state)).collect(Collectors.toList()));
}));
}

View File

@ -168,7 +168,7 @@ public class DeviceInstanceController implements
.map(DeviceInstanceEntity::getId)
.buffer(200)
.publishOn(Schedulers.single())
.concatMap(flux -> service.syncStateBatch(Flux.just(flux), true))
.concatMap(flux -> service.syncStateBatch(Flux.just(flux), true).map(List::size))
.defaultIfEmpty(0);
}

View File

@ -0,0 +1,89 @@
package org.jetlinks.community.notify.manager.entity;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.crud.generator.Generators;
import org.jetlinks.community.notify.event.SerializableNotifierEvent;
import org.jetlinks.community.notify.manager.enums.NotifyState;
import javax.persistence.Column;
import javax.persistence.Index;
import javax.persistence.Table;
import java.sql.JDBCType;
import java.util.Date;
import java.util.Map;
@Table(name = "notify_history", indexes = {
@Index(name = "idx_nt_his_notifier_id", columnList = "notifier_id")
})
@Getter
@Setter
public class NotifyHistoryEntity extends GenericEntity<String> {
private static final long serialVersionUID = -6849794470754667710L;
@Column(length = 32, nullable = false, updatable = false)
private String notifierId;
@Column(nullable = false)
@DefaultValue("success")
@EnumCodec
@ColumnType(javaType = String.class)
private NotifyState state;
@Column(length = 1024)
private String errorType;
@Column
@ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class)
private String errorStack;
@Column(length = 32, nullable = false)
@DefaultValue("-")
private String templateId;
@Column
@ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class)
private String template;
@Column
@ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class)
@JsonCodec
private Map<String, Object> context;
@Column(length = 32, nullable = false)
private String provider;
@Column(length = 32, nullable = false)
private String notifyType;
@Column
@DefaultValue(generator = Generators.CURRENT_TIME)
private Date notifyTime;
@Column
@DefaultValue("0")
private Integer retryTimes;
public static NotifyHistoryEntity of(SerializableNotifierEvent event) {
NotifyHistoryEntity entity = FastBeanCopier.copy(event, new NotifyHistoryEntity());
if (null != event.getTemplate()) {
entity.setTemplate(JSON.toJSONString(event.getTemplate()));
}
if (event.isSuccess()) {
entity.setState(NotifyState.success);
} else {
entity.setErrorStack(event.getCause());
entity.setState(NotifyState.error);
}
return entity;
}
}

View File

@ -0,0 +1,20 @@
package org.jetlinks.community.notify.manager.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.hswebframework.web.dict.EnumDict;
@Getter
@AllArgsConstructor
public enum NotifyState implements EnumDict<String> {
success("成功"),
error("失败");
private final String text;
@Override
public String getValue() {
return name();
}
}

View File

@ -0,0 +1,22 @@
package org.jetlinks.community.notify.manager.service;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.notify.event.SerializableNotifierEvent;
import org.jetlinks.community.notify.manager.entity.NotifyHistoryEntity;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Mono;
@Service
public class NotifyHistoryService extends GenericReactiveCrudService<NotifyHistoryEntity, String> {
@Subscribe("/notify/**")
@Transactional(propagation = Propagation.NEVER)
public Mono<Void> handleNotify(SerializableNotifierEvent event) {
return insert(Mono.just(NotifyHistoryEntity.of(event))).then();
}
}

View File

@ -0,0 +1,25 @@
package org.jetlinks.community.notify.manager.web;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
import org.jetlinks.community.notify.manager.entity.NotifyHistoryEntity;
import org.jetlinks.community.notify.manager.service.NotifyHistoryService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/notify/history")
@Resource(id = "notifier", name = "通知管理")
public class NotifierHistoryController implements ReactiveServiceQueryController<NotifyHistoryEntity, String> {
private final NotifyHistoryService historyService;
public NotifierHistoryController(NotifyHistoryService historyService) {
this.historyService = historyService;
}
@Override
public NotifyHistoryService getService() {
return historyService;
}
}

View File

@ -5,10 +5,10 @@ spring:
embedded:
enabled: true # 使用内置的redis,不建议在生产环境中使用.
host: 127.0.0.1
port: 6379
port: 6370
data-path: ./data/redis
host: 127.0.0.1
port: 6379
port: 6370
lettuce:
pool:
max-active: 1024
@ -26,11 +26,11 @@ elasticsearch:
embedded:
enabled: true # 为true时使用内嵌的elasticsearch,不建议在生产环境中使用
data-path: ./data/elasticsearch
port: 9200
port: 9201
host: 0.0.0.0
client:
host: localhost
port: 9200
port: 9201
max-conn-total: 128
connect-timeout: 5000
socket-timeout: 5000