diff --git a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/DefaultHttpServerProvider.java b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/DefaultHttpServerProvider.java index 28839480..cb5f2e30 100755 --- a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/DefaultHttpServerProvider.java +++ b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/DefaultHttpServerProvider.java @@ -107,14 +107,17 @@ public class DefaultHttpServerProvider implements NetworkProvider { - if (result.succeeded()) { - log.debug("startup http server on [{}]", server.getBindAddress()); - } else { - server.setLastError(result.cause().getMessage()); - log.warn("startup http server on [{}] failed", server.getBindAddress(), result.cause()); - } - }); + vertx.nettyEventLoopGroup() + .execute(()->{ + httpServer.listen(result -> { + if (result.succeeded()) { + log.debug("startup http server on [{}]", server.getBindAddress()); + } else { + server.setLastError(result.cause().getMessage()); + log.warn("startup http server on [{}] failed", server.getBindAddress(), result.cause()); + } + }); + }); } return server; }); diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index c43bc6a7..72915a7b 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -5,10 +5,14 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.StatusCode; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.hswebframework.web.logger.ReactiveLogger; +import org.jetlinks.community.gateway.AbstractDeviceGateway; +import org.jetlinks.community.gateway.DeviceGatewayHelper; +import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession; import org.jetlinks.community.network.mqtt.server.MqttConnection; import org.jetlinks.community.network.mqtt.server.MqttPublishing; import org.jetlinks.community.network.mqtt.server.MqttServer; +import org.jetlinks.community.utils.ObjectMappers; +import org.jetlinks.community.utils.SystemUtils; import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.device.*; import org.jetlinks.core.device.session.DeviceSessionManager; @@ -18,20 +22,11 @@ import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.KeepOnlineSession; import org.jetlinks.core.trace.FluxTracer; import org.jetlinks.core.trace.MonoTracer; -import org.jetlinks.community.gateway.AbstractDeviceGateway; -import org.jetlinks.community.gateway.GatewayState; -import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; -import org.jetlinks.community.gateway.monitor.GatewayMonitors; -import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession; -import org.jetlinks.community.gateway.DeviceGatewayHelper; -import org.jetlinks.community.utils.ObjectMappers; -import org.jetlinks.community.utils.SystemUtils; import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.springframework.util.StringUtils; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; @@ -125,7 +120,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway { } return true; }) -// .publishOn(Schedulers.parallel()) //处理mqtt连接请求 .flatMap(connection -> this .handleConnection(connection) @@ -207,7 +201,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway { //应答SERVER_UNAVAILABLE connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); })) -// .subscribeOn(Schedulers.parallel()) ; } @@ -299,7 +292,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway { MqttConnection::close) //网关暂停或者已停止时,则不处理消息 .filter(pb -> isStarted()) - .publishOn(Schedulers.parallel()) //解码收到的mqtt报文 .flatMap(publishing -> this .decodeAndHandleMessage(operator, session, publishing, connection) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/DefaultVertxMqttServerProvider.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/DefaultVertxMqttServerProvider.java index 5ea30943..3954c433 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/DefaultVertxMqttServerProvider.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/DefaultVertxMqttServerProvider.java @@ -60,16 +60,19 @@ public class DefaultVertxMqttServerProvider implements NetworkProvider { - if (result.succeeded()) { - log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result - .result() - .actualPort()); - } else { - server.setLastError(result.cause().getMessage()); - log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause()); - } - }); + vertx.nettyEventLoopGroup() + .execute(()->{ + instance.listen(result -> { + if (result.succeeded()) { + log.debug("startup mqtt server [{}] on port :{} ", properties.getId(), result + .result() + .actualPort()); + } else { + server.setLastError(result.cause().getMessage()); + log.warn("startup mqtt server [{}] error ", properties.getId(), result.cause()); + } + }); + }); } return server; }); diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java index c8bbca49..e963931d 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.java @@ -60,20 +60,26 @@ public class VertxMqttServer implements MqttServer { } } + private boolean emitNext(Sinks.Many sink, VertxMqttConnection connection){ + if (sink.currentSubscriberCount() <= 0) { + return false; + } + try{ + sink.emitNext(connection,Reactors.emitFailureHandler()); + return true; + }catch (Throwable ignore){} + return false; + } private void handleConnection(VertxMqttConnection connection) { - boolean anyHandled = false; - if (sink.currentSubscriberCount() > 0) { - if (sink.tryEmitNext(connection).isSuccess()) { - anyHandled = true; - } - } + boolean anyHandled = emitNext(sink, connection); + for (List> value : sinks.values()) { if (value.size() == 0) { continue; } Sinks.Many sink = value.get(ThreadLocalRandom.current().nextInt(value.size())); - if (sink.currentSubscriberCount() > 0 && sink.tryEmitNext(connection).isSuccess()) { + if (emitNext(sink, connection)) { anyHandled = true; } } diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/DefaultTcpServerProvider.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/DefaultTcpServerProvider.java index 5568301d..06f45fab 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/DefaultTcpServerProvider.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/DefaultTcpServerProvider.java @@ -79,14 +79,17 @@ public class DefaultTcpServerProvider implements NetworkProvider { - if (result.succeeded()) { - log.info("tcp server startup on {}", result.result().actualPort()); - } else { - tcpServer.setLastError(result.cause().getMessage()); - log.error("startup tcp server error", result.cause()); - } - }); + vertx.nettyEventLoopGroup() + .execute(()->{ + netServer.listen(properties.createSocketAddress(), result -> { + if (result.succeeded()) { + log.info("tcp server startup on {}", result.result().actualPort()); + } else { + tcpServer.setLastError(result.cause().getMessage()); + log.error("startup tcp server error", result.cause()); + } + }); + }); } return tcpServer; });