本文作者:徐一斌,阿里云智能高级研发工程师。为什么需要schemaRocketMQ目前对消息体没有任何数据格式的限制。它可以是JSON,一个对象toString,或者只是一个词或一条日志。序列化和反序列化过程完全留给用户。业务上下游也需要对消息体的理解达成一致,才能基于RocketMQ进行通信。上述情况会导致两个问题。首先,类型安全问题。如果生产者或消费者来自完全不同的团队,上游对数据格式进行微小但不兼容的更改,可能会导致下游无法正常处理数据,恢复缓慢。其次,存在应用扩展的问题。对于研发场景,RocketMQ虽然在链路上实现了解耦,但是研发阶段的上下游仍然需要做大量基于消息理解的沟通和联调。耦合性还是很强的,生产端的重构也需要和消费端一起改。.对于数据流场景,如果没有schema定义,每次构建ETL都需要重写整个数据解析逻辑。RocketMQschema为消息提供了数据结构托管服务,同时也为原生客户端提供了丰富的序列化/反序列化SDK,包括Avro、JSON、PB等,补足了RocketMQ在数据治理和业务上下游解耦方面的短板。如上图所示,在商业版Kafka上创建topic时,会提示维护topic相关的schema。如果维护好schema,业务上下游看到topic时,可以清楚的知道需要传入哪些数据,有效提高研发效率。我们希望RocketMQ不仅可以面向App业务场景,还可以面向物联网微信场景,以及大数据场景,成为整个企业的业务中枢。加入RSQLDB后,用户可以使用SQL来分析RocketMQ数据。RocketMQ既可以作为具有管道流量特性的通信管道,也可以作为数据沉淀,即具有数据库特性。如果RocketMQ要同时向流引擎和DB引擎靠拢,那么它的数据定义、规范和治理就变得极其重要。面对业务消息场景,我们希望RocketMQ在加入schema后有以下优势:①数据治理:避免产生脏消息数据,防止生产者产生格式不规范的消息。②提高研发效率:降低业务上下游研发阶段或联调阶段的沟通成本。③托管“合约”:合约托管后,真正意义上实现上下游业务的解耦。④提高整个系统的健壮性:避免下游突然无法解析等数据异常。面对流式场景,我们期望RocketMQ有以下优势:①数据治理:能够保证整个链路数据分析的顺畅。②提高传输效率:schema独立托管,不依附于数据,提高了整个链路传输的效率。③促进message-flow-table的融合,topic可以变成动态表。④支持更丰富的序列化方式,节省消息存储成本。目前大部分业务场景使用JSON解析数据,大数据场景常用的Avro方式可以节省消息存储成本。整体架构引入SchemaRegistry后的整体架构如上图所示。在原有的核心生产者、代理者和消费者架构下,引入了SchemaRegistry来承载消息体的数据结构。下层是schema管理API,包括创建、更新、删除、绑定等。在与producer和consumer的交互中,producer会在发送给broker之前做序列化。序列化时,元数据将被查询到注册然后被解析。在消费者端,可以根据ID和topic进行查询,然后反序列化。RocketMQ用户只需要关心发送和接收消息时的结构,不需要关心数据如何序列化和反序列化。服务器端SchemaRegistry的部署方式与NameServer类似,与broker分开部署。因此,代理不必严重依赖SchemaRegistry。它采用无状态部署模式,可以动态扩展和收缩。在持久化方面,默认使用CompactTopic5.0的新特性,用户也可以自行实现存储插件,比如基于MySQL或者Git。管理接口提供Restful接口,用于增删改查,还支持schema与多个topic的绑定/解绑定。应用启动后,提供了自己的SwaggerUI,用于交互版本演化,提供了SchemaName维度的版本演化和相应的兼容性检查,支持七种兼容性策略。在元信息方面,每个模式版本都会向用户公开一个全球唯一的RecordID。获得RecordID后,用户可以去注册表中查找唯一的schema版本。代码设计如上图。主要针对springboot应用,暴露restful接口。Controller下面是Service层,涉及权限校验、jar包管理、StoreManager,其中StoreManager包括本地缓存和远程持久化。SchemaRegistry的核心概念与RocketMQ核心保持一致。比如registry有cluster的概念,对应内核中的cluster,tenant对应NameSpace的概念,subject对应内核中的topic。每个模式都有一个唯一的名称SchemaName。用户可以使用自己应用的Java类名或者全路径名作为SchemaName,保证全局唯一,可以绑定到主题上。每个模式都有一个唯一的ID,它由服务器端的雪花算法生成。SchemaVersion的每次更新都不会改变ID,而是会产生一个单调递增的版本号,所以一个schema可以有多个不同的版本。ID和版本叠加生成一个新的概念记录ID,暴露给用户唯一定位某个schema版本。SchemaType包括Avro、Json、Protobuf等常见的序列化类型,IDL用于具体描述schema的结构化信息。每个schema都有一个ID,ID不变,但是可以有版本迭代,比如从版本1到版本2再到版本3,每个版本都支持绑定不同的subject。Subject可以大致理解为Flink表。比如右图是使用FlinkSQL建表,先创建一个RocketMQtopic,注册到NameServer。因为有表结构,所以需要创建一个schema,注册到subject上。因此引入schema后,可以无缝兼容Flink等数据引擎。Schema主要存储以下几类信息。元信息:包括类型、名称、ID、属性和兼容性。每个版本的具体内容:包括版本号、IDL、IDL中的字段、jar包信息、绑定主题。命名信息:包括集群、租户、主题。审计信息。保留属性。具体的存储设计分为三层。客户端缓存:如果生产者消费者每次发送和接收消息都与注册中心进行交互,会极大地影响性能和稳定性。因此,RocketMQ实现了一层缓存,schema更新频率比较低,缓存可以满足大部分的消息收发请求。服务端缓存:通过RocksDB做一层缓存。得益于RocksDB,服务重启和升级都不会影响到自己的数据。服务器端持久化:通过插件实现远程存储,利用RocketMQ5.0的紧凑主题特性,本身可以支持KV存储的形式。远程持久化和本地缓存同步通过registey的PushConsumer进行监控和同步。目前SchemaRegistry支持7种兼容性策略。默认是backward,小米内部实践也验证了默认策略基本够用。验证方向为consumer兼容producer,即schema演化后,需要先升级consumer,高版本的consumer可以兼容低版本的producer。如果兼容性策略是backward_transative,它可以兼容所有版本的生产者。界面设计遵循OpenSchema标准。启动registry服务后,只需要访问本地主机的swaggerUI页面即可发起http请求,自行管理schema。客户端设计在发送和接收消息的过程中,客户端需要提供SDK,用于消息的schema查询和序列化、反序列化。如上图所示,以往用户发送时传递的是字节数组,接收时也是字节数组。现在我们希望发送者关心一个对象,而消费者关心一个对象。如果消费者不感知对象属于什么类,也可以通过generaterecord等通用类型了解消息。因此,用户视角发送和接收类似于公共类Order的结构化数据。Producer还可以支持schema的自动创建和更新,也支持Avro、JSON等主流的序列化方式。设计原则是不侵入原有客户端代码,不使用schema完全不影响消息收发。用户不感知架构,但感知序列化和反序列化类型。并且支持按最新版本解析,序列化过程中按指定ID解析。此外,为了满足流等轻量级场景,还支持无SchemaRegistry的消息解析。上面的代码就是schema核心API的序列化和反序列化。参数很简单,只要传入主题和原始消息对象,就可以序列化成消息体格式。反序列化也是如此,传入主体和原始字节数组,即可解析出对象传递给用户。上图是一个producer整合schema后的例子。创建生产者需要传入registryURL和序列化类型。发送时传入的不是字节数组,而是原始对象。创建消费者时,需要指定注册表URL和序列化类型,然后直接通过getMessage方法获取泛型或实际对象。ETL场景下的RocketMQflinkcatlog主要用于描述RocketMQFlink的Table、Database等元数据,所以基于SchemaRegistry实现时需要自然对齐一些概念。比如catalog对应cluster,database对应Tenant,subject对应table。在异构数据源的转换过程中,一个很重要的环节就是如何对异构数据源的schema进行转换,这就涉及到了converter。ConnectRecord会将数据和模式放在一起进行传输。如果转换器依赖注册中心作为schema的第三方托管,ConnectRecord就不需要将原始数据和schema放在一起,传输效率会提高。这也是connect集成SchemaRegistry的起点。融入RocketMQstreams场景的出发点是希望RocketMQstreamsAPI的使用能够更加友好。未集成schema时,用户需要主动将数据转成JSON。集成后,接近Flink或流的使用习惯,在流分析时可以直接通过对象进行操作,更加人性化。上面代码中新增参数schemaConfig,用于配置schema,包括序列化类型、目标java类,后续的filter、map、window算子的计算都可以基于对象操作,非常方便。此外,集成流目前可以支持基本类型解析、消息本身的分组操作以及自定义反序列化优化器。后续规划未来,我们将继续完善以下成果。一、社区SIG发展:群刚刚经历了从0到1的建设,还有很多todolists还没有落地,也有很多不错的firstissues适合社区新人尝试。二是强化表格观念。RocketMQ要想向流式引擎靠拢,就需要不断强化表的概念。因此,schema的引入是一个很好的契机,将RocketMQ的主题概念升级为表的概念,促进消息与流表的深度融合。第三,无服务器架构管理。引入registry组件后,增加了一定的外部组件依赖。所以一些强调轻量级的场景还是想做无服务器模式管理。比如直接和RocketMQ交互,把信息持久化到一个紧凑的topic,做直读,直写,或者基于Git存储。四、栏目查询。集成到流中后,我们发现我们可以按字段消费和理解消息。目前的RocketMQ消息是按行理解的,解析计算的时候需要消耗整个消息体。Streams目前是按字段消费消息,未来有望实现按条件按字段查询消息,将RocketMQ改造为查询引擎。第五,数据沿袭/数据地图。当RocketMQ通过分层存储等特性来延长消息的生命周期时,可以将其视为企业的数据资产。目前的痛点是在RocketMQ提供的dashboard上,业务人员很难感知topic背后的业务语义。如果做好了数据亲缘关系,明确了数据主题的上下游关系,比如谁在生产数据,提供了哪些字段,提供了哪些信息,整个dashboard可以从新闻的角度提供一个业务概览,这其实有很大的想象空间。加入ApacheRocketMQ社区,十年磨一剑。ApacheRocketMQ的成长离不开全球近500名开发者的积极参与和贡献。相信你会在下一个版本中成为ApacheRocketMQ的贡献者。在社区中,你不仅可以结识社区领袖,提升技术,还可以提升个人影响力,促进自我成长。社区5.0版本开发如火如荼,近30个SIG(兴趣小组)等你加入。欢迎立志打造世界一流分布式系统的同学加入社区。添加社区开发者:rocketmq666入群,共同参与构建下一代消息、事件、流的综合处理平台。
