数据赋予世界力量。我们每秒都会获得大量信息,我们对其进行清理、分析并创建更有价值的输出,无论是日志文件、用户活动、聊天消息还是其他内容。我们交付的速度越快,我们为客户带来的价值就越大。我们生活在一个快节奏、瞬息万变的时代。ApacheKafka是一个分布式流媒体平台,可以实时发布、订阅、存储和处理消息。其基于拉式的架构减轻了对服务施加压力的繁重负载,使其易于扩展。它以低延迟将大量数据从源移动到目的地。关于推架构与拉架构的思考我最近和人们谈论不同服务架构的优缺点......Kafka是一个基于JVM的平台,所以客户端的主流编程语言是Java。然而,随着社区的蓬勃发展,高质量的开源Python客户端也可以在生产中使用。在本文中,我将介绍和比较最著名的PythonKafka客户端:kafka-python、pykafka和confluent-kafka。最后,我将对每个库的优缺点给出我的看法。我们为什么要卡夫卡?要事第一。为什么选择卡夫卡?Kafka旨在增强事件驱动架构。它通过提供高吞吐量、低延迟、高持久性和高可用性解决方案来增强架构。(这并不意味着您可以同时拥有它们,总是需要权衡。阅读本白皮书以了解更多信息。)如何部署和优化Kafka以实现高性能和低延迟ApacheKafka?是一种强大的流处理他的平台白皮书讨论了如何针对以下情况优化Kafka部署:除了其高性能之外,另一个有吸引力的功能是发布/订阅模型,在这种情况下,发送方不会专门向接收方发送消息。相反,消息是根据主题传递到收件人可以订阅的集中位置。这样,我们就可以轻松地解耦应用程序并摆脱单体设计。让我们看一个例子来理解为什么解耦效果更好。您创建的网站需要将用户活动发送到某处,因此您可以编写从您的网站到实时监控仪表板的直接连接。这是一个非常有效的简单解决方案。有一天,您决定将用户活动存储在数据库中以供将来分析。因此,您编写了另一个到您网站的直接数据库连接。同时,您的网站流量越来越大,您希望通过添加警报服务、实时分析服务等来增强其功能。您的架构最终会变成这样。大型代码存储库、安全问题、可伸缩性问题和可维护性问题等问题都会伤害您。>Architecturewithoutdecoupling(CreatedbyXiaoxuGao)您需要一个枢纽来分离具有不同角色的应用程序。对于创建事件的应用程序,我们称它们为生产者。他们将事件发布到中央集线器。每个事件(即消息)都属于一个主题。消费者在中心的另一边。他们从中心订阅了他们需要的主题,而无需直接与制作人交谈。有了这个模型,就可以轻松地扩展和维护架构。工程师可以更专注于他们的核心业务。>Architecturewithdecoupling(CreatedbyXiaoxuGao)简而言之,Kafkasetup你可以从官网下载ApacheKafka。快速启动可帮助您在10秒内启动服务器。您还可以从Confluent平台下载ApacheKafka。它是迄今为止最大的Kafka流数据平台。它为个人和企业提供了一套围绕Kafka的基础设施服务,以将数据作为实时流传输。创始人是最初创建ApacheKafka的团队的一员。每个Kafka服务器称为一个代理,您可以将其以独立模式运行,也可以形成集群。除了Kafka,我们还需要Zookeeper来存储关于Kafka的元数据。Zookeeper就像一个协调者,负责管理分布式系统中每个代理的状态。>Kafka设置(由XiaoxuGao创建)假设我们已经设置了具有1个Zookeeper和1个Kafka代理的基础设施。现在是连接的时候了!原始的Java客户端提供了5个API:ProducerAPI:向Kafka集群中的topic发布消息。ConsumerAPI:消费来自Kafka集群中主题的消息。StreamsAPI:消费来自主题的消息并将它们转换为Kafka集群中的其他主题。这些操作可以是过滤、加入、映射、分组等。ConnectAPI:无需编码,直接将Kafka集群连接到源或汇系统。该系统可以是文件、关系数据库、Elasticsearch等。AdminAPI:管理和检查Kafka集群中的主题和代理。在Python的世界里,Kafka的Python库实现了五个API中的三个,分别是ProducerAPI、ConsumerAPI和AdminAPI。Python中还没有这样的KafkaStreamAPI,但是Faust是一个不错的选择。本节测试基于本地安装1个Zookeeper和1个Kafkabroker进行。这与性能调优无关,所以我主要使用库提供的默认配置。Kafka-Pythonkafka-python的设计功能与官方Java客户端非常相似,具有大量pythonic接口。最好与Kafka0.9+一起使用。第一版于2014年3月发布。目前正在积极维护中。安装pipinstallkafka-python每条消息都是通过send()异步发送的。调用时,它将记录添加到缓冲区并立即返回。这允许生产者将记录分批发送给Kafka代理,以提高效率。async可以大大提高速度,但是我们也要明白以下几点:在async模式下,并不能保证顺序。您无法控制Kafka代理何时确认(acknowledges)每条消息。为生产者提供成功和失败回调是一种很好的做法。例如,您可以在成功回调中写入信息日志消息,在失败回调中写入异常日志消息。由于无法保证顺序,因此在回调中收到异常之前可能会发送其他消息。如果你想避免这些问题,你可以选择同步发送消息。send()的返回是FutureRecordMetadata。通过执行future.get(timeout=60),生产者将被阻塞最多60秒,直到代理成功确认消息。缺点是速度,与异步模式相比相对较慢。消费者消费者实例是一个Python迭代器。消费者类的核心是poll()方法。它允许消费者继续从主题中提取消息。它的输入参数之一timeout_ms默认为0,这意味着该方法将立即返回缓冲区中可用的所有已拉取记录。您可以增加timeout_ms以返回更大的批次。默认情况下,每个消费者都是一个无限的监听器,所以它不会停止运行,直到程序被中断。但另一方面,您可以根据消费者收到的消息停止消费者。例如,您可以在达到某个偏移量时退出循环并关闭消费者。消费者也可以从多个主题分配到一个分区或多个分区。这是kafka-python库的测试结果。每条消息的大小为100字节。生产者的平均吞吐量为1.4MB/s。消费者的平均吞吐量为2.8MB/s。Confluent-kafkaConfluent-kafka是一个用于Python的高性能Kafka客户端,它利用了高性能C客户端librdkafka。从1.0版开始,它们在PyPi上作为OSX和Linux的独立二进制轮分发。它支持Kafka0.8+版本。第一版于2016年5月发布。目前正在积极维护中。安装对于OSX和Linux,librdkafka包含在软件包中,需要单独安装。pipinstallconfluent-kafka对于Windows用户,在我撰写本文时,confluent-kafka尚不支持Windows上的Python3.8二进制轮。你会遇到librdkafka的问题。查看他们的发行说明,它们正在积极开发中。另一种解决方案是降级到Python3.7。Confluent-kafka在速度方面有着不可思议的表现。API的设计有点类似于kafka-python。您可以通过将flush()放入循环中来使其同步。confluent-kafka中的ConsumerAPI需要更多代码。您无需处理高级循环方法(例如consume()),而是需要自己处理while循环。我建议您创建自己的consume(),它实际上是一个Python生成器。每当消息被拉出并在缓冲区中可用时,它就会生成该消息。这样,主要功能将很干净,您可以自由控制消费者的行为。例如,您可以在consume()中定义一个“会话窗口”。如果X秒内没有获取消息,消费者将停止。或者,您可以添加标志infinite=True作为输入参数来控制消费者是否应该是无限侦听器。这是confluent-kafka库的测试结果。每条消息的大小为100字节。生产者的平均吞吐量为21.97MBps。消费者的平均吞吐量为16.8~28.7MB/s。PyKafkaPyKafka是一个对程序员友好的PythonKafka客户端。它包括Kafka生产者和消费者的Python实现,可选地由基于librdkafka的C扩展支持。它支持Kafka版本0.82+。第一版于2012年8月发布,但自2018年11月以来一直没有更新。安装pipinstallpykafkalibrdkafka没有自带软件包,您需要在所有操作系统中单独安装。pykafka具有涵盖ProducerAPI和ConsumerAPI的KafkaClient接口。消息可以异步和同步模式发送。我发现pykafka修改了一些producer配置的默认值(比如linger_ms和min_queued_messages),对发送少量数据有影响。您可以将此与ApacheKafka网站上的默认配置进行比较。如果你想获得每条消息的回调,请确保将min_queued_messages更改为1,否则如果你的数据集小于70000,你将不会获得任何报告。>pykafka-producer-configconsumer你可以从KafkaClinet接口获得SimpleConsumer。这类似于kafka-python,其中轮询被包装在SimpleConsumer类中。这是pykafka库的测试结果。每条消息的大小为100字节。生产者的平均吞吐量为2.1MB/s。消费者的平均吞吐量为1.57MB/s。结论至此,我已经解释了每个库的ProducerAPI和ConsumerAPI。就AdminAPI而言,kafka-python和confluent-kafka确实提供了显式的AdminAPI。您可以在要创建主题的单元测试中使用它,然后在执行下一个测试之前将其删除。此外,如果您想使用Python构建Kafka监控仪表板,AdminAPI可以帮助您检索集群和主题元数据。Confluent-kafka:毫无疑问,Confluent-kafka在这三个库中表现最好。API经过精心设计,参数与原始ApacheKafka具有相同的名称和默认值。您可以轻松地将其链接到原始参数。就个人而言,我喜欢定制消费者行为的灵活性。Confluent也在积极开发和支持它。缺点是Windows用户可能需要一段时间才能让它工作。而且由于C扩展,调试可能很棘手。kafka-python:kafka-python是一个没有C扩展的纯Python库。API经过精心设计,易于初学者使用。这也是一个积极开发的项目。python-kafka的缺点是它的速度。如果你真的很关心性能,我建议你改用confluent-kafka。pykafka:与kafka-python和conflunet-kafka相比,pykafka的开发活动较少。版本历史显示它自2018年11月以来就没有更新过。此外,pykafka具有不同的API设计,具有不同的默认参数,可能不是第一次。
