之前分享过一篇文章《设计一个***的消息推送系统》,虽然文中贴出了一些伪代码,但是有朋友希望直接分享一些可以运行的源码,已经这么长的时间来填补这个洞。所以我在之前的基础上改进了一些内容。先来看看这个项目的介绍:CIM(CROSS-IM)是一个面向开发者的IM(即时通讯)系统,提供了一些组件来帮助开发者构建一个可以横向扩展的IM。使用CIM可以实现以下需求:IM即时通讯系统。App的推送消息中间件。物联网海量连接场景下的消息透传中间件。完整的源码托管在GitHub上:https://github.com/crossoverJie/cim这次主要涉及IM即时通讯,所以特意录了两个视频演示(群聊,私聊)。群聊和私聊的架构设计我们来看看具体的架构设计:CIM中的所有组件都是用SpringBoot构建的。使用Netty+GoogleProtocolBuffer构建底层通信。Redis存储了各个客户端的路由信息??、账户信息、在线状态等。Zookeeper用于注册和发现IM服务器服务。整体主要由以下几个模块组成:cim-server、IMserver:用于接收Client连接、消息透传、消息推送等功能。支持集群部署。cim-forward-route,消息路由服务器:用于处理消息路由,消息转发,用户登录,用户下线,以及一些运营工具(获取在线用户数等)。cim-client、IM客户端:用户的消息终端,可以一条命令启动,发起与他人的交流(群聊??、私聊);同时内置了一些常用命令,方便使用。流程图的整体流程比较简单,如下:客户端向Route发起登录。如果登录成功,从Zookeeper中选择可用的im-server返回给客户端,将登录和路由信息保存到Redis中。客户端向im-server发起长连接,成功后保持心跳。当客户端下线时,通过Route清除状态信息。所以我们自己部署的时候,需要如下几个步骤:搭建基础的中间件Redis和Zookeeper。部署cim-server,这是一个真正的IM服务器。为了满足性能需求,它支持水平扩展,只需要注册到同一个Zookeeper。部署cim-forward-route,也就是路由服务器,所有消息都需要经过它。由于它是无状态的,它还可以利用Nginx代理来提高可用性。cim-client是一个真正面向用户的客户端;启动后会自动连接到IM服务器,然后在控制台收发消息。更多使用介绍请参考快速入门。详细设计会关注具体的实现,比如群聊和私聊的消息如何流转;IM服务器负载均衡;如何注册和发现服务等。IM服务器首先看一下服务器;主要实现客户端上下线、发送消息等功能。首先是服务启动:由于是SpringBoot内置的,所以需要在应用启动的时候启动Netty服务。从Pipline可以看出使用了Protobuf的codec(具体消息在客户端解析)。注册发现需要满足IM服务器的水平扩展需求,所以cim-server需要将自己的数据发布到注册中心。因此,应用启动成功后,需要在Zookeeper中注册自己的数据。主要目的是注册当前应用的ip+cim-server-port+http-port。上图是我在demo环境下注册的两个cim-server实例(因为是在一台服务器上,所以端口不一样)。这样客户端(监控Zookeeper节点)就可以实时知道当前可用的服务信息。登录当客户端请求cim-forward-route中的登录接口(详见下文)并完成业务验证(类似日常登录其他网站)时,客户端会向服务器发起长连接,如上流程如图:此时,客户端会发送一条特殊的消息,表示当前的登录信息。服务端收到后需要保存客户端的userID和当前的Channel关系。同时还缓存了用户的信息,即userID和username。Offline当客户端断开连接时,需要清除刚才缓存的信息。同时,还需要调用Route接口清除相关信息(具体接口见下文)。IMRouting从架构图中可以看出,路由层是非常重要的一环;它提供一系列HTTP服务来承接客户端和服务器端。目前主要有以下几个接口:①注册接口由于每个客户端都需要登录才能使用,所以第一步自然是注册。这里的设计比较简单,直接使用Redis存储用户信息;用户信息只有ID和userName。只是为了方便在Redis中查询KV,依次存储一个VK,所以ID和userName都必须是唯一的。②登录界面这里的登录与cim-server中的登录不同,具有业务性质:登录成功后需要判断是否为重复登录(一个用户只能运行一个客户端)。登录成功后,需要从Zookeeper获取服务列表(cim-server),按照一定的算法选择一个服务返回给客户端。登录成功后,还需要保存路由信息,即在Redis中保存当前用户分配的服务实例。为了实现只有一个用户可以登录,使用Redis中的Set来保存登录信息;如果使用userID作为Key,重复登录会写入失败。类似于Java中的HashSet,只能去重保存。获取一个可用的路由实例也比较简单:先从Zookeeper中获取所有的服务实例,做一个内部缓存。轮询选择服务器(目前只有这个算法,以后会添加)。当然,在Zookeeper中获取服务实例之前,需要对cim-server之前注册的节点进行监控。具体代码如下:应用启动后还会监控Zookeeper中的路由节点,一旦有变化就更新内部缓存。这里使用了Guava的Cache,它是基于ConcurrentHashMap的,所以可以保证清除和添加缓存的原子性。③群聊界面这是一个真正发送消息的界面。效果就是其中一个客户端发送消息,其他所有客户端都能收到!流程肯定是client向server发送消息,server接收到上面介绍的SessionSocketHolder中,遍历所有Channel(通道),发送消息。以前用单机服务器还好,现在是集群设计。所以所有的client都会按照之前的轮询算法分配给不同的cim-server实例。因此需要路由层发挥作用:路由接口收到消息后,首先遍历所有客户端和服务实例之间的关系。路由关系在Redis中存储如下:由于Redis的单线程特性,当数据量较大时;一旦使用Keys匹配所有cim-route:*数据,Redis将无法处理其他请求。所以这里改用Scan命令遍历所有cim-route:*。然后,会一一调用每个客户端所在服务器的HTTP接口推送消息。cim-server中的实现是这样的:cim-server收到消息后,会查询内部缓存中userID所在的通道,然后只需要发送一条消息即可。④在线用户界面这是一个辅助界面,可以查询当前在线用户信息。实现也很简单,就是查询之前保存“用户登录状态”的去重集。⑤私聊界面之所以说获取在线用户是辅助界面,其实是用来辅助私聊的。一般使用私聊的前提是知道当前有哪些用户在线,然后你就知道你想和谁私聊了。类似这样:在我们的场景中,私聊的前提是获取在线用户的userID。因此,私聊接口收到消息后,需要查询接收方所在的cim-server实例信息,后续步骤与群聊一致。调用接收方所在实例的HTTP接口进行信息下发。只是群聊遍历所有在线用户,私聊只发一个差。⑥一旦客户端下线,我们需要删除之前存储在Redis中的一些信息(路由信息,登录状态)。IMclient客户端中的一些逻辑其实上面已经讲过了。登录的第一步是登录。启动时需要调用Route的登录接口获取cim-server信息,然后创建连接。在登录过程中,Route接口会判断是否为重复登录,重复登录直接退出程序。接下来就是利用Route接口返回的cim-server实例信息(ip+port)创建连接。最后一步是向服务器发送登录标志消息,以便它维护客户端与Channel之间的关系。自定义协议中上面提到的一些登录消息和真实消息消息,在我们的自定义协议中是可以区分的。既然用的是GoogleProtocolBuffercodec,那我们先看看原始格式。实际上这个协议中目前有三个字段:requestId,可以理解为userId。reqMsg是真正的消息。type是上面提到的消息类别。目前主要有三种,对应不同的业务:Heartbeat为了保持客户端和服务端的连接,需要在每次没有消息发送的时候自动发送心跳。目前的策略是每分钟向服务器发送一个心跳包:这样在没有收到业务消息的时候,服务器每分钟都会收到一个Ping心跳包:内置命令客户端也有一些内置的基本命令,方便使用。例如,键入:q将退出客户端并同时关闭一些系统资源。当你输入:olu(onlineUser的缩写)时,它会调用Route来获取所有在线用户界面。群聊群聊的使用非常简单,只需在控制台输入一条消息,回车即可。这个时候会调用Route的群聊接口。私聊也是如此,但前提是你需要触发关键词;使用用户ID;;消息内容的格式会向用户发送一条消息,所以一般需要使用:olu命令获取所有在线用户才方便使用。消息回调用于满足一些自定义需求,比如需要保存的消息。所以客户端收到消息后,会回调一个接口,可以自定义实现。因此,首先创建一个CallerBean,其中包含一个CustomMsgHandleListener接口,只需要实现这个接口就可以自己处理。自定义接口是因为我不会接口怎么写,但不确定其他大牛能不能写出来。因此,客户端中的群聊、私聊、在线用户获取、消息回调等服务(及后续服务)均以接口的形式提供。后面做页面集成也方便,只需要调整这几个接口即可;你不需要关心具体的实现。综上所述,Cim目前只是***版本,bug多,功能少(只测试了少数群友);不过以后会有所改进,至少这个版本会给那些没有相关经验的人带来一些想法。后续计划:
