本文不会涉及Kafka的具体操作,而是告诉你Kafka是什么,它在爬虫开发中能起到什么重要的作用。一个简单的需求假设我们需要写一个微博爬虫。老大给的要求是这样的:你开发一个爬虫很简单,所以三遍五遍二分就可以开发爬虫了:接下来开始做报警功能,逻辑也很简单:我们看一下统计关键词的功能。这个功能背后有一个网页,会实时显示抓取数据量的变化,可以显示每分钟或每小时抓取关键词的数量。这个功能对你来说也很简单,所以你实现了如下逻辑:最后一个需求是对微博数据进行情感分析。情感分析模块是其他部门的同事开发的。你要做的就是每小时拉取一批数据,发送到接口,获取返回,然后存储到后端需要的数据库中:任务完成,你就高高兴兴的回家睡觉吧.困难接踵而至。履带车放慢了速度。随着老板逐渐增加新的关键词,你发现每次完整爬取的时间越来越长。一开始是2分钟的爬行,后来变成了10分钟的回合。然后就变成了30分钟一局,再后来就变成了一个小时抢一局。随着延迟越来越高,您的警报变得越来越不准确。微博发出已经一个小时了,你的报警还没有发出,因为那条微博还没有及时保存。你的爬虫技术很好,可以绕过所有的反爬虫机制。你有无限的代理IP,所以你很容易将爬虫增加到每秒一百万并发。现在你只需要1分钟就可以完成所有数据的爬取。现在没事了。但警报仍未发出。这是怎么回事?数据库撑不住了。经过排查,你发现了问题。取数据量变大了,但是MongoDB不能同时接收这么多数据写入。写入数据的速度远低于爬取数据的速度,内存中堆积了大量的数据。所以你的服务器爆炸了。您紧急建立了100个数据库,编号为0-99。对于抓取的微博,首先计算每条微博的ID与100的余数,然后将数据存储到对应的MongoDB中。每个MongoDB的压力下降到原来的1%。数据最终可以实时存入数据库。但警报仍未发出。不仅如此,实时抢量统计功能现在也不能使用了。有什么问题?查询为时已晚。现在报警程序要遍历100个数据库中最近5分钟的每条数据,确认是否有需要报警的内容。但是这个遍历过程远不止5分钟。时间错开。因为微博综合搜索功能没有按时间排序,所以会出现12:02只抓到早上10:01发的微博的情况。无论你报警时过滤数据,还是将过滤后的数据推送到NLP分析界面,如果你按微博发布时间搜索,那么这条会被你直接漏掉——当你在10:05搜索微博时10:00到10:055分钟内发布。由于这条微博没有被抓到,所以你将无法搜索到它。当你在12:05开始检索12:00-12:05的数据时,你搜索的是12:00-12:05发布的数据,所以虽然12:02抓到了10:01的数据,但您也无法过滤掉它们。那么是否可以利用爬取时间来进行搜索呢?比如10:05,开始取回10:00-10:05抓取的数据,不管它的发布时间,都会取回。这样做确实可以保证不漏掉任何数据,但是这样做的代价是你要保存和检索大量的数据。比如你每次抢,只要发布时间是最后10小时,就一定要存下来。因此,报警程序在检索数据时,需要检索出5分钟内存储在数据库中,10小时内实际释放的所有数据。什么,你说每次保存这条微博都要检查是否已经存在,如果存在就不保存?别忘了batchwrite时间不够,要不要分配点时间查询?脏数据来了。老板突然来告诉你,在关键词“篮球”中有很多关于蔡徐坤的内容,你应该删除所有包含蔡徐坤的数据。那么,这个过滤逻辑放在哪里呢?放到爬虫的pipelines.py里面?然后你必须重新部署所有的爬虫。今天过滤蔡徐坤,明天过滤粉丝层,后天过滤王一博。天天加关键词,天天都得重新部署爬虫?然后你把关键字放到Redis或者MongoDB里面,每次插入数据前读取所有关键字,看微博不包括re-save。还是一样的问题,插入时间不够,还需要查数据库?嗯,关键字过滤没有放在爬虫里。您编写了一个脚本,每分钟检查一次MongoDB是否有新数据,如果其中包含不必要的关键字,则将其删除。现在问题来了,删除数据的程序每分钟检查一次,报警程序每5分钟检查一次。中间肯定有一些数据。在删除之前,报警程序会报警。老大接到报警看数据,你的删除程序此时删除了脏数据。现在好了,天天报虚警,狼来了的故事在上演。5个问题1个救星如果你在爬虫开发的过程中遇到了上面的很多问题,那么你应该试试Kafka。一次性解决以上所有问题。在你的爬虫进程中加入Kafka,那么你的爬虫架构就变成了这样:看起来和直接往MongoDB里面写数据,然后每个程序去读MongoDB没什么区别?那么Kafka能解决什么问题呢?我们先来看看在这个爬虫架构中我们会用到的Kafka的特点:与其说Kafka在这个爬虫架构中像MongoDB,不如说它更像是一个Redis的列表。现在让我们简化我们的模型。如果现在爬虫只有一个需求,那就是搜索然后报警。那么我们可以这样设计:将爬虫爬取的数据直接塞到Redis列表的右侧。报警程序从Redis列表的左边开始,一个一个读取。读一看一,如果含有报警关键字,则报警。然后阅读下一篇。这样做有什么好处?因为报警程序直接从Redis中一条一条读取,没有按时间查找数据的过程,所以不会有数据延迟的问题。由于Redis是单线程数据库,可以同时启动多个报警程序。因为lpop读一个就删一个,如果报警程序因为某种原因崩溃了,只要重启一下,它就会继续工作,不会重复报警。但是使用Redis列表的优点也是缺点:列表中的信息只能消费一次,弹出来就没了。所以如果既需要报警,又需要将数据存入MongoDB备份,那么只有一种方式,即报警程序检查数据后,将数据存入MongoDB。可我只是一个哨兵,为什么要让我去当后勤兵呢?对于报警程序,让它做报警的事情,不应该做存储数据的事情。使用Kafka,它具有Redis列表的这些优点,但没有Redis列表的缺点!我们可以分别实现4个程序,不同程序之间消费数据的速度互不影响。但是同一个程序,无论是关闭再打开,还是同时运行多次,都不会被重复消费。程序一:alarm从kafka中一条一条读取数据,做alarm相关的工作。程序1可以同时启动多个。关闭再打开不会重复消费。程序2:存储原始数据该程序从Kafka中一条一条读取数据,每采集1000条,批量写入MongoDB。这个程序不需要实时存储数据,有延迟也无所谓。存储在MongoDB中只是一个原始数据存档。一般情况下不会从MongoDB中读取。程序3:统计从Kafka读取的数据,记录关键字和发布时间。按小时和分钟分别统计每个关键词的微博数。最后保存计数结果。程序4:情感分析从Kafka读取每条数据,做足够的batches发送到NLP分析接口。获取结果并将其存储在后端数据库中。如果要清洗数据,四个需求都解决了怎么办,那么如果还需要先把脏数据去掉,再分析呢?其实很简单,加个Kafka(Topic)就可以了!除了上面的微博例子,我们再看看在开发通用爬虫时如何应用Kafka。在任何时候,XPath提取数据和解析网站返回的JSON都不是爬虫开发的主要工作。爬虫开发的主要工作一直是爬虫的调度和反爬虫的开发。我们现在写Scrapy的时候,处理反爬虫和数据提取的逻辑都是写在一个爬虫项目里面的,所以在开发的时候其实很难做到多人协作。现在我们将网站内容的爬虫和数据提取分离,实现如下爬虫架构:爬虫开发能力好的同学负责绕过反爬虫,获取网站内容,无论是HTML源码还是接口返回的JSON。拿到后直接塞进Kafka中。相对于有爬虫技术的普通学生和实习生来说,他们只需要从Kafka中获取数据即可,不需要关心数据是来自Scrapy还是Selenium。他们要做的就是根据产品需求将HTML或JSON解析成格式化数据,然后塞入Kafka中,供后续数据分析同学继续阅读和使用。这样一个数据团队的工作就分开了,每个人各司其职,约定一个格式,同步开发,互不影响。为什么是Kafka而不是上面介绍的其他功能呢?其实很多MQ都可以实现。但为什么是卡夫卡而不是其他人?因为Kafka集群的性能非常高,在垃圾电脑上搭建的集群可以抵抗每秒10万次并发数据写入。而如果选择性能更好的服务器,可以轻松应对每秒100万次的数据写入。小结本文通过两个实例介绍了Kafka在爬虫开发中的作用。作为一名爬虫工程师,作为我的读者。一定要掌握Kafka。在下一篇文章中,我们将讨论如何使用Kafka。它会比你在网上看到的教程更简单易懂。关注本公众号,回复“爬虫与卡夫卡”即可获取本文对应的原创思维导图。
