攻略:在3月6日的TGIP-CN直播中,我们邀请了StreamNative高级工程师鲁能,分享了PulsarFunctionMesh的功能和特点。以下是鲁能分享的视频精简版,供大家参考。今天很高兴和大家分享StreamNative基于PulsarFunction的新作:FunctionMesh。它的整个核心思想就是将一些复杂的、分离的、单独管理的功能统一管理起来,原生集成到Kubernetes中。并且可以充分利用它的各种功能和调度算法。Pulsar中的数据处理首先看一下Pulsar中支持的各种数据处理模块和方法,主要分为三个方面。首先,它是基于Presto的交互式查询。Pulsar有自己的PulsarSQL,基于Presto查询整个Pulsar集群;有一个和Presto相关的Connector,可以直接通过Presto集群查询topic。其次,作为消息队列和消息处理数据的核心,Pulsar可以对接各种流数据或批数据处理框架,如Flink、Spark、Hive等。未来我们会发布完整的Pulsar与FlinkSQL集成的解决方案。最后,Pulsar有一个内置的PulsarFunction。核心思想是提供最简单的API,让用户可以轻松处理在Pulsar中流动的数据。概括起来,PulsarFunction是一个轻量级的数据处理过程,主要执行以下操作:消费来自一个或多个Pulsar主题的消息;将用户提供的处理逻辑应用于每条消息;将结果发布到Pulsar主题。什么是Pulsar函数?上文提到的轻量级数据处理流程PulsarFunction如图所示。用户可以输入多个主题,每个输入的主题都可以向用户自定义的PulsarFunction发送数据。PulsarFunction的处理单元处理完后,将结果发送到唯一的OutputPulsartopic,可以记录一些辅助topic。或信息收集。PulsarFunction并不是一个完整的流处理框架。它不像Flink那样提供很多保证,也不是一个计算抽象层。它主要是和Pulsar紧密结合来处理计算任务。它的部署非常简单,不需要额外搭建和管理任何集群。你只需要在Pulsar配置文件中开启对Functions的支持,就可以将Functions提交到已有的Pulsar集群中。用户可以直接在集群中处理数据,无需额外维护另外一个集群进行对接和处理。PulsarFunction常见的应用场景,比如专注于ETL数据清洗的任务,实时数据聚合...由于Function本身其实是一个很笼统的抽象,它只是一个应用函数,所以也可以应用到微服务场景.在Function应用的函数中,可以调用任意API进行事件路由等操作,用户可以使用PulsarFunction将数据分发到不同的集群。如何实现PulsarFunction上图展示了PulsarFunction的API。PulsarFunction支持三种语言进行数据处理:Java、Python、Golang。PulsarFunction支持三种语义:至多一次(At-mostonce):不关心消息是否发送成功,不需要消息发送的返回值;至少一次(At-leastonce):发送的消息在没有收到返回值时会重新发送,以保证消息不会丢失,否则可能会造成消息重复。消费消息时,需要对消息进行幂等操作;恰好一次(Exactlyonce):保证消息不会丢失,不会重复。PulsarFunction自带简单的内置状态管理,分为三种类型:提供Context对象来支持用户可访问的状态;在BookKeeper中存储状态;支持服务器端操作(例如计数器)。除了Input,之前介绍的API还有Context的参数,很多状态都是在Context中管理的。publicclassWordCountFunctionimplementsFunction{@OverridepublicVoidprocess(Stringinput,Contextcontext)throwsException{Arrays.aslist(input.split(“\.”)).forEach(word->context.incrCounter(word,1));}}如何部署FunctionPulsarFunction的CLI可以进行创建、删除、更新、获取、重启、关闭、打开等一系列操作。$./pulsar—adminfunctions用法:pulsar—adminfunctions[options][command][commandoptions]命令:localrun在本地运行一个PulsarFunction,而不是部署到Pulsar集群)create在集群模式下创建一个PulsarFunction(将其部署在Pulsar上)cluster)delete删除运行在Pulsar集群上的PulsarFunctionupdate更新已部署到Pulsar集群的PulsarFunction获取有关PulsarFunction的信息restartRestartfunctioninstancestopStopsfunctioninstancestart启动已停止的函数instancestatus检查PulsarFunctionstats的当前状态获取PulsarFunctionlist的当前状态列出在特定租户和命名空间下运行的所有PulsarFunctionsquerystate获取与PulsarFunction关联的当前状态putstate将与PulsarFunction关联的状态trigger触发associatedspecifiedPulsarwithasuppliedvaluePulsarFunction的特点PulsarFunction具有以下特点:开发高效:API简单,学习不费力,支持多种语言;运维方便:与Pulsar完全集成,无需额外的系统/服务设置;易于排错:在本地运行时非常方便,日志主题简单易用。FunctionMesh详解上面已经介绍了PulsarFunction,这部分将深入介绍FunctionMesh。什么是FunctionMeshFunctionMesh是函数的集合,可以让多个函数协调完成数据处理目标,每个函数都有自己明确的任务和定义的阶段。特别强调的是,FunctionMesh的初衷并不是要取代Flink或成为Flink的竞争对手,而是对现有流数据处理引擎的补充和支持。下图是FunctionMesh的经典视图:如上图所示,在FunctionMesh之前,我们使用了单个PulsarFunction。引入FunctionMesh后,多个功能之间有了关联和数据连接,最终产生想要的结果,可以映射到微服务的不同场景中。FunctionMesh实现方案FunctionMesh设计实现方案一:基于Pulsar,Pulsar目前提供了一个命令行工具,可以用来管理单个函数。如上例所示,需要在Pulsar命令行工具中启动功能1到功能6。带来管理上的重复和复杂性;同时,Pulsar会将上述多个功能视为一个单一的功能,难以跟踪功能并将它们视为一个组合;也不可能很好地了解每个功能的上下游和处理顺序。面对上述问题,我们提出解决方案。详情请参考PIP-66[[1]](#)。PIP-66的主要思想是在Pulsar中提供对FunctionMesh的原生支持,即通过Pulsar命令行提交FunctionMesh,并在FunctionMeshYAML中定义每一个包含的函数参数和组织关系、输入输出配置文件源等bin/pulsar-adminfunction-meshcreate-fmesh.yaml//CreateFunctionMeshexamplecommand//YAMLconfigurationdemo#Metadataname:PIP_Meshnamespace:PIP_Namespacetenant:PIP_Tenant#FunctionMeshconfigsjarFile:/local/jar/files/example.jar#FunctionsfunctionInfos:-name:Func1classname:org.apache.pulsar.functions.api.examples.ExclamationFunctionreplicas:1inputs:-pulsar_topic_sourceoutput:-pulsar_topic_1-name:Func2classname:org.apache.pulsar.functions.api.ex.ExclamationFunctionreplicas:1inputs:-pulsar_topic_1output:-pulsar_topic_result下图是基于以上思路生成的FunctionMesh调度方案,最大限度的利用现有的PulsarFunction调度机制,达到设计目标FunctionMesh,当然也引入了FunctionMeshManager来管理FunctionMesh的元数据。FunctionMesh设计实现方案二:基于Kubernetes随着整体项目和相关云项目的推进,我们发现基于Kubernetes实现FunctionMesh是非常有意义和有价值的。创建FunctionMesh,用户可以直接使用Kubernetes命令行工具来创建(下面的演示命令),而我们需要做的是开发一个CRD,类型是FunctionMesh,关系基本一样如解决方案1中的设置:$kubectlapply-ffunction-mesh.yaml…apiVersion:cloud.streamnative.io/v1alpha1kind:FunctionMeshmetadata:name:functionmesh-samplespec:functions:-name:f1…-name:f2…-name:f3...-name:f4...-name:f5...-name:f6...这种模式下FunctionMesh不再运行在Pulsar上,而是运行在Kubernetes云平台上。在这种模式下,我们可以定义一系列资源,比如单个Function、Mesh(一组函数组合)、Source和Sink,其中Source和Sink是Pulsarconnector中更多的概念,Source是导入第三方partysystemdataintoPulsartopic中,sink是相反的动作,也方便数据湖等处理场景。基于Kubernetes的FunctionMesh调度方案如下图所示:方案对比:PulsarvsKubernetes对比基于Pulsar和基于Kubernetes的FunctionMesh实现方案,我们有一些思考:Kubernetes的调度能力。不同任务的调度是Kubernetes的一个特殊能力,它还可以提供高可用和容错的保证。在云环境中,Function成为一等公民,与Pulsar提供的服务具有同等地位。如果我们能把Pulsar的Function抽取出来,它就有可能和其他消息系统连接处理数据,类似于AWS提供的LambaFunction,也方便我们设计事件驱动模型。演示视频(点击链接查看推文原视频)后续规划目前我们在FunctionMesh上前期做了大量的工作,对于FunctionMesh我们也有更多的规划:提供更多的云原生支持;根据不同语言定制FunctionRuntime;函数注册表:方便用户打包和管理。。。我们也计划在近期开源。有兴趣的可以联系PulsarBot,在底部回复“Mesh”试试看。欢迎给我们反馈。相关阅读翻译|深度解析PulsarFunctions翻译|使用ApachePulsarFunctions进行简单的事件处理