From 8e721473628eedb48a9f491af11258e18b683434 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Fri, 1 Aug 2025 16:38:21 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96lock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../community/lock/DefaultReactiveLock.java | 130 +++++++++++++----- .../lock/DefaultReactiveLockManager.java | 17 +-- .../jetlinks/community/lock/ReactiveLock.java | 15 -- .../community/lock/ReactiveLockHolder.java | 15 -- .../community/lock/ReactiveLockManager.java | 15 -- 5 files changed, 93 insertions(+), 99 deletions(-) diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLock.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLock.java index 5de023af..941134d0 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLock.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLock.java @@ -1,20 +1,9 @@ -/* - * Copyright 2025 JetLinks https://www.jetlinks.cn - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.jetlinks.community.lock; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -34,6 +23,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Consumer; +@Slf4j class DefaultReactiveLock implements ReactiveLock { @SuppressWarnings("all") static final AtomicReferenceFieldUpdater @@ -42,8 +32,22 @@ class DefaultReactiveLock implements ReactiveLock { final Deque> queue = new ConcurrentLinkedDeque<>(); + protected final LockName lockName; + volatile LockingSubscriber pending; + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(DefaultReactiveLock.class, "wip"); + + volatile int wip; + + DefaultReactiveLock(String lockName) { + this.lockName = new LockName(lockName); + } + + public boolean isEmpty() { + return queue.isEmpty(); + } @Override public Flux lock(Flux job) { @@ -76,28 +80,34 @@ class DefaultReactiveLock implements ReactiveLock { } protected void drain() { - if (PENDING.get(this) != null) { + if (WIP.getAndIncrement(this) != 0) { return; } LockingSubscriber locking; - for (; ; ) { - locking = queue.pollFirst(); - if (locking == null) { - return; + do { + for (; ; ) { + locking = queue.pollFirst(); + if (locking == null) { + break; + } + if (locking.isDisposed()) { + continue; + } + if (PENDING.compareAndSet(this, null, locking)) { + try { + locking.subscribe(); + } catch (Throwable e) { + PENDING.compareAndSet(this, locking, null); + queue.addLast(locking); + } + } else { + queue.addLast(locking); + } + break; } - if (locking.isDisposed()) { - continue; - } - if (PENDING.compareAndSet(this, null, locking)) { - //使用单独的线程池来调度,防止参与锁太多导致栈溢出. - Schedulers.parallel().schedule(locking::subscribe); - } else { - queue.addLast(locking); - } - break; - } + } while (WIP.decrementAndGet(this) != 0); } @@ -151,6 +161,12 @@ class DefaultReactiveLock implements ReactiveLock { @Override public void subscribe(@Nonnull CoreSubscriber actual) { + if (actual.currentContext().hasKey(main.lockName)) { + log.debug("reactive lock {} already locked in current context, skip.", main.lockName); + //如果当前上下文已经有锁了,则不再重复注册订阅者 + source.subscribe(actual); + return; + } Consumer> subscribeCallback = source::subscribe; main.registerSubscriber(actual, subscribeCallback, timeout, timeoutFallback); } @@ -183,6 +199,12 @@ class DefaultReactiveLock implements ReactiveLock { @Override public void subscribe(@Nonnull CoreSubscriber actual) { + if (actual.currentContext().hasKey(main.lockName)) { + log.debug("reactive lock {} already locked in current context, skip.", main.lockName); + //如果当前上下文已经有锁了,则不再重复注册订阅者 + source.subscribe(actual); + return; + } Consumer> subscribeCallback = source::subscribe; main.registerSubscriber(actual, subscribeCallback, timeout, fallback); } @@ -190,7 +212,7 @@ class DefaultReactiveLock implements ReactiveLock { } - static class LockingSubscriber extends BaseSubscriber { + static class LockingSubscriber extends BaseSubscriber implements Runnable { protected final DefaultReactiveLock main; protected final CoreSubscriber actual; private final Consumer> subscriber; @@ -201,6 +223,7 @@ class DefaultReactiveLock implements ReactiveLock { AtomicIntegerFieldUpdater.newUpdater(LockingSubscriber.class, "status"); private volatile int status; + private final Context context; //初始 private static final int INIT = 0; @@ -223,6 +246,11 @@ class DefaultReactiveLock implements ReactiveLock { this.main = main; this.subscriber = subscriber; this.timeoutFallback = timeoutFallback; + this.context = actual + .currentContext() + .put(DefaultReactiveLock.class, main) + .put(main.lockName, true); + if (timeout != null) { this.timeoutTask = Schedulers .parallel() @@ -238,8 +266,12 @@ class DefaultReactiveLock implements ReactiveLock { if (timeoutFallback != null) { timeoutFallback.subscribe(actual); } else { - this.onError(new TimeoutException("Lock timed out")); + Operators.error( + actual, new TimeoutException("Lock [" + main.lockName + "] timeout") + ); } + } else { + main.drain(); } } @@ -249,15 +281,21 @@ class DefaultReactiveLock implements ReactiveLock { timeoutTask.dispose(); } subscriber.accept(this); + } else { + main.drain(); } + } protected void complete() { - if (statusUpdater.compareAndSet(this, INIT, UN_SUB) || statusUpdater.compareAndSet(this, SUB_SOURCE, UN_SUB)) { + if (statusUpdater.compareAndSet(this, INIT, UN_SUB) + || statusUpdater.compareAndSet(this, SUB_SOURCE, UN_SUB)) { if (timeoutTask != null && !timeoutTask.isDisposed()) { timeoutTask.dispose(); } doComplete(); + } else { + main.drain(); } } @@ -266,9 +304,8 @@ class DefaultReactiveLock implements ReactiveLock { if (!this.isDisposed()) { this.cancel(); } - if (PENDING.compareAndSet(main, this, null)) { - main.drain(); - } + PENDING.compareAndSet(main, this, null); + main.drain(); } @Override @@ -304,8 +341,25 @@ class DefaultReactiveLock implements ReactiveLock { @Override @Nonnull public Context currentContext() { - return actual.currentContext(); + return context; + } + + @Override + public void run() { + subscribe(); } } + + @Getter + @AllArgsConstructor + @EqualsAndHashCode + protected static class LockName { + final String name; + + @Override + public String toString() { + return name; + } + } } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLockManager.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLockManager.java index 8612e904..3bff23ac 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLockManager.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/DefaultReactiveLockManager.java @@ -1,18 +1,3 @@ -/* - * Copyright 2025 JetLinks https://www.jetlinks.cn - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.jetlinks.community.lock; import com.github.benmanes.caffeine.cache.Caffeine; @@ -30,6 +15,6 @@ class DefaultReactiveLockManager implements ReactiveLockManager { @Override public ReactiveLock getLock(String name) { - return cache.computeIfAbsent(name, ignore -> new DefaultReactiveLock()); + return cache.computeIfAbsent(name, DefaultReactiveLock::new); } } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLock.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLock.java index ad6f22de..358333b1 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLock.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLock.java @@ -1,18 +1,3 @@ -/* - * Copyright 2025 JetLinks https://www.jetlinks.cn - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.jetlinks.community.lock; import reactor.core.publisher.Flux; diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockHolder.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockHolder.java index 370f8ae2..e765dd30 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockHolder.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockHolder.java @@ -1,18 +1,3 @@ -/* - * Copyright 2025 JetLinks https://www.jetlinks.cn - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.jetlinks.community.lock; /** diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockManager.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockManager.java index c7cf6b09..2d10aaa1 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockManager.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/lock/ReactiveLockManager.java @@ -1,18 +1,3 @@ -/* - * Copyright 2025 JetLinks https://www.jetlinks.cn - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.jetlinks.community.lock; /**