refactor: 优化lock

This commit is contained in:
zhouhao 2025-08-01 16:38:21 +08:00
parent b9cf1b3db5
commit 8e72147362
5 changed files with 93 additions and 99 deletions

View File

@ -1,20 +1,9 @@
/*
* Copyright 2025 JetLinks https://www.jetlinks.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jetlinks.community.lock;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
@ -34,6 +23,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
@Slf4j
class DefaultReactiveLock implements ReactiveLock {
@SuppressWarnings("all")
static final AtomicReferenceFieldUpdater<DefaultReactiveLock, LockingSubscriber>
@ -42,8 +32,22 @@ class DefaultReactiveLock implements ReactiveLock {
final Deque<LockingSubscriber<?>> queue = new ConcurrentLinkedDeque<>();
protected final LockName lockName;
volatile LockingSubscriber<?> pending;
static final AtomicIntegerFieldUpdater<DefaultReactiveLock> WIP =
AtomicIntegerFieldUpdater.newUpdater(DefaultReactiveLock.class, "wip");
volatile int wip;
DefaultReactiveLock(String lockName) {
this.lockName = new LockName(lockName);
}
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public <T> Flux<T> lock(Flux<T> job) {
@ -76,28 +80,34 @@ class DefaultReactiveLock implements ReactiveLock {
}
protected void drain() {
if (PENDING.get(this) != null) {
if (WIP.getAndIncrement(this) != 0) {
return;
}
LockingSubscriber<?> locking;
for (; ; ) {
locking = queue.pollFirst();
if (locking == null) {
return;
do {
for (; ; ) {
locking = queue.pollFirst();
if (locking == null) {
break;
}
if (locking.isDisposed()) {
continue;
}
if (PENDING.compareAndSet(this, null, locking)) {
try {
locking.subscribe();
} catch (Throwable e) {
PENDING.compareAndSet(this, locking, null);
queue.addLast(locking);
}
} else {
queue.addLast(locking);
}
break;
}
if (locking.isDisposed()) {
continue;
}
if (PENDING.compareAndSet(this, null, locking)) {
//使用单独的线程池来调度,防止参与锁太多导致栈溢出.
Schedulers.parallel().schedule(locking::subscribe);
} else {
queue.addLast(locking);
}
break;
}
} while (WIP.decrementAndGet(this) != 0);
}
@ -151,6 +161,12 @@ class DefaultReactiveLock implements ReactiveLock {
@Override
public void subscribe(@Nonnull CoreSubscriber<? super T> actual) {
if (actual.currentContext().hasKey(main.lockName)) {
log.debug("reactive lock {} already locked in current context, skip.", main.lockName);
//如果当前上下文已经有锁了,则不再重复注册订阅者
source.subscribe(actual);
return;
}
Consumer<CoreSubscriber<? super T>> subscribeCallback = source::subscribe;
main.registerSubscriber(actual, subscribeCallback, timeout, timeoutFallback);
}
@ -183,6 +199,12 @@ class DefaultReactiveLock implements ReactiveLock {
@Override
public void subscribe(@Nonnull CoreSubscriber<? super T> actual) {
if (actual.currentContext().hasKey(main.lockName)) {
log.debug("reactive lock {} already locked in current context, skip.", main.lockName);
//如果当前上下文已经有锁了,则不再重复注册订阅者
source.subscribe(actual);
return;
}
Consumer<CoreSubscriber<? super T>> subscribeCallback = source::subscribe;
main.registerSubscriber(actual, subscribeCallback, timeout, fallback);
}
@ -190,7 +212,7 @@ class DefaultReactiveLock implements ReactiveLock {
}
static class LockingSubscriber<T> extends BaseSubscriber<T> {
static class LockingSubscriber<T> extends BaseSubscriber<T> implements Runnable {
protected final DefaultReactiveLock main;
protected final CoreSubscriber<? super T> actual;
private final Consumer<CoreSubscriber<? super T>> subscriber;
@ -201,6 +223,7 @@ class DefaultReactiveLock implements ReactiveLock {
AtomicIntegerFieldUpdater.newUpdater(LockingSubscriber.class, "status");
private volatile int status;
private final Context context;
//初始
private static final int INIT = 0;
@ -223,6 +246,11 @@ class DefaultReactiveLock implements ReactiveLock {
this.main = main;
this.subscriber = subscriber;
this.timeoutFallback = timeoutFallback;
this.context = actual
.currentContext()
.put(DefaultReactiveLock.class, main)
.put(main.lockName, true);
if (timeout != null) {
this.timeoutTask = Schedulers
.parallel()
@ -238,8 +266,12 @@ class DefaultReactiveLock implements ReactiveLock {
if (timeoutFallback != null) {
timeoutFallback.subscribe(actual);
} else {
this.onError(new TimeoutException("Lock timed out"));
Operators.error(
actual, new TimeoutException("Lock [" + main.lockName + "] timeout")
);
}
} else {
main.drain();
}
}
@ -249,15 +281,21 @@ class DefaultReactiveLock implements ReactiveLock {
timeoutTask.dispose();
}
subscriber.accept(this);
} else {
main.drain();
}
}
protected void complete() {
if (statusUpdater.compareAndSet(this, INIT, UN_SUB) || statusUpdater.compareAndSet(this, SUB_SOURCE, UN_SUB)) {
if (statusUpdater.compareAndSet(this, INIT, UN_SUB)
|| statusUpdater.compareAndSet(this, SUB_SOURCE, UN_SUB)) {
if (timeoutTask != null && !timeoutTask.isDisposed()) {
timeoutTask.dispose();
}
doComplete();
} else {
main.drain();
}
}
@ -266,9 +304,8 @@ class DefaultReactiveLock implements ReactiveLock {
if (!this.isDisposed()) {
this.cancel();
}
if (PENDING.compareAndSet(main, this, null)) {
main.drain();
}
PENDING.compareAndSet(main, this, null);
main.drain();
}
@Override
@ -304,8 +341,25 @@ class DefaultReactiveLock implements ReactiveLock {
@Override
@Nonnull
public Context currentContext() {
return actual.currentContext();
return context;
}
@Override
public void run() {
subscribe();
}
}
@Getter
@AllArgsConstructor
@EqualsAndHashCode
protected static class LockName {
final String name;
@Override
public String toString() {
return name;
}
}
}

View File

@ -1,18 +1,3 @@
/*
* Copyright 2025 JetLinks https://www.jetlinks.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jetlinks.community.lock;
import com.github.benmanes.caffeine.cache.Caffeine;
@ -30,6 +15,6 @@ class DefaultReactiveLockManager implements ReactiveLockManager {
@Override
public ReactiveLock getLock(String name) {
return cache.computeIfAbsent(name, ignore -> new DefaultReactiveLock());
return cache.computeIfAbsent(name, DefaultReactiveLock::new);
}
}

View File

@ -1,18 +1,3 @@
/*
* Copyright 2025 JetLinks https://www.jetlinks.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jetlinks.community.lock;
import reactor.core.publisher.Flux;

View File

@ -1,18 +1,3 @@
/*
* Copyright 2025 JetLinks https://www.jetlinks.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jetlinks.community.lock;
/**

View File

@ -1,18 +1,3 @@
/*
* Copyright 2025 JetLinks https://www.jetlinks.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jetlinks.community.lock;
/**