fix(场景联动): 修复重启服务后,场景联动未初始化的问题 (#566)

This commit is contained in:
Zhang Ji 2024-09-02 12:23:46 +08:00 committed by GitHub
parent 412fbd6e2c
commit 3ce520ebe6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 17 deletions

View File

@ -5,9 +5,11 @@ import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
import org.jetlinks.community.rule.engine.entity.SceneEntity;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
import org.jetlinks.rule.engine.cluster.RuleInstance;
import org.jetlinks.rule.engine.cluster.RuleInstanceRepository;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -17,13 +19,15 @@ import javax.annotation.Nonnull;
@Component
@AllArgsConstructor
@Slf4j
public class LocalRuleInstanceRepository implements RuleInstanceRepository {
public class LocalRuleInstanceRepository implements RuleInstanceRepository, CommandLineRunner {
private final RuleInstanceService instanceService;
private final SceneService sceneService;
private final RuleEngineModelParser parser;
private final RuleEngine ruleEngine;
@Nonnull
@Override
public Flux<RuleInstance> findAll() {
@ -71,4 +75,17 @@ public class LocalRuleInstanceRepository implements RuleInstanceRepository {
)
;
}
@Override
public void run(String... args) throws Exception {
this
.findAll()
.flatMap(ruleInstance -> ruleEngine
.startRule(ruleInstance.getId(), ruleInstance.getModel())
.onErrorResume(err -> {
log.warn("启动规则[{}]失败: {}", ruleInstance.getModel().getName(), ruleInstance);
return Mono.empty();
}))
.subscribe();
}
}

View File

@ -15,7 +15,6 @@ import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
@ -23,7 +22,7 @@ import reactor.core.publisher.Mono;
@Service
@Slf4j
public class RuleInstanceService extends GenericReactiveCrudService<RuleInstanceEntity, String> implements CommandLineRunner {
public class RuleInstanceService extends GenericReactiveCrudService<RuleInstanceEntity, String> {
@Autowired
private RuleEngine ruleEngine;
@ -78,18 +77,4 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
.as(super::deleteById);
}
@Override
public void run(String... args) {
createQuery()
.where()
.is(RuleInstanceEntity::getState, RuleInstanceState.started)
.fetch()
.flatMap(e -> this
.doStart(e)
.onErrorResume(err -> {
log.warn("启动规则[{}]失败", e.getName(), e);
return Mono.empty();
}))
.subscribe();
}
}