refactor: 优化权限初始化逻辑
This commit is contained in:
parent
3811266945
commit
472e1dc89c
|
|
@ -80,7 +80,7 @@ public class UserAuthenticationEventPublisher {
|
|||
private Mono<Void> publish0(Collection<String> userIdList) {
|
||||
return Flux
|
||||
.fromIterable(userIdList)
|
||||
.flatMapDelayError(
|
||||
.flatMap(
|
||||
userId -> eventBus
|
||||
.publish(Topics.Authentications.userAuthenticationChanged(userId),
|
||||
ReactiveAuthenticationHolder
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@
|
|||
*/
|
||||
package org.jetlinks.community.auth.service;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import lombok.Generated;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
|
|
@ -50,6 +51,7 @@ import reactor.core.publisher.Flux;
|
|||
import reactor.core.publisher.Mono;
|
||||
import reactor.math.MathFlux;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
|
@ -161,6 +163,13 @@ public class DefaultMenuService
|
|||
.as(this::convertToView);
|
||||
}
|
||||
|
||||
final Map<List<String>, Flux<MenuView>> grantedCaching =
|
||||
CacheBuilder
|
||||
.newBuilder()
|
||||
.expireAfterAccess(Duration.ofSeconds(10))
|
||||
.<List<String>, Flux<MenuView>>build()
|
||||
.asMap();
|
||||
|
||||
public Flux<MenuView> getGrantedMenus(List<Dimension> dimensions,
|
||||
Mono<Map<String, MenuEntity>> menuEntityMap) {
|
||||
if (CollectionUtils.isEmpty(dimensions)) {
|
||||
|
|
@ -170,16 +179,25 @@ public class DefaultMenuService
|
|||
.stream()
|
||||
.filter(this::isMenuDimension)
|
||||
.map(dimension -> MenuBindEntity.generateTargetKey(dimension.getType().getId(), dimension.getId()))
|
||||
.sorted()
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return convertToView(CollectionUtils.isEmpty(keyList)
|
||||
? Flux.empty()
|
||||
: bindRepository
|
||||
.createQuery()
|
||||
.where()
|
||||
.in(MenuBindEntity::getTargetKey, keyList)
|
||||
.fetch(),
|
||||
menuEntityMap);
|
||||
return grantedCaching
|
||||
.computeIfAbsent(keyList,
|
||||
_keyList -> Flux
|
||||
.defer(() -> this
|
||||
.convertToView(CollectionUtils.isEmpty(_keyList)
|
||||
? Flux.empty()
|
||||
: bindRepository
|
||||
.createQuery()
|
||||
.where()
|
||||
.in(MenuBindEntity::getTargetKey, _keyList)
|
||||
.fetch(),
|
||||
menuEntityMap))
|
||||
// 缓存1秒钟,避免编辑角色或者组织时,导致大量用户权限失效并进行初始化时执行大量重复的查询.
|
||||
.cache(Duration.ofSeconds(1)));
|
||||
|
||||
|
||||
}
|
||||
|
||||
private boolean isMenuDimension(Dimension dimension) {
|
||||
|
|
@ -204,7 +222,6 @@ public class DefaultMenuService
|
|||
}
|
||||
|
||||
|
||||
|
||||
private Flux<MenuView> convertToView(Flux<MenuBindEntity> entityFlux, Mono<Map<String, MenuEntity>> menuEntityMap) {
|
||||
return Mono
|
||||
.zip(
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package org.jetlinks.community.notify.manager.service;
|
|||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.commons.collections4.MapUtils;
|
||||
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
||||
import org.hswebframework.web.authorization.Authentication;
|
||||
|
|
@ -26,6 +27,8 @@ import org.hswebframework.web.crud.service.GenericReactiveCrudService;
|
|||
import org.hswebframework.web.exception.BusinessException;
|
||||
import org.hswebframework.web.i18n.LocaleUtils;
|
||||
import org.jetlinks.community.gateway.annotation.Subscribe;
|
||||
import org.jetlinks.community.lock.ReactiveLock;
|
||||
import org.jetlinks.community.lock.ReactiveLockHolder;
|
||||
import org.jetlinks.community.notify.manager.configuration.NotifySubscriberProperties;
|
||||
import org.jetlinks.community.notify.manager.entity.Notification;
|
||||
import org.jetlinks.community.notify.manager.entity.NotifySubscriberChannelEntity;
|
||||
|
|
@ -56,6 +59,7 @@ import reactor.core.publisher.Mono;
|
|||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service
|
||||
|
|
@ -463,13 +467,9 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
|
|||
}
|
||||
|
||||
public void handleSubscriberEntity(NotifySubscriberEntity entity) {
|
||||
NotifySubscriberProviderCache cache = providerChannels.get(entity.getProviderId());
|
||||
NotifySubscriberProviderEntity provider = null;
|
||||
if (cache != null) {
|
||||
provider = cache.getProvider();
|
||||
}
|
||||
//取消订阅
|
||||
if ((provider != null && provider.getState() == NotifyChannelState.disabled) || entity.getState() == SubscribeState.disabled) {
|
||||
// 取消订阅,没有订阅通道也取消,减少内存占用。
|
||||
if (entity.getState() == SubscribeState.disabled
|
||||
|| CollectionUtils.isEmpty(entity.getNotifyChannels())) {
|
||||
Disposable disp = subs.remove(entity.getId());
|
||||
if (disp != null) {
|
||||
log.debug("unsubscribe:{}({}),{},subscriber:{}",
|
||||
|
|
@ -483,8 +483,7 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
|
|||
return;
|
||||
}
|
||||
|
||||
subs.computeIfAbsent(entity.getId(), ignore -> new Node())
|
||||
.init(entity);
|
||||
subs.computeIfAbsent(entity.getId(), ignore -> new Node()).init(entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -521,8 +520,15 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
|
|||
disposable.dispose();
|
||||
}
|
||||
|
||||
// fixme 使用锁来限制并发
|
||||
ReactiveLock lock = ReactiveLockHolder
|
||||
.getLock("subscriber_loader:" +
|
||||
ThreadLocalRandom.current().nextInt(
|
||||
0,
|
||||
Runtime.getRuntime().availableProcessors()));
|
||||
disposable = Mono
|
||||
.zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()), getProvider(entity))
|
||||
.zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()).as(lock::lock),
|
||||
getProvider(entity))
|
||||
.flatMap(tp2 -> {
|
||||
initNotifyChannels(tp2.getT1());
|
||||
return tp2.getT2().createSubscriber(entity.getId(), tp2.getT1(), entity.getTopicConfig());
|
||||
|
|
|
|||
|
|
@ -120,6 +120,8 @@ hsweb:
|
|||
# allopatric-login-modes:
|
||||
# app: offlineOther
|
||||
permission:
|
||||
initialize:
|
||||
enabled-dimensions: api-client
|
||||
filter:
|
||||
enabled: true # 设置为true开启权限过滤,赋权时,不能赋予比自己多的权限.
|
||||
exclude-username: admin # admin用户不受上述限制
|
||||
|
|
|
|||
Loading…
Reference in New Issue