当前位置: 首页 > 网络应用技术

如何从头开始写Python MQ客户端

时间:2023-03-06 01:37:56 网络应用技术

  该公司的主要开发语言是Java,算法部门使用的主要语言是Python.algorithm应用程序通常需要订阅业务系统生成的各种消息,但是业务部使用的消息队列是开源QMQ.QMQ.QMQ。本机不提供对Python的支持,因此您需要编写Python QMQ客户端。尽管本文基于QMQ,但其他大多数MQ客户端基本上都是相同的例程,并且是通用的。

  从功能上的角度来看,客户端可以分为两个模块:消费者和生产者。由于算法的要求主要是订阅消息,因此选择首先完成消费者模块,然后完成生产者模块。

  从责任的角度来看,客户可以分为以下四个部分:

  从上面的部分开始,本文一一解释以分析这三个部分:网络,序列化导数,线程控制和交互式逻辑。这可以帮助每个人更好地实现自己的客户。

  网络处理的解释的这一部分主要涉及数据包的处理,对回调的响应,网络连接管理和框架选择。

  在网络编程数据包处理中,我们主要面临两个核心问题:

  此问题意味着客户端向服务器发送请求,并且服务器将对响应响应。但是,在实际客户端中,将发送许多请求,并且服务器将响应许多响应客户端。但是我们要与客户端对请求的客户响应相对应。我们有两种解决此问题的方法:

  首先是发送请求序列化,并且相应的接收响应,请求和响应取决于订单。典型的方式(例如redis使用)(例如发送get get key1,get key2,get key2,get key2,get key3 to redis server to redis服务器),然后REDIS服务器回复是键1的值,键2的值,key3的值。

  第二个是通过请求ID进行映射。此需要定义消息中的字段以存储此请求请求的ID。服务器在此请求的响应消息中响应ID字段,并且该字段等于请求中的ID字段。在此方式,我们可以确定请求响应之间的相应关系:在发送时生成ID请求和地图维护ID <-> 请求之间的关系在接收响应时将ID从响应中取出以获取地图中的请求。QMQ使用第二种方法。在这两种方法中没有绝对优点和缺点,只有选择不同。

  索赔和一半背包是不准确的,因为TCP是定向的,为什么它是粘性和一半的说法。方向是什么意思?我们可以比较河流,并且河中的水是恒定的,而且没有一个明确的限制。但是在上限应用程序中,我们发送一个通常具有边界的数据。例如,我们的MQ在这里,然后我们通常使用消息作为单位,然后发送两条消息发送结束。我们想阅读来自TCP河的两条消息。

  通常有以下方法来解决此问题:

  了解一般解决方案后,让我们看看QMQ如何解决这两个问题?QMQ使用自定义格式的文档协议进行网络传输。自定义消息分为消息标头和消息主体。出于不同的目的,不同目的的要求,人体的编码解码格式是不同的。

  QMQ的消息格式如下:

  前四个字节是消息的总长度,然后两个字节是消息标头的长度(实际上,我们可以发现QMQ的消息头的头大小是固定的,并且没有其他附加标题大小。可能是该协议在协议设计开始时被视为遗留)。

  后四个字节是特定的魔术数字,紫色不透明是消息的ID徽标(即,上一篇文章的请求ID)。QMQ使用此字段来确定响应和请求之间的对应关系。在发送请求之前,使用请求的消息ID将ID标识为键。消息标识符ID使用原子增加并生成它。可以触发异步的对象是值。

  例如,在Java中,作为将来的对象,在Python中也可以是诸如Future或Deffred的对象。在响应到达时付出IDPONSE ID并查询地图。如果该ID的键不存在,它将引发异常和IO错误。否则,将触发与此密钥相对应的值对象。

  同样,我们可以从消息协议中看到,对于粘袋/拆卸袋,QMQ使用The Totalsize字段来分配消息。

  接收TCP数据包时,首先确定当前数据包的总长度是否大于缓冲区中的四个字节,因为我们的消息中的总体化占据了四个字节。我们在总长度中读取该值,以确定当前长度是否确定当前长度比总计更大。如果较少,则意味着当前内容不足以容纳数据包(即SO称为“半包装”)。此时,将所有当前字节内容放入下一个过程的BufferWait中。

  否则,该长度的字节片段将传递给上层(或者在此处以数据报对象封装的网络软件包)。

  连接管理的主要目的是重复使用TCP长连接,以避免在每个发送消息时重新建立连接。在实现条款中,我们可以考虑将TCP连接保存到地图上。目标IP和TCP连接的字符串是关键。如果有此连接时存在此连接,则如果存在此连接,则建立连接。

  由于该地图是在多个线程中访问的,因此有必要考虑线程安全性问题。此外,我们需要注意,我们将被捕获在地图中。当我们需要使用它时,我们必须使用地图中的有效链接。

  什么是有效的链接?实际上,没有直接的方法来确定在应用程序层中是否有效的缓存链接。唯一的方法是发送包裹以查看您是否可以收到响应。如果您可以收到响应,那是有效的。在应用程序中,通常会发送心跳袋来检测链接中链接的有效性。

  简单实施连接管理实施类如下:

  磨刀刀会错误地切割柴火,并选择一个正确的IO框架来帮助我们简单,快速,有效地构建网络层。因此,选择一个与需求一致的最佳网络层框架已成为我们当前的目标。Python中的自定义网络协议框架包括龙卷风,扭曲,异步等。选择以下三个指标时,选择以下三个指标:

  由于我们首先选择的Python版本是2.7,并且Asyncio由3.4支持(如果是Python 3.7,建议使用此模块),因此我们的眼睛都放在龙卷风和Twsited之间。而且两个文档和社区都没有太大不同。在绩效方面,在简单且接受测试之后,龙卷风的包装时间相对较短(平均6%),但与扭曲相比,CPU占40%(此测试很简单是的,是的,不一定准确)。

  考虑到我们不需要为高性能占用更多资源,因此我们选择了扭曲作为网络层框架。

  在特定的实施方面,我们首先需要实现抽象协议类,例如主要管理连接,接收和接收数据包等,然后我们需要实现客户端类别以管理自定义协议对象的创建和生命周期的创建类别。

  序列化和深度化的主要任务是将二进制网络数据包对内部对象进行测序(或将程序内的内部请求对象序列化为二进制网络数据包)。对于具有固定格式的数据包头,我们可以将python的struct模块用于分析它。例如:

  对于消息头的序列化和衍生化,我们只需要注意二进制方向(大端/小端)。

  对于身体,序列化和反对的基本类型类型,我们只需要简单地使用struct api。应该注意的是,字符串的序列化和派生化(python中的str类型)。当序列化时,字符串需要付费特殊注意编码格式。示例如下:

  本部分主要讲述了QMQ客户端的内部线程的设计。QMQ客户端的内部线程可以分为IO线程和业务线程。

  首先,我们讲述了如何分开IO线程和业务线程的工作范围。

  首先,澄清一个原理,不确定或长期执行时间的任务不得放在io线程中!在扭曲的线程中,在其中反应器线程为reactor.run(),以使IO使IO线程独立地,它与用户线程不同,我们需要创建一个单独的线程来执行反应器。run().installSignalHanlders = 0。

  特定代码如下:

  其次,我们将面临IO线程和其他线程的执行继电器问题。这也是线程间隔通信问题。有很多方法。

  在Twisted中,我们可以使用Reactor.callfromthread()方法来完成其他线程和IO线程之间的交互作用。

  此外,如何防止使用不适当使用用户并引起系统资源?或如何优雅地控制客户流量?在开始时,拉动方法没有阻碍,但立即返回了递延的对象(与未来有点相似),但这会引起问题。如果用户多线程回忆起此方法,则将导致客户端流量异常并占用流量并占据职业。资源不断上升。在TL指令后,该接口旨在阻止此接口。目前,您可以适应流量(业务代码处理速度很慢,并且将迅速拉出业务代码处理)。

  最后,如何减少多线程中的线程安全类错误?

  很难处理与线程安全相关的问题,很容易犯错,很难进行调查。因此,我们应该考虑在满足需求时最大程度地减少线程安全考虑。

  因为用户线程不可避免地调用多线程,所以我们可以考虑使用队列在内部维护线程监视以收听此队列,并且每个用户调用都会转换为此队列中的元素,以便我们可以使用用户来处理用户为了使用户将usermulti线程转换为内部单线线程执行。因为它是在单个线程中执行的,因此无需考虑线程安全问题。

  在谈论上述复杂的未知问题之后,我们现在一起组装了不同的部分,最后完成了QMQ的消费者客户。

  本节将主要讲述以下部分:

  这些字符参与QMQ消耗的交互逻辑:Metaserver,Server和Consumer.Metserver是整个交互的控制中心,服务器是实际的存储消息和重新发布消息。Consumer在消费消息中有两次请求:

  从API的角度来看,消费者消耗消息通常分为拉力模式(拉)和推动模式(拉动)。

  拉动模式是指使用拉动API使用消费者的拉动API来拉消息。用户将打开周期并在周期中捡起消息。处理消息 - >拉下一批消息...周期是往复。

  拉力模式下的逻辑如下所示:

  推送模式是指使用消耗API添加消耗API的消耗消息。该方法在消费者中的侦听器中积极触发。在线程池中执行。尽管它被分为拉动模式,并在使用级别下推动模式,但底层的实现是相同的,并且消费者启动消息请求到服务器端。

  听众顶级关系设计如下:

  每个听众均由PullRegister注册。pullregister对象是客户端的唯一对象。它具有puletrymap的属性。根据监视器的监视主题(主题),它创建了一个不同的平行处理对象,以及每个对象的hasdefaultpullentry多少,每个defaultpullentry对象都负责某个服务器上的某个主题的拉动。对于多个服务器,这些服务器分布在不同的计算机中,并且每个DefaultPullentry对象都负责某个机器上的该主题的拉动。

  消息成功处理后,消费者需要ACK消息来表明消息消耗是成功的,并且不再消耗。为了提高效率,QMQ不会记录每个消息的消耗状态,而是使用偏移来记录。例如,消费者收集了1-100条消息,现在前50件的消费成功了,您只需要记录数字50即可表明前50件消费成功。没有顺序。例如,前50个新闻,30新闻尚未成功消费,31-50的消费成功,然后我们只能ACK 1-29。因此,ACK机制类似于滑动窗口。当使用ACK方法标记消息消耗成功时,我们会尝试将窗口向右移动。

  在这一点上,从背景需求到网络的不同部分,衍生物的序列化,线程控制和交互式逻辑,描述了MQ客户端的基本元素,该元素描述了客户端应考虑的问题和简单的问题知识扩展。

  便利蜜蜂基础架构小组的初级实习生从需求分析,QMQ Java客户端的守则熟悉,并且Python解决方案设计和实施已很好地完成。最终进行了反应。

  如果您对相关技术感兴趣,并致力于提高研发效率,欢迎加入我们。