upgrade jetlinks core

This commit is contained in:
zhou-hao 2020-07-29 17:13:32 +08:00
parent aa5d648752
commit 8663d01be8
4 changed files with 65 additions and 62 deletions

View File

@ -0,0 +1,57 @@
package org.jetlinks.community.gateway;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.NativePayload;
import org.jetlinks.core.Payload;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.message.codec.EncodedMessage;
import javax.annotation.Nonnull;
public class TopicMessageWrap implements TopicMessage {
private String topic;
private EncodedMessage message;
public static TopicMessageWrap wrap(TopicPayload topicPayload) {
Payload payload = topicPayload.getPayload();
TopicMessageWrap wrap = new TopicMessageWrap();
wrap.topic = topicPayload.getTopic();
if (payload instanceof NativePayload) {
wrap.message = new EncodableMessage() {
@Override
public Object getNativePayload() {
return ((NativePayload<?>) payload).getNativeObject();
}
@Nonnull
@Override
public ByteBuf getPayload() {
return payload.getBody();
}
};
} else {
wrap.message = new EncodedMessage() {
@Nonnull
@Override
public ByteBuf getPayload() {
return payload.getBody();
}
};
}
return wrap;
}
@Nonnull
@Override
public String getTopic() {
return topic;
}
@Nonnull
@Override
public EncodedMessage getMessage() {
return message;
}
}

View File

@ -1,42 +0,0 @@
package org.jetlinks.community.rule.engine.configuration;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.community.gateway.EncodableMessage;
import org.jetlinks.community.gateway.TopicMessage;
import org.jetlinks.rule.engine.api.NativePayload;
import org.jetlinks.rule.engine.api.SubscribePayload;
import javax.annotation.Nonnull;
@Getter
@Setter
public class EventTopicMessage implements TopicMessage, EncodableMessage {
private String topic;
private Object nativePayload;
private SubscribePayload payload;
public EventTopicMessage(SubscribePayload payload) {
this.topic = payload.getTopic();
this.nativePayload = ((NativePayload) payload.getPayload()).getNativeObject();
this.payload = payload;
}
@Nonnull
@Override
public ByteBuf getPayload() {
return payload.getBody();
}
@Nonnull
@Override
public EncodedMessage getMessage() {
return this;
}
}

View File

@ -2,10 +2,10 @@ package org.jetlinks.community.rule.engine.configuration;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.rule.engine.api.EventBus;
import org.jetlinks.community.gateway.TopicMessageWrap;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.rpc.RpcService;
import org.jetlinks.rule.engine.api.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
@ -16,14 +16,12 @@ import org.jetlinks.rule.engine.condition.supports.DefaultScriptEvaluator;
import org.jetlinks.rule.engine.condition.supports.ScriptConditionEvaluatorStrategy;
import org.jetlinks.rule.engine.condition.supports.ScriptEvaluator;
import org.jetlinks.rule.engine.defaults.DefaultRuleEngine;
import org.jetlinks.rule.engine.defaults.LocalEventBus;
import org.jetlinks.rule.engine.defaults.LocalScheduler;
import org.jetlinks.rule.engine.defaults.LocalWorker;
import org.jetlinks.rule.engine.defaults.rpc.DefaultRpcServiceFactory;
import org.jetlinks.rule.engine.defaults.rpc.EventBusRcpService;
import org.jetlinks.rule.engine.model.DefaultRuleModelParser;
import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
import org.jetlinks.rule.engine.model.antv.AntVG6RuleModelParserStrategy;
import org.jetlinks.supports.event.BrokerEventBus;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Bean;
@ -51,27 +49,17 @@ public class RuleEngineConfiguration {
@Bean
public EventBus eventBus(MessageGateway messageGateway) {
LocalEventBus local = new LocalEventBus();
BrokerEventBus local = new BrokerEventBus();
//转发到消息网关
local.subscribe("/**")
.flatMap(subscribePayload -> messageGateway.publish(new EventTopicMessage(subscribePayload)).then())
local.subscribe(Subscription.of("msg.gateway",new String[]{"/**"}, Subscription.Feature.local))
.flatMap(subscribePayload -> messageGateway.publish(TopicMessageWrap.wrap(subscribePayload)).then())
.onErrorContinue((err, obj) -> log.error(err.getMessage(), obj))
.subscribe();
return local;
}
@Bean
public RpcService rpcService(EventBus eventBus) {
return new EventBusRcpService(eventBus);
}
@Bean
public RpcServiceFactory rpcServiceFactory(RpcService rpcService) {
return new DefaultRpcServiceFactory(rpcService);
}
@Bean
public Scheduler localScheduler(Worker worker) {
LocalScheduler scheduler = new LocalScheduler("local");

View File

@ -22,7 +22,7 @@
<hsweb.framework.version>4.0.4</hsweb.framework.version>
<easyorm.version>4.0.4</easyorm.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<jetlinks.version>1.1.0</jetlinks.version>
<jetlinks.version>1.1.1-SNAPSHOT</jetlinks.version>
<r2dbc.version>Arabba-SR6</r2dbc.version>
<vertx.version>3.8.5</vertx.version>
<netty.version>4.1.50.Final</netty.version>