refactor: 优化设备接入网关
This commit is contained in:
parent
edd1b9aba2
commit
efad62a75b
|
|
@ -31,6 +31,7 @@ import reactor.core.publisher.Mono;
|
|||
import reactor.util.function.Tuple3;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.function.Function;
|
||||
|
||||
|
|
@ -181,7 +182,10 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
|||
if (!hasValue) {
|
||||
span.setStatus(StatusCode.ERROR, "device not exists");
|
||||
}
|
||||
span.setAttribute(SpanKey.address, connection.getClientAddress().toString());
|
||||
InetSocketAddress address = connection.getClientAddress();
|
||||
if (address != null) {
|
||||
span.setAttribute(SpanKey.address, address.toString());
|
||||
}
|
||||
span.setAttribute(clientId, connection.getClientId());
|
||||
}))
|
||||
//设备认证错误,拒绝连接
|
||||
|
|
|
|||
|
|
@ -153,7 +153,6 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements DeviceGate
|
|||
}
|
||||
|
||||
Mono<Void> handleTcpMessage(TcpMessage message) {
|
||||
long time = System.nanoTime();
|
||||
return getProtocol()
|
||||
.flatMap(pt -> pt.getMessageCodec(getTransport()))
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry)))
|
||||
|
|
@ -162,17 +161,14 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements DeviceGate
|
|||
.handleDeviceMessage(msg)
|
||||
.as(MonoTracer.create(
|
||||
DeviceTracer.SpanName.decode(msg.getDeviceId()),
|
||||
builder -> {
|
||||
builder.setAttribute(DeviceTracer.SpanKey.message, msg.toString());
|
||||
builder.setStartTimestamp(time, TimeUnit.NANOSECONDS);
|
||||
})))
|
||||
.doOnEach(ReactiveLogger
|
||||
.onError(err -> log.error("Handle TCP[{}] message failed:\n{}",
|
||||
address,
|
||||
message
|
||||
, err)))
|
||||
|
||||
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
|
||||
builder -> builder.setAttributeLazy(DeviceTracer.SpanKey.message, msg::toString))))
|
||||
.onErrorResume((err) -> {
|
||||
log.error("Handle TCP[{}] message failed:\n{}",
|
||||
address,
|
||||
message
|
||||
, err);
|
||||
return Mono.fromRunnable(client::reset);
|
||||
})
|
||||
.subscribeOn(Schedulers.parallel())
|
||||
.then();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue