diff --git a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGateway.java b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGateway.java index bc89ac29..3f8f94a5 100755 --- a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGateway.java +++ b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpServerDeviceGateway.java @@ -147,7 +147,7 @@ public class HttpServerDeviceGateway extends AbstractDeviceGateway { .getMessageCodec(DefaultTransport.WebSocket) .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, msg, registry))) .cast(DeviceMessage.class) - .flatMap(deviceMessage -> { + .concatMap(deviceMessage -> { monitor.receivedMessage(); if (!StringUtils.hasText(deviceMessage.getDeviceId())) { deviceMessage.thingId(DeviceThingType.device, session.getDeviceId()); diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java index 2ba9f3cf..931d2474 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java @@ -145,7 +145,7 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway { new UnknownDeviceMqttClientSession(getId(), mqttClient, monitor), mqttMessage, registry))) - .flatMap(message -> { + .concatMap(message -> { monitor.receivedMessage(); return helper .handleDeviceMessage((DeviceMessage) message, diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index 6021961e..da749122 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -130,13 +130,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway { .onErrorResume(err -> { log.error(err.getMessage(), err); return Mono.empty(); - }) - .as(MonoTracer - .create(SpanName.connection(connection.getClientId()), - builder -> { - builder.setAttribute(clientId, connection.getClientId()); - builder.setAttribute(SpanKey.address, connection.getClientAddress().toString()); - })), + }), Integer.MAX_VALUE) .subscribe(); @@ -290,7 +284,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway { //网关暂停或者已停止时,则不处理消息 .filter(pb -> isStarted()) //解码收到的mqtt报文 - .flatMap(publishing -> this + .concatMap(publishing -> this .decodeAndHandleMessage(operator, session, publishing, connection) .as(MonoTracer .create(SpanName.upstream(connection.getClientId()), @@ -318,7 +312,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway { //解码 .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry))) .cast(DeviceMessage.class) - .flatMap(msg -> { + .concatMap(msg -> { //回填deviceId,有的场景协议包不能或者没有解析出deviceId,则直接使用连接对应的设备id进行填充. if (!StringUtils.hasText(msg.getDeviceId())) { msg.thingId(DeviceThingType.device, operator.getDeviceId()); diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java index 1a34cb47..118403b9 100755 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java @@ -158,7 +158,7 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements DeviceGate .flatMap(pt -> pt.getMessageCodec(getTransport())) .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry))) .cast(DeviceMessage.class) - .flatMap(msg -> this + .concatMap(msg -> this .handleDeviceMessage(msg) .as(MonoTracer.create( DeviceTracer.SpanName.decode(msg.getDeviceId()),