作者|尼泽RocketMQ-Streams是一款轻量级的流处理引擎,应用以SDK的形式嵌入启动,无需依赖其他组件即可进行流处理计算。可部署1核1G,在资源敏感场景下优势明显。同时支持UTF/UTAF/UTDF多种计算类型。目前已广泛应用于安全、风控、边缘计算等场景。本期将带领大家从源码的角度分析RocketMQ-Streams的构建和数据传输的过程。它还将讨论RocketMQ-Streams如何实现故障恢复和扩展。1.示例代码示例:publicclassRocketMQWindowExample{publicstaticvoidmain(String[]args){DataStreamSourcesource=StreamBuilder.dataStream("namespace","pipeline");source.fromRocketmq("topicName","groupName",false,"namesrvAddr").map(message->JSONObject.parseObject((String)message)).window(TumblingWindow.of(Time.seconds(10)))。groupBy("groupByKey").sum("字段名","输出别名").count("总计").waterMark(5).setLocalStorageOnly(true).toDataSteam().toPrint(1).start();}}pom文件依赖:org.apache.rocketmqrocketmq-streams-clients1.0.1-preview的上面的代码是一个简单的使用示例,它的主要功能是读取RocketMQ中指定主题的数据,转成JSON格式后,按groupByKey字段值分组,10秒一个窗口,累加OutFlow字段值,将结果输出到total字段,打印到控制台。上面的计算还允许输入乱序5秒,即窗口时间到达后不会立即触发,而是会等待5s。如果这段时间内有窗口数据到达,它仍然有效。如果上面的setLocalStorageOnly为true,则表示状态没有被远程存储,本地存储只使用RocksDB。目前的RocketMQ-Streams1.0.1版本仍然使用Mysql作为远程状态存储,下一个版本将使用RocketMQ作为远程状态存储。2.RocketMQ整体架构图RocketMQ-Streams作为一个轻量级的流处理引擎,本质上是作为RocketMQ客户端来消费数据的。一个流处理实例可以处理多个队列,一个队列只能被一个实例消费。多个RocketMQ-Streams实例组成一个消费者组,共同消费数据。通过扩展实例增加处理能力的消耗,减少实例会引起rebalance,消费队列会自动rebalance到其他消费实例。从上图我们也可以看出,计算实例之间不需要直接交换任何数据,所有的计算过程都可以独立完成。这种架构简化了RocketMQ-Streams本身的设计,也非常方便扩缩容实例。处理拓扑处理器拓扑为应用程序定义了流处理过程的计算逻辑,它由一系列处理器节点和数据流向组成。例如在开头的代码示例中,整个处理拓扑由source、map、groupBy、sum、count、print等处理节点组成。有两个特殊的处理节点:源节点没有任何上游节点,从外部读取数据到RocketMQ-Streams,提交给下游处理。sink节点没有任何下游节点,将处理后的数据写入外部。处理拓扑只是流处理代码的逻辑抽象,在流计算启动时会被实例化。为了设计的简单,目前一个流处理实例只有一个计算拓扑。在所有的流处理算子中,有两个比较特殊的算子,一个是涉及数据分组的算子groupBy,另一个是有状态的计算,比如count。这两个算子会影响整个计算拓扑结构的构建。下面将具体分析RocketMQ-Streams是如何处理它们的。groupBy分组操作符groupBy比较特殊,因为在groupBy操作之后,后面的操作符期望对同一个key的数据进行操作。比如groupBy("grade")之后,sum就是按照grades对groups进行求和,这就需要需要将相同“grade”的数据重新路由到一个流计算实例进行处理。如果不这样做,每个实例上得到的结果都是不完整的,整体输出的结果也是错误的。RocketMQ-Streams使用shuffletopic进行处理。具体来说,计算实例将groupBy数据重新发送回RocketMQ的一个topic,发送过程中根据key的hash值选择目标队列,然后从这个topic中读取数据进行后续的流处理。根据keyhash,同一个key一定在一个queue中,一个queue只会被一个流处理实例消费,这样就达到了同一个key路由到一个实例处理的效果。StatefuloperatorsStatefuloperators与statefuloperators相反。如果计算结果只与当前输入有关,与上次输入无关,则为无状态算子。例如filter、map、foreach的结果只与当前输入有关。还有一类算子,其输出结果不仅与当前算子有关,还与上次输入有关,比如sum,需要对一段时间内的输入进行求和。它是一个有状态的操作符。RocketMQ-Streams使用RocksDB作为本地存储,使用Mysql作为远程存储来保存状态数据。他的具体做法是:当发现消息来自一个新的队列时,检查状态是否需要加载,如果需要异步加载状态到RocksDB。当数据到达statefuloperator时,如果加载完成,则使用RocksDB中的状态进行计算,否则使用Mysql中的状态进行计算。计算完成后,将状态数据保存到RocksDB和Mysql中。窗口触发后,从RocksDB中查询状态数据,并将结果传递给下游算子。整体数据流图如下:3.缩放和故障恢复缩放和故障恢复是同一枚硬币的两个面,即同一事物的两种表现形式。如果计算集群能够正确伸缩,就相当于具备了故障恢复能力,反之亦然。通过前面的介绍,我们知道RocketMQ-Streams有很好的扩容和缩容性能。扩容时只需部署新的流计算实例,缩容时停止计算实例。对于无状态计算来说比较简单。展开后,数据计算不需要之前的状态。有状态计算的扩展涉及状态迁移。有状态的扩容和缩容可以用下图表示:当计算实例从3个缩减到2个时,借助RocketMQ的rebalance,MQ会在计算实例之间进行重新分配。Instance1上消费的MQ2和MQ3分配给Instance2和Instance3,这两个MQ的状态数据也需要迁移到Instance2和Instance3,这也暗示了状态数据是按照源数据分片存储的;产能扩张恰到好处的逆过程。在实现上,RocketMQ-Streams使用系统消息触发状态加载和持久化。系统消息类://新建消费队列NewSplitMessage//不消费某个队列RemoveSplitMessage//客户端持久化消费点到MQCheckPointMessage。当发现消息来自一个新的RocketMQ队列(MessageQueue)时,RocketMQ-Streams还没有处理过来自这个队列的消息,就会在数据之前发送一条NewSplitMessage消息,通过处理拓扑的下游算子传递。当statefuloperator收到消息后,会将新加入队列对应的state加载到本地内存RocksDB中。当数据真正到达的时候,按照这个状态继续计算。当由于计算实例的增加或RocketMQ集群的变化,计算实例在rebalance后不再消费某个队列(MessageQueue)时,发送RemoveSplitMessage消息,statefuloperator删除本地RocksDB中的state。CheckPointMessage是一种特殊的系统消息,其作用与exactly-once的实现有关。我们需要在扩容和缩容过程中实现exactly-once,以保证扩容或故障恢复不会影响计算结果。RocketMQ-streams在将消费offset提交给broker之前,会生成CheckPointMessage消息传递给下游topology。它将确保即将提交给消费站点的所有消息都已被接收器处理。开源地址:RocketMQ-Streams仓库地址:https://github.com/apache/rocketmq-streamsRocketMQ仓库地址:https://github.com/apache/rocketmq作者:倪泽,RocketMQ高级贡献者,RocketMQ-Streams维护作者之一,阿里云技术专家。