上篇文章梳理了puppeteer作为爬虫一路部署的基础。现在来看分布式处理1)为什么是分布式1.需要抓取的数据有很多,headlessbrowser会同时打开。爬取,然后获取数据并无意义地将其压缩到数据库中。2.不保证同一时间对需要的数据只有一个操作在进行。百度上对框架ZooKeeper和RabbitMQ的解释很多。读者可以搜索更详细的了解节点版zookeeper和节点版RabbitMQ3)。代码,项目地址会在最后附上,你可以去publisher,给小说打分37(channel_id),然后是bookid(channel_book_id)来抢书//我们在formofaninterface爬取参数//简单版请求(接收参数不做任何处理除外)->Publisherapp.get('/v1.0/grasp_book',(req,res,next)=>{//爬取时必填参数if(!req.query.channel_id&&!req.query.channel_book_id){res.send({code:403,msg:'paramserror'})returnnull;}//publisher//connecttorabbitmqamqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn){returnconn.createChannel().then(function(ch){//创建hello消息队列varq='hello';//解析成json字符串格式作为传入的数据格式varmsg=JSON.stringify({"channel_id":req.query.channel_id,"book_id":req.query.book_id})//连接并保持varok=ch.assertQueue(q,{durable:true});returnok.t??hen(function(_qok){//发送数据给消费者ch.sendToQueue(q,Buffer.from(msg));console.log("[x]Sent'%s'",msg);returnch.close();});}).finally(function(){conn.close();});}).catch(console.warn);res.send({code:200,msg:'success'});});命令行启动app.js,发出请求并看到result,{}里面是我们发送出去的数据消费者和ZooKeeper消费者,接收发布者传过来的数据构建消息队列,然后使用Zookeeper创建一个临时节点来保持队列依次执行varzookeeper=require('node-zookeeper-client');//根据标识动态导入js文件functionmoduleCustomize(channel_id){returnrequire(`../${channel_id}.js`)}varclient=zookeeper.createClient('127.0.0.1:2181');asyncfunctionsleep(second){returnnewPromise((resolve,reject)=>{setTimeout(()=>{resolve('sleep')},second)})}//connect_zookeeperclient.once('已连接',function(){console.log('已连接到服务器。');//建立连接rabbitmpamqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn){returnconn.createChannel().then(function(ch){//名称为hello(varok=ch.assertQueue('hello',{durable:true});//预存为1ok=ok.t??hen(function(){ch.prefetch(1);});ok=ok.t??hen(function(){//doWork回调函数->收到数据后执行操作ch.consume('hello',doWork,{noAck:false});});returnok;//rabbitmq处理函数doWork(msg){//接收到的数据varbody=msg.content.toString();console.log("[x]Received'%s'",body);let_body=JSON.parse(body)letchannel_book_id=_body['channel_book_id'];letchannel_id=_body['channel_id'];//zookeeper节点letpath=`/${channel_id+"_"+channel_book_id}`//连接_zookeeper判断是否存在client.exists(path,function(error,stat){if(error){console.log(错误.堆栈);返回;}if(stat){console.log('节点存在。');//如果存在则不执行,但是需要传递数据//ack可以参考https://www.jianshu。com/p/a5f7fce67803ch.ack(味精);}else{console.log('节点不存在');//操作完成后,释放当前创建的临时节点client.create(path,null,zookeeper.CreateMode.EPHEMERAL,function(error){if(error){console.log('创建节点失败:%sdueto:%s.',path,error);}else{console.log('Node:%sissuccessfulcreated.',path);//根据传入的标识(比如bookflag是37),动态导入js文件(抢书的操作)moduleCustomize(channel_id).init(channel_book_id)//传输数据ch.ack(msg);//释放当前创建的临时节点客户。remove(path,-1,function(error){if(error){console.log(error.stack);return;}console.log('Nodeisdeleted.');});}});}console.log('任务完成')});}});}).catch(console.warn);});客户端连接();命令行启动rab_consumer.js,{}是我们收到的得到消息队列中的数据4)查看mongo结果数据5)项目地址(更新分支)
