当前位置: 首页 > 科技观察

一篇文章教你如何设计百万级消息推送系统

时间:2023-03-14 10:02:19 科技观察

前言先简单说一下这篇文章的主题。由于最近一直在做物联网相关的开发工作,难免会遇到设备方面的问题。相互影响。最重要的工作是有一个系统来支持设备访问和向设备推送消息;同时,还要满足大量设备接入的需求。因此,本次分享的内容不仅可以满足IoT领域,还可以支持以下场景:基于WEB的聊天系统(点对点、群聊)。WEB应用中需要服务端推送的场景。基于SDK的消息推送平台。技术选型要满足大量连接,同时支持双工通信,保证性能。Java技术栈中的选型自然排除了传统IO。那我不得不选择蔚来。其实这个级别的选择并不多。考虑到社区和数据维护,最终选择了Netty。最终的架构图如下:现在看萌不要紧,下面一一介绍。协议解析既然是消息系统,自然要定义与客户端双方的协议格式。常见的简单的就是HTTP协议,但是我们的一个需求需要是双工的交互方式,同时HTTP更多的是服务于浏览器。我们需要的是一种更精简的协议,可以减少许多不必要的数据传输。因此,我认为最好是在满足业务需求的同时定制自己的私有协议。在我的场景中,实际上有一个标准的物联网协议。在其他场景下,可以借鉴流行的RPC框架,自定义私有协议,让双方的通信更加高效。但是根据这段时间的经验,无论采用哪种方式,都必须在协议中预留一个安全相关的位置。只是讨论了协议相关的内容,介绍了更具体的应用。简单实现,先考虑功能如何实现,再考虑百万连接的情况。注册鉴权在做真正的消息上下行之前首先要考虑的就是鉴权。就像你使用微信一样,第一步肯定是登录,无论你是谁,都可以直接连接平台。所以第一步是注册。如上面架构图中的注册/认证模块。一般来说,客户端需要通过HTTP请求传递一个唯一的标识符。后台认证通过后,会响应一个token,在Redis或DB中维护token与客户端的关系。客户端也在本地保存了这个token,以后每次请求都要带上这个token。一旦令牌过期,客户端需要再次请求令牌。认证通过后,客户端会直接通过TCP长连接连接到图中的push-server模块。这个模块实际上是在处理消息的上下行。保存Channel关系连接连接后,在真正处理业务之前,需要维护当前client与Channel的关系。假设客户端的唯一标识是手机号,需要在一个Map中维护手机号和当前Channel。这个类似于之前的SpringBoot集成的长连接心跳机制。同时,为了通过Channel获取到客户端的唯一标识(手机号),还需要在Channel中设置相应的属性:获取手机号时:这样当我们的客户端去离线,我们可以记录相关的日志:这里有一点注意:存储client和Channel关系的Map最好预先设置好大小(避免频繁扩容),因为它会是使用频率最高的对象那也是占用内存最大的。消息上行下一步就是真正的业务数据上传。一般来说,第一步是确定上传消息输入的业务类型。在聊天场景中,可以上传文字、图片、视频等内容。所以我们要区分,做不同的处理;这与客户端协商的协议有关。消息头中的一个字段可以用来区分。比较简单的是JSON消息,用一个字段来区分不同的消息。不管是什么,只要能分辨出来就行。消息解析与业务解耦消息解析完成后,就可以进行业务处理了,比如写入数据库或者调用其他接口。我们都知道在Netty中处理消息一般是在channelRead()方法中。这里可以解析消息,区分类型。但是如果我们的业务逻辑也写在里面的话,这里的内容会非常庞大??。甚至我们分为几个开发来处理不同的业务,所以会出现很多冲突,维护困难等问题。所以把消息解析和业务处理完全分开是非常有必要的。这就是面向接口编程发挥作用的地方。这里的核心代码与“造轮子”一致——cicada(轻量级WEB框架)。先定义一个处理业务逻辑的接口,然后在解析消息后通过反射创建一个具体的对象来执行处理功能。这样不同的业务,不同的开发者只需要实现这个接口,实现自己的业务逻辑即可。伪代码如下:上行还需要注意一点;因为是基于长连接的,客户端需要周期性的发送心跳包来维护这个连接。同时服务器端也会有相应的检查。在N时间间隔没有收到消息后,它会主动断开连接以节省资源。这可以使用IdleStateHandler来实现。更多内容请参考Netty(一)SpringBoot集成长连接心跳机制。消息下去,就会有起有落。比如聊天场景,两个客户端连接到push-server,直接需要点对点通信。此时的流程是:A向服务器发送消息。服务器收到消息后,知道消息要发给B,需要在内存中找到B的Channel。A的消息通过B的Channel转发。这是一个下降的过程。甚至管理员需要向所有在线用户发送系统通知也是类似的:遍历保存频道关系的Map,一条一条发送消息。这也是之前需要存储在Map中的主要原因。伪代码如下:单机版的分布式方案已经实现,下面重点介绍如何实现百万连接。百万连接其实只是一个形容词。更多的是如何实现一个分布式的方案,可以灵活的水平扩展,支持更多的连接。在做这件事之前,我们首先要搞清楚我们的单机版能支持多少个连接。影响这一点的因素有很多。服务器自己的配置。内存,CPU,网卡,linux支持的最大打开文件数等应用自己的配置,因为Netty本身需要依赖堆外内存,但是JVM本身也需要占用一部分内存,比如作为存储频道关系的大型Map。这个需要根据自己的情况进行调整。结合以上情况,可以测试单个节点最大支持连接数。无论你如何优化单机,都有一个上限,这也是分布式系统解决的主要问题。架构介绍在具体实现之前,首先要说一下上面贴出的整体架构图。先从左边开始。上面提到的注册认证模块也部署在集群中,通过前端Nginx加载。如前所述,它的主要目的是执行身份验证并向客户端返回令牌。但是在push-server集群之后,它又多了一个作用。即返回一个当前客户端可以使用的push-server。右边的平台一般是指管理平台,可以查看当前实时在线人数,向指定客户端推送消息等,推送消息需要经过一个推送路由(push-server)才能找到真正的推送节点。其他中间件如:Redis、Zookeeper、Kafka、MySQL等都是为这些功能准备的,具体实现见下文。注册发现第一个问题是注册发现。当push-server变成多台后,如何为客户端选择一个可用的节点是首先要解决的问题。这块的内容其实在分布式(一)完成服务注册与发现中已经详细讨论过了。所有推送服务器都需要在启动时向Zookeeper注册自己的信息。注册认证模块会订阅Zookeeper中的节点,从而获取到最新的服务列表。结构如下:以下是一些伪代码:应用程序启动并向Zookeeper注册。对于注册认证模块,只需要订阅这个Zookeeper节点即可:既然路由策略可以获取到所有的服务列表,那么如何选择一个恰到好处的push-server供客户端使用呢?这个过程要注意以下几点:尽量保证每个节点的连接是均匀的。添加或删除节点是否做Rebalance。首先,保证平衡有几种算法:轮询。每个节点都被一一分配给客户端。但是会出现新节点分布不均的情况。哈希取模法。类似于HashMap,但是也有轮询的问题。当然也可以像HashMap那样做一个rebalance,让所有的client都重新连接。但是,这会导致所有连接中断并重新连接,这有点昂贵。由于Hash取模方式的问题,带来了一致的Hash算法,但是部分客户端还是需要Rebalance。重量。每个节点的负载可以手动调整,甚至可以自动调整。根据监控,当部分节点负载较高时,会自动降低权重,可以增加负载较低的权重。另一个问题是:当我们重启一些应用程序进行升级时,这个节点上的客户端是如何处理的?由于我们有心跳机制,当心跳失效时,就可以认为是节点有问题了。则需要重新请求注册认证模块获取可用节点。这同样适用于弱网络条件。如果此时客户端正在发送消息,则需要将消息保存在本地,等待新的节点再次发送。Statefulconnections在这种场景下不像HTTP那样是无状态的,我们必须清楚地知道每个client和connection之间的关系。在上面的单机版本中,我们将这个关系保存在了本地缓存中,但是在分布式环境中显然是行不通的。例如,平台向客户端推送消息时,首先要知道客户端的通道存储在哪个节点上。借助我们之前的经验,这样的问题自然需要引入第三方中间件来存储这个关系。即架构图中存储路由关系的Redis,当client访问push-server时,需要在Redis中存储当前client的唯一标识和服务节点的ip+port。同时,当客户端下线时,必须删除Redis中的连接关系。这样,理想情况下,每个节点内存中的map关系之和应该恰好等于Redis中的数据。伪代码如下:这里存放路由关系会有并发问题,最好换成lua脚本。推送路由想象这样一个场景:如果管理员需要向最近注册的客户端推送一条系统消息,他会怎么做?结合架构图,假设这批有100000个客户端。首先,我们需要将这些号码通过平台下的Nginx发送到一个推送路由。为了提高效率,这批号码甚至可以重新分配给各个推送路由。拿到具体号码后,根据号码的个数启动多线程,从之前的路由Redis中获取到client对应的push-server。然后通过HTTP调用push-server发送真正的消息(Netty对HTTP协议也支持的很好)。推送成功后需要将结果更新到数据库,离线客户端可以根据业务再次推送。消息流可能有一些场景,客户端的上行消息非常重要,需要持久化,消息量非常大。在push-server上做生意显然是不合适的。这时候可以选择Kafka进行解耦。把上游的数据全部直接扔到Kafka里,不管了。然后消费者程序会把数据取出来写入数据库。其实这个内容也很值得讨论。可以先看这篇文章了解一下:内存溢出比Disruptor还厉害?后面会讲到Kafka,做一个详细的介绍。分布式问题分布式解决了性能问题,但带来了其他麻烦。应用监控,比如如何知道在线的几十个push-server节点的健康状态?这时候,监控系统就得发挥作用了。我们需要知道每个节点当前的内存使用情况和GC情况。还有操作系统本身的内存占用,毕竟Netty使用了大量的堆外内存。同时需要监控每个节点的当前在线数和Redis中的在线数。理论上这两个数字应该相等。这样也可以知道系统的使用情况,可以灵活维护这些节点的数量。日志处理日志记录也变得极其重要。比如,某天你反映一个客户端无法连接,你需要知道问题出在哪里。最好在每个请求中加上一个traceID来记录日志,这样就可以通过这个日志查看每个节点卡在哪里了。并且必须使用ELK之类的工具。这次的总结是基于我的日常经验。有些陷阱在工作中可能没有踩到,所以会有一些疏漏。从目前来看,搭建一个稳定的推送系统其实比较麻烦。这里面涉及的点很多,只有真正去做了才知道。