tcp 支持 onClientConnect

This commit is contained in:
zhou-hao 2021-04-26 16:45:33 +08:00
parent 342c59cdb0
commit babc2d7711
3 changed files with 80 additions and 30 deletions

View File

@ -2,6 +2,7 @@ package org.jetlinks.community.network.tcp.client;
import org.jetlinks.community.network.Network;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.core.server.ClientConnection;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -14,7 +15,7 @@ import java.time.Duration;
* @author zhouhao
* @version 1.0
*/
public interface TcpClient extends Network {
public interface TcpClient extends Network, ClientConnection {
/**
* 获取客户端远程地址

View File

@ -12,6 +12,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.message.codec.EncodedMessage;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
@ -67,6 +68,44 @@ public class VertxTcpClient implements TcpClient {
}
}
@Override
public InetSocketAddress address() {
return getRemoteAddress();
}
@Override
public Mono<Void> sendMessage(EncodedMessage message) {
return Mono
.<Void>create((sink) -> {
if (socket == null) {
sink.error(new SocketException("socket closed"));
return;
}
Buffer buffer = Buffer.buffer(message.getPayload());
socket.write(buffer, r -> {
keepAlive();
if (r.succeeded()) {
sink.success();
} else {
sink.error(r.cause());
}
});
});
}
@Override
public Flux<EncodedMessage> receiveMessage() {
return this
.subscribe()
.cast(EncodedMessage.class);
}
@Override
public void disconnect() {
shutdown();
}
@Override
public boolean isAlive() {
return socket != null && (keepAliveTimeoutMs < 0 || System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs);
@ -193,21 +232,8 @@ public class VertxTcpClient implements TcpClient {
@Override
public Mono<Boolean> send(TcpMessage message) {
return Mono.<Boolean>create((sink) -> {
if (socket == null) {
sink.error(new SocketException("socket closed"));
return;
}
Buffer buffer = Buffer.buffer(message.getPayload());
socket.write(buffer, r -> {
keepAlive();
if (r.succeeded()) {
sink.success(true);
} else {
sink.error(r.cause());
}
});
});
return sendMessage(message)
.thenReturn(true);
}
@Override

View File

@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
@ -12,6 +14,7 @@ import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway;
@ -106,7 +109,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
}
class TcpConnection {
class TcpConnection implements DeviceGatewayContext {
final TcpClient client;
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
@ -148,17 +151,21 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
}
Mono<Void> accept() {
return client
.subscribe()
.filter(tcp -> started.get())
.publishOn(Schedulers.parallel())
.flatMap(this::handleTcpMessage)
.onErrorResume((err) -> {
log.error(err.getMessage(), err);
client.shutdown();
return Mono.empty();
})
.then()
return getProtocol()
.flatMap(protocol -> protocol.onClientConnect(getTransport(), client, this))
.then(
client
.subscribe()
.filter(tcp -> started.get())
.publishOn(Schedulers.parallel())
.flatMap(this::handleTcpMessage)
.onErrorResume((err) -> {
log.error(err.getMessage(), err);
client.shutdown();
return Mono.empty();
})
.then()
)
.doOnCancel(client::shutdown);
}
@ -169,8 +176,10 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
.cast(DeviceMessage.class)
.doOnNext(msg -> gatewayMonitor.receivedMessage())
.flatMap(this::handleDeviceMessage)
.doOnEach(ReactiveLogger.onError(err -> log
.error("处理TCP[{}]消息失败:\n{}", address, message, err)))
.doOnEach(ReactiveLogger.onError(err -> log.error("处理TCP[{}]消息失败:\n{}",
address,
message
, err)))
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
.then();
}
@ -194,6 +203,20 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
.then();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return registry.getProduct(productId);
}
@Override
public Mono<Void> onMessage(DeviceMessage message) {
return handleDeviceMessage(message);
}
}