优化规则引擎

This commit is contained in:
zhouhao 2020-02-27 17:03:22 +08:00
parent 72e313f378
commit a1c32b946d
10 changed files with 77 additions and 17 deletions

View File

@ -20,6 +20,7 @@ import org.jetlinks.rule.engine.model.antv.AntVG6RuleModelParserStrategy;
import org.jetlinks.rule.engine.standalone.StandaloneRuleEngine;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -120,5 +121,4 @@ public class RuleEngineConfiguration {
}
}

View File

@ -0,0 +1,57 @@
package org.jetlinks.community.rule.engine.configuration;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.elastic.search.enums.FieldDateFormat;
import org.jetlinks.community.elastic.search.enums.FieldType;
import org.jetlinks.community.elastic.search.index.CreateIndex;
import org.jetlinks.community.elastic.search.service.IndexOperationService;
import org.jetlinks.community.rule.engine.event.handler.RuleEngineLoggerIndexProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* @author bsetfeng
* @since 1.0
**/
@Component
@Order(1)
@Slf4j
public class RuleEngineLogIndexInitialize implements CommandLineRunner {
private final IndexOperationService indexOperationService;
@Autowired
public RuleEngineLogIndexInitialize(IndexOperationService indexOperationService) {
this.indexOperationService = indexOperationService;
}
@Override
public void run(String... args) throws Exception {
indexOperationService.init(CreateIndex.createInstance()
.addIndex(RuleEngineLoggerIndexProvider.RULE_LOG.getStandardIndex())
.createMapping()
.addFieldName("createTime").addFieldType(FieldType.DATE).addFieldDateFormat(FieldDateFormat.epoch_millis, FieldDateFormat.simple_date, FieldDateFormat.strict_date_time).commit()
.addFieldName("level").addFieldType(FieldType.KEYWORD).commit()
.addFieldName("message").addFieldType(FieldType.KEYWORD).commit()
.addFieldName("nodeId").addFieldType(FieldType.KEYWORD).commit()
.addFieldName("instanceId").addFieldType(FieldType.KEYWORD).commit()
.end()
.createIndexRequest())
.and(
indexOperationService.init(CreateIndex.createInstance()
.addIndex(RuleEngineLoggerIndexProvider.RULE_EVENT_LOG.getStandardIndex())
.createMapping()
.addFieldName("createTime").addFieldType(FieldType.DATE).addFieldDateFormat(FieldDateFormat.epoch_millis, FieldDateFormat.simple_date, FieldDateFormat.strict_date_time).commit()
.addFieldName("event").addFieldType(FieldType.KEYWORD).commit()
.addFieldName("nodeId").addFieldType(FieldType.KEYWORD).commit()
.addFieldName("instanceId").addFieldType(FieldType.KEYWORD).commit()
.end()
.createIndexRequest())
)
.doOnError(err -> log.error(err.getMessage(), err))
.subscribe();
}
}

View File

@ -10,6 +10,7 @@ import lombok.Setter;
@Getter
@Setter
public class RuleEngineExecuteEventInfo {
private String id;
private String event;

View File

@ -9,7 +9,9 @@ import lombok.Setter;
**/
@Getter
@Setter
public class ExecuteLogInfo {
public class RuleEngineExecuteLogInfo {
private String id;
private String instanceId;

View File

@ -12,8 +12,8 @@ import org.jetlinks.community.elastic.search.index.ElasticIndex;
@AllArgsConstructor
public enum RuleEngineLoggerIndexProvider implements ElasticIndex {
RULE_LOG("rule-log", "_doc"),
RULE_EVENT_LOG("rule-event-log", "_doc");
RULE_LOG("rule-engine-execute-log", "_doc"),
RULE_EVENT_LOG("rule-engine-execute-event", "_doc");
private String index;

View File

@ -4,7 +4,7 @@ import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
import org.jetlinks.community.rule.engine.entity.ExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.rule.engine.api.events.NodeExecuteEvent;
import org.jetlinks.rule.engine.api.events.RuleEvent;
import org.jetlinks.rule.engine.cluster.logger.LogInfo;
@ -25,7 +25,7 @@ public class RuleLogHandler {
@EventListener
public void handleRuleLog(LogInfo event) {
ExecuteLogInfo logInfo = FastBeanCopier.copy(event, new ExecuteLogInfo());
RuleEngineExecuteLogInfo logInfo = FastBeanCopier.copy(event, new RuleEngineExecuteLogInfo());
elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_LOG, Mono.just(logInfo))
.subscribe();
}
@ -36,7 +36,7 @@ public class RuleLogHandler {
if (!RuleEvent.NODE_EXECUTE_BEFORE.equals(event.getEvent())
&& !RuleEvent.NODE_EXECUTE_RESULT.equals(event.getEvent())) {
RuleEngineExecuteEventInfo eventInfo = FastBeanCopier.copy(event, new RuleEngineExecuteEventInfo());
elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_LOG, Mono.just(eventInfo))
elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_EVENT_LOG, Mono.just(eventInfo))
.subscribe();
}
}

View File

@ -40,7 +40,7 @@ public class TimerWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<Tim
});
TimerJob job = jobs.computeIfAbsent(id, _id -> new TimerJob(config, context));
job.doStart();
job.start();
}
@Override
@ -74,7 +74,7 @@ public class TimerWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<Tim
}
running = true;
Mono.delay(Duration.ofMillis(configuration.nextMillis()))
.subscribe(t -> execute(this::start));
.subscribe(t -> execute(this::doStart));
}
void execute(Runnable runnable) {

View File

@ -4,7 +4,7 @@ import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.jetlinks.community.rule.engine.entity.ExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
import org.jetlinks.community.rule.engine.event.handler.RuleEngineLoggerIndexProvider;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
@ -36,8 +36,8 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
return elasticSearchService.queryPager(RuleEngineLoggerIndexProvider.RULE_EVENT_LOG, queryParam, RuleEngineExecuteEventInfo.class);
}
public Mono<PagerResult<ExecuteLogInfo>> queryExecuteLog(QueryParam queryParam) {
return elasticSearchService.queryPager(RuleEngineLoggerIndexProvider.RULE_LOG, queryParam, ExecuteLogInfo.class);
public Mono<PagerResult<RuleEngineExecuteLogInfo>> queryExecuteLog(QueryParam queryParam) {
return elasticSearchService.queryPager(RuleEngineLoggerIndexProvider.RULE_LOG, queryParam, RuleEngineExecuteLogInfo.class);
}
public Mono<Void> stop(String id) {

View File

@ -9,7 +9,7 @@ import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.hswebframework.web.exception.NotFoundException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.rule.engine.entity.ExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
import org.jetlinks.community.rule.engine.service.RuleInstanceService;
@ -48,8 +48,8 @@ public class RuleInstanceController implements ReactiveServiceCrudController<Rul
@GetMapping("/{id}/logs")
@QueryAction
public Mono<PagerResult<ExecuteLogInfo>> queryLog(@PathVariable String id,
QueryParamEntity paramEntity) {
public Mono<PagerResult<RuleEngineExecuteLogInfo>> queryLog(@PathVariable String id,
QueryParamEntity paramEntity) {
return paramEntity.toQuery()
.is("instanceId", id)
.execute(instanceService::queryExecuteLog);

View File

@ -91,8 +91,8 @@ logging:
# org.springframework.data.r2dbc.connectionfactory: debug
io.micrometer: warn
org.hswebframework.expands: error
system: warn
org.jetlinks.rule.engine: debug
system: debug
org.jetlinks.rule.engine: warn
org.jetlinks.gateway: debug
org.springframework: warn
vertx: