当前位置: 首页 > 科技观察

基于Kotlin实现一个简单的TCP自定义协议

时间:2023-03-16 01:58:56 科技观察

一、开发背景要成为一名优秀的Android开发者,你需要一个完整的知识体系。在这里,让我们一起成长,成为我们想要的样子~.我们的项目需要开发智能硬件。它将Web后台的指令发送到桌面应用程序,然后桌面程序控制不同的硬件设备来实现业务操作。从web后台到桌面端是通过WebSocket长链接维护的,桌面程序到各个硬件设备也是通过TCP长链接维护的。本文所描述的其实是桌面程序与各种硬件的通信。2.自定义通信协议首先,需要设计一个通用的TCP网络协议。网络协议结构如下+------------+------------+-------------+---------------+------------+--------+|幻数(4)|版本(1)|序列化方式(1)|命令(1)|数据长度(4)|数据(n)|+------------+----------------+------------+----------------+------------+----------+Magicnumber:4bytes,本项目中使用20200803(写这篇文章的日期是今天),为了防止端口被误调用,我们在接收到后取前4个字节和magicnumbermessage编号比较,如果不相同,直接拒绝并关闭连接。版本号:1字节,仅表示协议的版本号,方便协议升级。序列化方式:1字节,表示Java对象如何转化为二进制数据,如何反序列化。Command:1字节,表示消息的意图(比如拍照、拍视频、心跳、app升级等)。最多支持2^8条指令。数据长度:4字节,表示该字段之后的数据部分的长度。最多支持2^32位。数据:具体数据的内容。根据上面设计的网络协议,定义一个抽象类Packet:abstractclassPacket{varmagic:Int?=MAGIC_NUMBER//magicnumbervarversion:Byte=1//版本号,当前协议的版本号为1abstractvalserializeMethod:Byte//序列化methodabstractvalcommand:Byte//Watcher和App之间的通信命令}有多少命令,就需要定义多少Packet。下面以heartbeatPacket为例,定义一个HeartBeatPacket:TCP客户端,由TCP服务器接收并返回给客户端。每个Packet类都包含Packet使用的序列化方法。/***Serializationconstantlist*/interfaceSerialize{companionobject{constvalJSON:Byte=0}}每个Packet也包含了对应的command。下面是Commands是一个指令集,支持256条指令。/***指令集,支持-128到127共256条指令*/interfaceCommands{companionobject{/***心跳包*/constvalHEART_BEAT:Byte=0/***login(App需要告诉Watcher:positionofcameraPosition)*/constvalLOGIN:Byte=1......}}由于使用自定义协议,必须要有数据包的编码和解码,这些事情由PacketManager负责。编码时,根据协议的结构组装消息。同样,解码是它的逆过程。/***消息的管理类,对消息进行编解码*/objectPacketManager{funencode(packet:Packet):ByteBuf=encode(ByteBufAllocator.DEFAULT,packet)funencode(alloc:ByteBufAllocator,packet:Packet)=encode(alloc.ioBuffer(),packet)funencode(buf:ByteBuf,packet:Packet):ByteBuf{valserializer=SerializerFactory.getSerializer(packet.serializeMethod)valbytes:ByteArray=serializer.serialize(packet)//汇编消息:幻数(4bytes)+版本号(1byte)+序列化方式(1byte)+指令(1byte)+数据长度(4bytes)+数据(Nbytes)buf.writeInt(MAGIC_NUMBER)buf.writeByte(packet.version.toInt())buf.writeByte(packet.serializeMethod.toInt())buf.writeByte(packet.command.toInt())buf.writeInt(bytes.size)buf.writeBytes(bytes)returnbuf}fundecode(buf:ByteBuf):Packet{buf.skipBytes(4)//幻数通过单独的Handler验证buf.skipBytes(1)valserializationMethod=buf.readByte()valserializer=SerializerFactory.getSerializer(serializationMethod)valcommand=buf.readByte()valclazz=PacketFactory.getPacket(command)vallength=buf.readInt()//数据的长度valbytes=ByteArray(length)//定义要读取的字符数组buf.readBytes(bytes)returnsserializer.deserialize(clazz,bytes)}}三。TCP服务端启动TCP服务的方法funexecute(){boss=NioEventLoopGroup()worker=NioEventLoopGroup()valbootstrap=ServerBootstrap()bootstrap.group(boss,worker).channel(NioServerSocketChannel::class.java).option(ChannelOption.SO_BACKLOG,100).childOption(ChannelOption.SO_KEEPALIVE,true).childOption(ChannelOption.SO_REUSEADDR,true).childOption(ChannelOption.TCP_NODELAY,true).childHandler(object:ChannelInitializer(){@Throws(异常::类}})valfuture:ChannelFuture=bootstrap.bind(TCP_PORT)future.addListener(object:ChannelFutureListener{@Throws(Exception::class)overridefunoperationComplete(channelFuture:ChannelFuture){if(channelFuture.isSuccess){logInfo(logger,"TCPServerisstarting...")}else{logError(logger,channelFuture.cause(),"TCPServerfailed")}}})}其中,ServerIdleHandler:表示没有收到心跳在5分钟内,然后断开连接{logInfo(logger){ctx.channel().close()"如果在$HERT_BEAT_TIME秒内没有收到心跳,将断开连接"}}companionobject{privateconstvalHERT_BEAT_TIME=300}}MagicNumValidator:用于TCP的幻数校验数据包。classMagicNumValidator:LengthFieldBasedFrameDecoder(Int.MAX_VALUE,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH){privatevallogger:Logger=LoggerFactory.getLogger(this.javaClass)@Throws(Exception::class)overridefundecode(ctx:ChannelHandlerContext,`in`:ByteBuf?{任何(`in`.getInt(`in`.readerIndex())!==MAGIC_NUMBER){//幻数验证失败,则关闭连接logInfo(logger,"幻数验证失败")ctx.channel().close()returnnull}returnsuper.decode(ctx,`in`)}companionobject{privateconstvalLENGTH_FIELD_OFFSET=7privateconstvalLENGTH_FIELD_LENGTH=4}}PacketCodecHandler:解析数据包的Handler,PacketCodecHandler继承自ByteToMessageCodec,用于处理byte-to-message和message-to-byte,很容易将字节消息解码成POJO或将POJO消息编码成字节。@ChannelHandler.SharableobjectPacketCodecHandler:MessageToMessageCodec(){overridefunencode(ctx:ChannelHandlerContext,msg:Packet,list:MutableList){valbyteBuf=ctx.channel().alloc().ioBuffer()PacketManager.encode(byteBuf,msg)list.add(byteBuf)}overridedefundecode(ctx:ChannelHandlerContext,msg:ByteBuf,list:MutableList){list.add(PacketManager.decode(msg));}}HeartBeatHandler:心跳处理器,接收从TCP客户端“ping”,并将“pong”返回给客户端。@ChannelHandler.SharableobjectHeartBeatHandler:SimpleChannelInboundHandler(){privatevallogger:Logger=LoggerFactory.getLogger(this.javaClass)overridefunchannelRead0(ctx:ChannelHandlerContext,msg:HeartBeatPacket){logInfo(logger,"收到心跳包:${toJsonUtils.(msg))}")msg.msg="pong"//返回pong给客户端ctx.writeAndFlush(msg)}}ResponseHandler:一个通用的Handler,处理从TCP客户端接收指令,并可以根据相应的指令去查询相应的处理程序并处理其命令。objectResponseHandler:SimpleChannelInboundHandler(){privatevallogger:Logger=LoggerFactory.getLogger(this.javaClass)privatevalhandlerMap:ConcurrentHashMap>=ConcurrentHashMap()init{handlerMap[LOGIN]=LoginHandler……handlerMap[ERROR]=ErrorHandler}overridefunchannelRead0(ctx:ChannelHandlerContext,msg:Packet){logInfo(logger,"收到来自客户端的命令:${msg.command}")valhandler:SimpleChannelInboundHandler?=handlerMap[msg.command]handler?.let{logInfo(logger,"找到响应命令的Handler:${it.javaClass.simpleName}")it.channelRead(ctx,msg)}?:logInfo(logger,"TheHandler未找到响应命令的")}@Throws(Exception::class)overridefunchannelInactive(ctx:ChannelHandlerContext){valinsocket=ctx.channel().remoteAddress()asInetSocketAddressvalclientIP=insocket.address.hostAddressvalclientPort=insocket.portlogError(logger"客户端掉线:$clientIP:$clientPort")super.channelInactive(ctx)}}四。客户端模拟一个客户端的实际valtopLevelClass=object:Any(){}.javaClass.enclosingClassvallogger:Logger=LoggerFactory.getLogger(topLevelClass)funmain(){valworker=NioEventLoopGroup()valbootstrap=Bootstrap()bootstrap.group(worker).channel(NioSocketChannel::class.java).handler(object:ChannelInitializer(){@Throws(Exception::class)overridefuninitChannel(channel:SocketChannel){channel.pipeline().addLast(PacketCodecHandler)channel.pipeline().addLast(ClientIdleHandler())channel.pipeline().addLast(ClientLogin())}})valfuture:ChannelFuture=bootstrap.connect("127.0.0.1",TCP_PORT).addListener(object:ChannelFutureListener{@Throws(异常::class)overridefunoperationComplete(channelFuture:ChannelFuture){if(channelFuture.isSuccess()){logInfo(logger,"connecttoserversuccess!")}else{logger.info("failedtoconnecttheserver!")System.exit(0)}}})try{future.channel().closeFuture().sync()logInfo(logger,"与服务端断开连接!")}catch(e:InterruptedException){e.printStackTrace()}}其中,PacketCodecHandler与服务端用来解析数据包的Handler相同。ClientIdleHandler:客户端实现心跳,每30秒发送一次心跳。classClientIdleHandler:IdleStateHandler(0,0,HEART_BEAT_TIME){privatevallogger=LoggerFactory.getLogger(ClientIdleHandler::class.java)@Throws(Exception::class)overridefunchannelIdle(ctx:ChannelHandlerContext,evt:IdleStateEvent?){logInfo(logger,"发送Heartbeat....")ctx.writeAndFlush(HeartBeatPacket())}companionobject{privateconstvalHEART_BEAT_TIME=30}}ClientLogin:登录服务器的Handler。@ChannelHandler.SharableclassClientLogin:ChannelInboundHandlerAdapter(){privatevallogger:Logger=LoggerFactory.getLogger(this.javaClass)@Throws(Exception::class)overridefunchannelActive(ctx:ChannelHandlerContext){valpacket:LoginPacket=LoginPacket()logInfo(logger,"packet=${GsonUtils.toJson(packet)}")valbyteBuf=PacketManager.encode(packet)ctx.channel().writeAndFlush(byteBuf)}}5.这次总结一下,我开发的桌面程序逻辑其实不是复杂的。需要从网页后台接收指令,然后与各个设备进行交互。Web端接收到命令后,通过TCP通过Guava的EventBus将命令发送给各个设备,发送时需要转换成对应的Packet。因此,核心模块就是这个TCP自定义协议。

最新推荐
猜你喜欢