本文由AxelSirota撰写,StreamNative的刘宇、Jennifer、Sijia翻译。原文链接https://streamnative.io/en/bl…。关于ApachePulsarApachePulsar是Apache软件基金会的顶级项目。是集消息、存储、轻量级函数计算为一体的下一代云原生分布式消息流平台。它采用计算和存储分离的架构设计,支持多租户和持久化。存储,多机房跨区域数据复制,具有强一致性、高吞吐量、低延迟和高扩展性等流数据存储特性。GitHub地址:http://github.com/apache/pulsar/Pulsar2.0版本引入了PulsarFunctions。PulsarFunctions使用户能够轻松顺利地迁移到无服务器应用程序。本文主要介绍PulsarFunctions的基本信息以及如何开发PulsarFunctions。此外,本文列出了将应用程序迁移到PulsarFunctions时的一些注意事项。简单场景假设有这样一个使用场景:我们经营一家电子商务公司,公司的主要业务是处理支付发票。在Pulsar中,该业务包括以下三个步骤:将发票导入订单主题;执行代码以逗号分隔发票值;将发票值写入PostgreSQL。本文主要介绍第二步。一般来说,我们执行的代码可能是在AWSLambda中创建的无服务器函数,也可能是成熟的微服务。这种方法有很多缺点。首先,我们为一小段代码开发了一个完整的服务。由于开发工作的复杂性,完全实施可能需要长达两周的时间。其次,由于源数据架构在不断变化,维护会越来越困难。我们需要对服务和底层PostgreSQL表进行完整的版本控制和重新部署,这项工作至少需要一天的时间。此外,AWSLambda函数在与Pulsar连接或断开连接时需要身份验证。Pulsar首先调用Lambda函数,然后Lambda函数本身向Pulsar进行身份验证。由于Lambda函数引入了不必要的双向消息传递,因此会影响性能。PulsarFunctions简介PulsarFunctions是一个轻量级的计算框架,用于处理主题之间的数据。PulsarFunctions在Pulsar中运行,因此无需单独部署微服务,从而节省时间并简化故障排除。PulsarFunctions的复杂性是灵活的。PulsarFunctions不仅支持将数据从一个主题转换/移动到另一个主题,还支持将数据发送到多个主题,执行复杂的路由和批处理请求等。PulsarFunctions易于调试,支持在调试模式下部署功能,即,它们可以在连接到代码时进行调试并实时执行。开发PulsarFunctions使用熟悉的编程语言创建PulsarFunction就像实现PulsarFunction子类一样简单。下面的代码是用Java语言编写的,Pulsar也支持Python和Go语言。publicclassSplitFunctionimplementsFunction>{@OverridepublicListapply(Stringinput){returnArrays.asList(input.split(","));}}}代码编译打包后,通过functionscreate命令将函数部署到Pulsar实例中。该命令的参数是打包后的代码和函数的输入/输出主题。bin/pulsar-adminfunctionscreate--jartarget/split.jar--classnamedemo.SplitFunction--inputinput-topic--outputoutput-topic开发和部署自定义PulsarFunction最多需要两天时间。部署完成后,PulsarFunctions可以大大简化用户的工作量,缩短产品发布时间。Pulsar支持用户部署任意数量的PulsarFunctions从一个主题中获取数据并将其发送到其他主题,也可以轻松地将状态消息写入Pulsar日志。PulsarFunctions不仅简化了Pulsar的部署过程,增强了Pulsar的灵活性,还扩展了Pulsar的功能。如何开发一个完整的PulsarFunctions来充分利用PulsarFunctions的丰富特性?开发一个完整的PulsarFunction就像在类中实现Function接口一样简单。首先实现process()方法。process()方法提供的语义对象是通往Pulsar的门户。通过语义对象,我们可以访问记录器、跟踪输出、向主题发送消息等。使用PulsarFunctions获取输入数据和提取发票价格的示例代码如下。我们可以使用语义对象将这些数据发送到另一个输出主题(如果要在部署Function时将数据发送到指定的输出主题,只需将其作为Function的返回值返回即可。这个例子展示了如何将数据发送到另一个主题使用PulsarFunctions进行路由并从Function返回null。)导入org.apache.pulsar.functions.api.Function;publicclassRoutingFunctionimplementsFunction{@OverridepublicVoidprocess(Stringinput,Contextcontext)throwsException{LoggerLOG=context.getLogger();}LOG.info(String.format("得到这个输入:%s",input));价格inputPrice=newPrice(input);Stringtopic=String.format("year-%s",inputPrice.getYear());context.newOutputMessage(topic,Schema.STRING).value(inputPrice.getPrice()).send();//我们也可以在这里返回一些对象,它会被发送到//函数提交期间设置的输出主题returnnull;}}成本低于AWSLambda既然AWSLambda可以满足我们的需求,为什么我们决定切换到PulsarFunctions?与AWSLambda相比,PulsarFunctions具有一系列的优势,比如方便调试、去除了Pulsar和Lambda之间的双向认证等。下面我们通过一个常见的使用场景来比较一下使用AWSLambda和PulsarFunctions的成本。假设在一个在线拍卖的实时竞价系统中,每秒有10,000次竞价(每月260亿次请求),仅考虑请求费用,忽略计算时间,成本为$5,000。假设每个请求需要100毫秒和一个2048GB的虚拟机,计算成本为86,000美元。这还不包括AWS传输数据的费用!AWSLambda是无服务器功能的绝佳选择,但仅适用于小规模用例。使用Lambda处理数十亿笔交易的数据管道非常昂贵。使用PulsarFunctions可以显着节省成本。当我刚加入JAMPP时,JAMPP团队只使用Lambda,一小部分数据管道的成本是每月30,000美元以上。当我们从AWSLambda迁移到PulsarFunctions时,成本下降到每月几百美元,主要用于在AmazonEC2实例上托管Pulsar。迁移到PulsarFunctions首先看一下使用PulsarFunctions的架构。在示例使用场景中,我们在AWSLambda中编写了一个Java函数来处理主题之间的数据。PulsarFunctions在此架构中取代了Lambda,简化了开发和部署。部署PulsarFunctions后,我们需要为数据创建导入和转储脚本。借助PulsarIO,用户可以轻松地在Pulsar中定义外部数据源/接收器,从而简化此过程。PulsarIOsource/sink本身也是作为PulsarFunctions实现的,即用户可以在Pulsar中创建自定义的sources/sink来简化调试操作。迁移到PulsarFunctions只需三步:将所有处理逻辑迁移到一个或多个PulsarFunctions转换IO逻辑(使用PulsarIOsource/sink)使用logtopic处理日志数据只需三步即可完全迁移到Pulsar中运行无服务器应用程序。如果用户当前使用的消息系统是Kafka呢?不用担心,使用Kafka-on-Pulsar,无需编写任何代码,即可从Kafka顺利迁移到Pulsar。结束语本文简要介绍了如何使用PulsarFunctions。除了本文讨论的功能外,PulsarFunctions还添加了更多令人兴奋的功能。比如StreamNative最近宣布发布PulsarFunctionMesh,支持PulsarFunction服务集群协同部署。本文主要介绍如何开发PulsarFunctions,如何将应用迁移到运行在Pulsar上的serverless应用,以及PulsarFunctions的易用性和灵活性。祝迁移顺利!相关阅读翻译|深度解析PulsarFunctions翻译|使用ApachePulsarFunctions进行简单的事件处理翻译|PulsarFunctions简要指南:原则、目标和规划