refactor: 设备接入网关同一个连接上报的消息使用串行处理. (#427)
This commit is contained in:
parent
cb17ae9d8b
commit
8b4ae09cf5
|
|
@ -147,7 +147,7 @@ public class HttpServerDeviceGateway extends AbstractDeviceGateway {
|
|||
.getMessageCodec(DefaultTransport.WebSocket)
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, msg, registry)))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(deviceMessage -> {
|
||||
.concatMap(deviceMessage -> {
|
||||
monitor.receivedMessage();
|
||||
if (!StringUtils.hasText(deviceMessage.getDeviceId())) {
|
||||
deviceMessage.thingId(DeviceThingType.device, session.getDeviceId());
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
|
|||
new UnknownDeviceMqttClientSession(getId(), mqttClient, monitor),
|
||||
mqttMessage,
|
||||
registry)))
|
||||
.flatMap(message -> {
|
||||
.concatMap(message -> {
|
||||
monitor.receivedMessage();
|
||||
return helper
|
||||
.handleDeviceMessage((DeviceMessage) message,
|
||||
|
|
|
|||
|
|
@ -130,13 +130,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
|||
.onErrorResume(err -> {
|
||||
log.error(err.getMessage(), err);
|
||||
return Mono.empty();
|
||||
})
|
||||
.as(MonoTracer
|
||||
.create(SpanName.connection(connection.getClientId()),
|
||||
builder -> {
|
||||
builder.setAttribute(clientId, connection.getClientId());
|
||||
builder.setAttribute(SpanKey.address, connection.getClientAddress().toString());
|
||||
})),
|
||||
}),
|
||||
Integer.MAX_VALUE)
|
||||
.subscribe();
|
||||
|
||||
|
|
@ -290,7 +284,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
|||
//网关暂停或者已停止时,则不处理消息
|
||||
.filter(pb -> isStarted())
|
||||
//解码收到的mqtt报文
|
||||
.flatMap(publishing -> this
|
||||
.concatMap(publishing -> this
|
||||
.decodeAndHandleMessage(operator, session, publishing, connection)
|
||||
.as(MonoTracer
|
||||
.create(SpanName.upstream(connection.getClientId()),
|
||||
|
|
@ -318,7 +312,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
|||
//解码
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry)))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(msg -> {
|
||||
.concatMap(msg -> {
|
||||
//回填deviceId,有的场景协议包不能或者没有解析出deviceId,则直接使用连接对应的设备id进行填充.
|
||||
if (!StringUtils.hasText(msg.getDeviceId())) {
|
||||
msg.thingId(DeviceThingType.device, operator.getDeviceId());
|
||||
|
|
|
|||
|
|
@ -158,7 +158,7 @@ class TcpServerDeviceGateway extends AbstractDeviceGateway implements DeviceGate
|
|||
.flatMap(pt -> pt.getMessageCodec(getTransport()))
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry)))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(msg -> this
|
||||
.concatMap(msg -> this
|
||||
.handleDeviceMessage(msg)
|
||||
.as(MonoTracer.create(
|
||||
DeviceTracer.SpanName.decode(msg.getDeviceId()),
|
||||
|
|
|
|||
Loading…
Reference in New Issue