From 98861ac1edf40c95dba28ddaf314aecd49916632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E5=91=A8?= Date: Fri, 20 Oct 2023 11:37:21 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BD=BF=E7=94=A8=E6=96=B0?= =?UTF-8?q?=E7=9A=84eventbus=E5=AE=9E=E7=8E=B0,=E5=A2=9E=E5=8A=A0=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E8=AE=A2=E9=98=85=E4=BC=98=E5=85=88=E7=BA=A7=E6=94=AF?= =?UTF-8?q?=E6=8C=81.=20(#430)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cluster/ClusterConfiguration.java | 18 ++-------- .../gateway/spring/SpringMessageBroker.java | 34 +++++++++++-------- .../data/AutoUpdateThingsDataManager.java | 1 + 3 files changed, 23 insertions(+), 30 deletions(-) diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java index ccb43911..11f03354 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java @@ -10,21 +10,17 @@ import org.jetlinks.core.cluster.ClusterManager; import org.jetlinks.core.event.EventBus; import org.jetlinks.supports.cluster.redis.RedisClusterManager; import org.jetlinks.supports.config.EventBusStorageManager; -import org.jetlinks.supports.event.BrokerEventBus; -import org.jetlinks.supports.event.EventBroker; +import org.jetlinks.supports.event.InternalEventBus; import org.jetlinks.supports.scalecube.ExtendedCluster; import org.jetlinks.supports.scalecube.ExtendedClusterImpl; import org.jetlinks.supports.scalecube.rpc.ScalecubeRpcManager; import org.nustaq.serialization.FSTConfiguration; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ResourceLoader; import org.springframework.data.redis.core.ReactiveRedisTemplate; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.util.stream.Collectors; @@ -70,16 +66,8 @@ public class ClusterConfiguration { } @Bean - public BrokerEventBus eventBus(ObjectProvider provider, - ObjectProvider scheduler) { - - BrokerEventBus eventBus = new BrokerEventBus(); - eventBus.setPublishScheduler(scheduler.getIfAvailable(Schedulers::parallel)); - for (EventBroker eventBroker : provider) { - eventBus.addBroker(eventBroker); - } - - return eventBus; + public InternalEventBus eventBus() { + return new InternalEventBus(); } @Bean diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java index ea5cd15b..6e4537d0 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/spring/SpringMessageBroker.java @@ -7,6 +7,7 @@ import org.hswebframework.web.utils.TemplateParser; import org.jetlinks.community.gateway.annotation.Subscribe; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; +import org.jetlinks.core.trace.MonoTracer; import org.jetlinks.core.utils.TopicUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -18,8 +19,10 @@ import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import javax.annotation.Nonnull; import java.util.Arrays; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -34,7 +37,7 @@ public class SpringMessageBroker implements BeanPostProcessor { private final Environment environment; @Override - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + public Object postProcessAfterInitialization(@Nonnull Object bean, @Nonnull String beanName) throws BeansException { Class type = ClassUtils.getUserClass(bean); ReflectionUtils.doWithMethods(type, method -> { AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class); @@ -45,36 +48,37 @@ public class SpringMessageBroker implements BeanPostProcessor { if (!StringUtils.hasText(id)) { id = type.getSimpleName().concat(".").concat(method.getName()); } - + String traceName = "/java/" + type.getSimpleName() + "/" + method.getName(); + String callName = type.getSimpleName() + "." + method.getName(); Subscription subscription = Subscription .builder() .subscriberId("spring:" + id) .topics(Arrays.stream(subscribes.getStringArray("value")) .map(this::convertTopic) - .flatMap(topic -> TopicUtils.expand(topic).stream()) - .collect(Collectors.toList()) - ) + .collect(Collectors.toList())) + .priority(subscribes.getNumber("priority")) .features((Subscription.Feature[]) subscribes.get("features")) .build(); ProxyMessageListener listener = new ProxyMessageListener(bean, method); - Consumer> logError = ReactiveLogger - .onError(error -> log.error("handle[{}] event message error", listener, error)); + Consumer logError = error -> log.error("handle[{}] event message error : {}", listener, error.getLocalizedMessage(), error); + + MonoTracer tracer = MonoTracer.create(traceName); eventBus - .subscribe(subscription) - .doOnNext(msg -> { + .subscribe(subscription, msg -> { try { - listener + return listener .onMessage(msg) - .doOnEach(logError) - .subscribe(); + .as(tracer) + .doOnError(logError) + .checkpoint(callName); } catch (Throwable e) { - log.error("handle[{}] event message error", listener, e); + logError.accept(e); } - }) - .subscribe(); + return Mono.empty(); + }); }); diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/AutoUpdateThingsDataManager.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/AutoUpdateThingsDataManager.java index 09d44a71..110f0e09 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/AutoUpdateThingsDataManager.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/AutoUpdateThingsDataManager.java @@ -171,6 +171,7 @@ public class AutoUpdateThingsDataManager extends LocalFileThingsDataManager { .topics(ThingConstants.Topics.properties(ThingType.of(thingType), thingId)) .local() .broker() + .priority(Integer.MIN_VALUE) .build(), PropertyMessage.class )