From efad62a75b510300c921ea1a01b00e0614e1b8bb Mon Sep 17 00:00:00 2001 From: zhouhao Date: Thu, 2 Nov 2023 14:16:30 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=E8=AE=BE?= =?UTF-8?q?=E5=A4=87=E6=8E=A5=E5=85=A5=E7=BD=91=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/MqttServerDeviceGateway.java | 6 +++++- .../device/TcpServerDeviceGateway.java | 20 ++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index da749122..c55bb986 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -31,6 +31,7 @@ import reactor.core.publisher.Mono; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; +import java.net.InetSocketAddress; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -181,7 +182,10 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway { if (!hasValue) { span.setStatus(StatusCode.ERROR, "device not exists"); } - span.setAttribute(SpanKey.address, connection.getClientAddress().toString()); + InetSocketAddress address = connection.getClientAddress(); + if (address != null) { + span.setAttribute(SpanKey.address, address.toString()); + } span.setAttribute(clientId, connection.getClientId()); })) //设备认证错误,拒绝连接 diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java index 118403b9..826ad75a 100755 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.java @@ -153,7 +153,6 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements DeviceGate } Mono handleTcpMessage(TcpMessage message) { - long time = System.nanoTime(); return getProtocol() .flatMap(pt -> pt.getMessageCodec(getTransport())) .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry))) @@ -162,17 +161,14 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements DeviceGate .handleDeviceMessage(msg) .as(MonoTracer.create( DeviceTracer.SpanName.decode(msg.getDeviceId()), - builder -> { - builder.setAttribute(DeviceTracer.SpanKey.message, msg.toString()); - builder.setStartTimestamp(time, TimeUnit.NANOSECONDS); - }))) - .doOnEach(ReactiveLogger - .onError(err -> log.error("Handle TCP[{}] message failed:\n{}", - address, - message - , err))) - - .onErrorResume((err) -> Mono.fromRunnable(client::reset)) + builder -> builder.setAttributeLazy(DeviceTracer.SpanKey.message, msg::toString)))) + .onErrorResume((err) -> { + log.error("Handle TCP[{}] message failed:\n{}", + address, + message + , err); + return Mono.fromRunnable(client::reset); + }) .subscribeOn(Schedulers.parallel()) .then(); }