refactor: 使用新的eventbus实现,增加相关订阅优先级支持. (#430)

This commit is contained in:
老周 2023-10-20 11:37:21 +08:00 committed by GitHub
parent 00997d9ee2
commit 98861ac1ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 30 deletions

View File

@ -10,21 +10,17 @@ import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.supports.cluster.redis.RedisClusterManager;
import org.jetlinks.supports.config.EventBusStorageManager;
import org.jetlinks.supports.event.BrokerEventBus;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.InternalEventBus;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.scalecube.ExtendedClusterImpl;
import org.jetlinks.supports.scalecube.rpc.ScalecubeRpcManager;
import org.nustaq.serialization.FSTConfiguration;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.stream.Collectors;
@ -70,16 +66,8 @@ public class ClusterConfiguration {
}
@Bean
public BrokerEventBus eventBus(ObjectProvider<EventBroker> provider,
ObjectProvider<Scheduler> scheduler) {
BrokerEventBus eventBus = new BrokerEventBus();
eventBus.setPublishScheduler(scheduler.getIfAvailable(Schedulers::parallel));
for (EventBroker eventBroker : provider) {
eventBus.addBroker(eventBroker);
}
return eventBus;
public InternalEventBus eventBus() {
return new InternalEventBus();
}
@Bean

View File

@ -7,6 +7,7 @@ import org.hswebframework.web.utils.TemplateParser;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.core.utils.TopicUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
@ -18,8 +19,10 @@ import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -34,7 +37,7 @@ public class SpringMessageBroker implements BeanPostProcessor {
private final Environment environment;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
public Object postProcessAfterInitialization(@Nonnull Object bean, @Nonnull String beanName) throws BeansException {
Class<?> type = ClassUtils.getUserClass(bean);
ReflectionUtils.doWithMethods(type, method -> {
AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class);
@ -45,36 +48,37 @@ public class SpringMessageBroker implements BeanPostProcessor {
if (!StringUtils.hasText(id)) {
id = type.getSimpleName().concat(".").concat(method.getName());
}
String traceName = "/java/" + type.getSimpleName() + "/" + method.getName();
String callName = type.getSimpleName() + "." + method.getName();
Subscription subscription = Subscription
.builder()
.subscriberId("spring:" + id)
.topics(Arrays.stream(subscribes.getStringArray("value"))
.map(this::convertTopic)
.flatMap(topic -> TopicUtils.expand(topic).stream())
.collect(Collectors.toList())
)
.collect(Collectors.toList()))
.priority(subscribes.getNumber("priority"))
.features((Subscription.Feature[]) subscribes.get("features"))
.build();
ProxyMessageListener listener = new ProxyMessageListener(bean, method);
Consumer<Signal<Void>> logError = ReactiveLogger
.onError(error -> log.error("handle[{}] event message error", listener, error));
Consumer<Throwable> logError = error -> log.error("handle[{}] event message error : {}", listener, error.getLocalizedMessage(), error);
MonoTracer<Void> tracer = MonoTracer.create(traceName);
eventBus
.subscribe(subscription)
.doOnNext(msg -> {
.subscribe(subscription, msg -> {
try {
listener
return listener
.onMessage(msg)
.doOnEach(logError)
.subscribe();
.as(tracer)
.doOnError(logError)
.checkpoint(callName);
} catch (Throwable e) {
log.error("handle[{}] event message error", listener, e);
logError.accept(e);
}
})
.subscribe();
return Mono.empty();
});
});

View File

@ -171,6 +171,7 @@ public class AutoUpdateThingsDataManager extends LocalFileThingsDataManager {
.topics(ThingConstants.Topics.properties(ThingType.of(thingType), thingId))
.local()
.broker()
.priority(Integer.MIN_VALUE)
.build(),
PropertyMessage.class
)