背景消息队列在业务解耦、移峰、流量控制、消息广播等场景中有很好的应用,已经成为很多企业IT系统内部通信的重要手段。现有常用的开源消息中间件有RabbitMQ、Kafka、RocketMQ等,但各有不同的应用场景和特点。比如Kafka注重消息的吞吐量,不保证消息存储的可靠性和一致性。因此,常用于日志系统数据的上报;RabbitMQ可以保证消息的可靠存储和传递,但性能较差。CMQ(CloudMessageQueue)是腾讯云开发的高可靠、高可用、高性能的分布式消息队列服务。具有低耦合、消息可靠、强一致性、可扩展性等特点,支持Push/Pull消费。模型、消息回溯、延迟消息、发布订阅、路由广播、消息加密等一系列功能,满足更多mq应用场景。与Kafka相比,CMQ更关注消息可靠性高的应用场景,如金融、交易、订单等业务;与RabbitMQ相比,CMQ在易用性和性能上做了很大的优化和提升。更详细的对比可以参考官网介绍。本文首先简单介绍了CMQ的底层架构,然后结合CMQ的功能特点介绍了CMQ的实践案例,让大家快速了解并上手CMQ开发。底层架构CMQ的整体结构如上图所示。每套由三份broker节点组成,保证消息的可靠存储和高可用,基于raft算法保证数据一致性。CMQ单集保证了CAP理论中的CP优先。当SET中一半以上的节点正常工作时,才能进行消息的生产和消费。实践案例一、广播拉取消息模型CMQ支持队列和主题两种模型,如下图:其中,队列模型是一对一的消息拉取模式,客户端主动拉取消息;主题模型,也称为发布/订阅模式,是一种一对多的消息推送(push)模式。CMQ服务器在广播消息时,会根据各个订阅地址主动推送消息给客户端。两种模式基本可以满足大部分应用场景。对比如下:在队列模型中,客户端可以根据自己的能力灵活消费拉取消息。消息的实时性取决于客户端的消费速度。如果消费速度慢于生产速度,会造成大量拉取消息。消息堆积如山。主题模型中,服务端主动推送消息,消息的实时性比较高,但是要求客户端性能能够及时处理大量推送的消息,当客户端失败,消息可能会丢失(有消息重发策略作为基本保证)。对于主题模型,有以下特殊场景需求:当客户端要根据自己的能力拉取消息并创建订阅时,需要暴露客户端接收消息的地址,但在某些特殊情况下比如企业内网、vpc网络,CMQ无法push到,只能使用pull方式获取消息。针对以上特殊场景,CMQ结合队列和主题两种模型,实现了一对多的广播拉取消息模型,如下图:主题的订阅者可以是队列实例,主题发布消息后,它会自动将消息推送到队列中,然后客户端就可以像使用队列模型一样消费消息了。#pythonsdkdemocode:createsubscriptionofqueueprotocalmy_sub=my_account.get_subscription(topic_name,subscription_name)subscription_meta=SubscriptionMeta()subscription_meta.Endpoint="queue1"subscription_meta.Protocal="queue"my_sub.create(subscription_meta)2.拉长轮询对于Queue模型,消费者它需要拉取消息,但问题是:消费者不知道队列中什么时候有消息,只能不断轮询请求拉取。如果轮询间隔时间短,会消耗消费者请求资源,并在队列中长时间没有消息时消耗资源。效率低下。轮询间隔时间长,消费速度慢,消息实时性低,会堆积大量消息。针对以上问题,CMQ的解决方案是设计一个长轮询功能。例如,假设队列的长轮询时间设置为10s。当消费者拉取消息时,如果队列中有消息,则立即返回。如果队列中没有消息,消费者pullrequest不会立即返回,而是等待阻塞10s:当10s队列中有新的生产消息到达时,CMQ会立即将消息投递给正在阻塞的消费者和等待。消费者感知被阻塞的pullrequest被唤醒,消息返回;当10s内队列没有消息时,request返回告诉消费者当前队列没有消息。#pythonsdkdemocode:receivemessagethroughlongpollingpollingWaitSeconds=3recv_msg=my_queue.receive_message(pollingWaitSeconds)3.延迟消息CMQ提供延迟消息功能:消息发送到队列后,从入队时间开始计算,将消息发送给消费者在设置的延迟时间后,可以看到只有消费者才能消费。延迟消息功能可以轻松实现一些定时任务的应用场景。如上图所示,根据CMQ延时消息功能实现的定时任务查看告警系统。#pythonsdkdemocode:senddelayedmessagemsg_body="Iamdelaymessage"msg=Message(msg_body)delaySeconds=3my_queue.send_message(msg,delaySeconds)4.消息回溯CMQ提供了类似Kafka的消息回溯能力。已经消费和删除的消息可以通过回溯的方式重新消费。目前支持指定回溯时间点,在该时间点可以重新消费被删除的消息。该功能在财务业务对账、业务系统重试等一些场景下非常实用。***回溯时间点=当前时间-设置回溯时间。这个值之前的消息生产时间无法追溯,但是之后可以追溯,如下图:#pythonsdkdemocode:rewindthequeue#backtrackonehourbackTrackingTime=int(time.time())-3600my_queue.rewindQueue(backTrackingTime)5.主题路由匹配CMQ主题模型提供了类似RabbitMQ的消息路由匹配功能,基于消息广播实现消息的自动分发。订阅者可以指定bindingKey,也就是路由规则。如上所示,*(星号)可以匹配一个词,#(井号)可以匹配一个或多个词。例如生产者发布消息,消息的路由键(routingKey)为“quick.orange.elephant”,则消息只会推送给消费者C1;如果routingKey="quick.orange.rabbit",消息将被推送到C1和C2;如果routingKey="lazy.brown.fox",消息只会被推送到C2。#pythonsdkdemocode:settopic-subscriptionroute-rulemy_sub=my_account.get_subscription(topic_name,subscription_name)subscription_meta=SubscriptionMeta()subscription_meta.Endpoint="http://test.com"subscription_meta.Protocal="http"subscription_meta.bindingKey=['*.*.rabbit','lazy.#']my_sub.create(subscription_meta)message=Message()message.msgBody="routemsgtest"my_topic.publish_message(message,'quick.orange.rabbit')六、超大消息传输电流CMQ队列最小消息大小限制为1MB,当消息大小不超过64KB时,发送和接收消息的最大QPS限制为正常的5k(有特殊需求可调整),当消息大小超过64KB但小于1MB,CMQ不保证发送和接收消息的QPS性能。所以支持大于64KB的消息只是考虑业务偶尔传输少量大消息,不想做消息分片的应用场景。一般来说,64KB的消息限制大小基本可以满足大部分业务场景的需求,但是在一些特殊场景下,当消息数据大于64KB甚至大于1MB时,业务和CMQ如何支持传输这样的一条大消息?这里有两种解决方法:1.消息分片。类似于IP数据包分片传输的原理,生产者标记消息分片,分别发送到队列中,消费者从队列中取出所有分片消息进行组装。我个人的方案是:每条消息体分为header和data。其中,data为原始消息分片后的内容,header包含三个标签:业务指定消息的ID号,一条消息唯一记录的ID值,只有相同的消息分片ID号将在消费端重新组合;分片序号(从1开始),记录一个消息分片的序号,消费者根据分片的序号依次组装消息;下一个分片是否存在,如果存在,说明消息包不完整,否则消息组装完成。由于可能有多个消费者客户端,不同的客户端可能会收到不同的片段。为了组装分片,需要一个集中的地方存储所有的分片,最后组装成一个完整的消息包,这无疑大大增加了系统成本。设计复杂性。2、COS代理存储(COS是腾讯云的对象存储服务)。类似于编程中的指针原理,方案如下(具体代码实现见附件):生产者先将超大消息的数据以文件的形式上传到COS,返回COS消息文件的URL地址;生产者将URL地址作为消息发送到CMQ队列中;消费者从CMQ队列中读取消息,判断消息内容是否为COS的URL地址信息,如果是,则根据URL地址从COS下载相应的消息文件,并从文件数据中读取超大消息。7、消息加密传输腾讯云提供KMS密钥管理服务,可以对数据进行安全加密。CMQ消息加密功能有以下两种方案:1.CMQSDK客户端加密方案。客户端发送消息时,调用KMS根据设置的CMK(KMS密钥ID)生成数据密钥接口,返回数据密钥的明文密钥和加密后的密文密钥,并使用明文密钥进行加密消息在本地。然后将加密后的数据和密文密钥作为消息发送给CMQ;消费者收到消息时,首先获取消息中的密文密钥,调用KMS接口解密(不需要每次都调用,可以缓存)得到对应的明文密钥,***即可根据明文密钥在本地解密密文数据。具体代码实现见附件。2、CMQ服务器加密方案。本方案中,CMQ服务器与KMS服务相连。CMQ在用户无感知的情况下自动加解密消息。例如用户通过https接口发送消息,CMQ自动加密存储。当通过https接口接收消息时,CMQ会自动对消息进行加密和解密。解密并返回给用户。此功能正在开发中。结语CMQ更多功能正在开发中,例如死信队列、FIFO序列消息等,欢迎体验:)原文链接:https://cloud.tencent.com/community/article/211497,作者:庄秋涛【本文为专栏作者《腾讯云技术社区》原稿,转载请联系原作者获得授权】点此查看作者更多好文
