优化背压策略

This commit is contained in:
zhou-hao 2020-07-09 18:19:47 +08:00
parent 919816dde6
commit 918db734d6
6 changed files with 37 additions and 63 deletions

View File

@ -4,6 +4,7 @@ import org.jetlinks.community.gateway.MessageConnection;
import org.jetlinks.community.gateway.MessageConnector;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import javax.annotation.Nonnull;
import java.util.function.Function;
@ -26,11 +27,13 @@ class LocalMessageConnector implements MessageConnector {
return "本地连接器";
}
EmitterProcessor<MessageConnection> processor = EmitterProcessor.create(false);
private final EmitterProcessor<MessageConnection> processor = EmitterProcessor.create(false);
private final FluxSink<MessageConnection> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
public LocalMessageConnection addConnection(String id, boolean shareCluster) {
LocalMessageConnection connection = new LocalMessageConnection(id,shareCluster);
processor.onNext(connection);
LocalMessageConnection connection = new LocalMessageConnection(id, shareCluster);
sink.next(connection);
return connection;
}

View File

@ -26,24 +26,21 @@ public class VertxMqttClient implements MqttClient {
@Getter
private io.vertx.mqtt.MqttClient client;
private FluxProcessor<MqttMessage, MqttMessage> messageProcessor;
private final FluxProcessor<MqttMessage, MqttMessage> messageProcessor = EmitterProcessor.create(false);
private FluxSink<MqttMessage> sink;
private final FluxSink<MqttMessage> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
private Map<String, AtomicInteger> topicsSubscribeCounter = new ConcurrentHashMap<>();
private final Map<String, AtomicInteger> topicsSubscribeCounter = new ConcurrentHashMap<>();
private boolean neverSubscribe = true;
private String id;
private final String id;
@Getter
private AtomicInteger reloadCounter = new AtomicInteger();
private final AtomicInteger reloadCounter = new AtomicInteger();
public VertxMqttClient(String id) {
this.id = id;
this.messageProcessor = EmitterProcessor.create(false);
sink = this.messageProcessor.sink();
}
public void setClient(io.vertx.mqtt.MqttClient client) {

View File

@ -32,6 +32,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
@ -45,32 +46,31 @@ import java.util.function.Function;
public class MqttClientDeviceGateway implements DeviceGateway {
@Getter
private String id;
private final String id;
private MqttClient mqttClient;
private final MqttClient mqttClient;
private DeviceRegistry registry;
private final DeviceRegistry registry;
private List<String> topics;
private final List<String> topics;
private String protocol;
private final String protocol;
private ProtocolSupports protocolSupport;
private final ProtocolSupports protocolSupport;
private DecodedClientMessageHandler clientMessageHandler;
private final DecodedClientMessageHandler clientMessageHandler;
private final EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
private EmitterProcessor<Message> messageProcessor = EmitterProcessor.create(false);
private final FluxSink<Message> sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
private FluxSink<Message> sink = messageProcessor.sink();
private final AtomicBoolean started = new AtomicBoolean();
private AtomicBoolean started = new AtomicBoolean();
private final List<Disposable> disposable = new CopyOnWriteArrayList<>();
private List<Disposable> disposable = new CopyOnWriteArrayList<>();
private final DeviceGatewayMonitor gatewayMonitor;
private DeviceSessionManager sessionManager;
private DeviceGatewayMonitor gatewayMonitor;
private final DeviceSessionManager sessionManager;
public MqttClientDeviceGateway(String id,
MqttClient mqttClient,
@ -87,8 +87,8 @@ public class MqttClientDeviceGateway implements DeviceGateway {
this.registry = Objects.requireNonNull(registry, "registry");
this.protocolSupport = Objects.requireNonNull(protocolSupport, "protocolSupport");
this.protocol = Objects.requireNonNull(protocol, "protocol");
this.sessionManager = Objects.requireNonNull(sessionManager, "sessionManager");
this.clientMessageHandler = Objects.requireNonNull(clientMessageHandler, "clientMessageHandler");
this.sessionManager = Objects.requireNonNull(sessionManager, "sessionManager");
this.topics = Objects.requireNonNull(topics, "topics");
}
@ -109,6 +109,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
return getProtocol()
.flatMap(codec -> codec.getMessageCodec(getTransport()))
.flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return mqttMessage;

View File

@ -28,13 +28,13 @@ import java.util.function.Function;
@Slf4j
public class PipePayloadParser implements PayloadParser {
private EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
private final EmitterProcessor<Buffer> processor = EmitterProcessor.create(true);
private FluxSink<Buffer> sink = processor.sink();
private final FluxSink<Buffer> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
private final List<Consumer<Buffer>> pipe = new CopyOnWriteArrayList<>();
private List<Buffer> result = new CopyOnWriteArrayList<>();
private final List<Buffer> result = new CopyOnWriteArrayList<>();
private volatile RecordParser recordParser;
@ -42,7 +42,7 @@ public class PipePayloadParser implements PayloadParser {
private Consumer<RecordParser> firstInit;
private AtomicInteger currentPipe = new AtomicInteger();
private final AtomicInteger currentPipe = new AtomicInteger();
public PipePayloadParser result(String buffer) {
return result(Buffer.buffer(buffer));
@ -68,6 +68,10 @@ public class PipePayloadParser implements PayloadParser {
}
public PipePayloadParser fixed(int size) {
if (size == 0) {
complete();
return this;
}
if (recordParser == null) {
setParser(RecordParser.newFixed(size));
firstInit = (parser -> parser.fixedSizeMode(size));

View File

@ -1,31 +0,0 @@
package org.jetlinks.community.network.tcp.server;
import org.jetlinks.community.network.tcp.client.TcpClient;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.function.Function;
/**
* @author bsetfeng
* @since 1.0
**/
public abstract class AbstractTcpServer implements TcpServer{
private EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
FluxSink<TcpClient> sink=processor.sink();
protected void received(TcpClient tcpClient) {
// if (processor.hasDownstreams()) {
sink.next(tcpClient);
// }
}
@Override
public Flux<TcpClient> handleConnection() {
return processor
.map(Function.identity());
}
}

View File

@ -20,7 +20,7 @@ public class DeviceAlarmHistoryService extends GenericReactiveCrudService<Device
EmitterProcessor<DeviceAlarmHistoryEntity> processor = EmitterProcessor.create(false);
FluxSink<DeviceAlarmHistoryEntity> sink = processor.sink();
FluxSink<DeviceAlarmHistoryEntity> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
@PostConstruct
public void init() {