varamqp=require('amqplib');connect([url,[socketOptions]])varamqp=require('amqplib/callback_api');connect([url,[socketOptions]]],function(err,conn){...})url中的参数可以在URI的查询部分给出进一步的AMQP调整参数,例如,像'amqp://localhost?frameMax=0x1000'。它们是:frameMax连接上允许的最大帧的大小(以字节为单位)。0表示没有限制(但由于帧的大小字段是一个无符号的32位整数,因此它必须是2^32-1);我会将它默认为0x1000,即4kb,这是允许的最小值并且适合多次使用,而不是通过Node.JS的缓冲池。channelMax允许的最大通道数。默认为0,即2^16-1.heartbeatconnection心跳周期,单位秒。默认为0;locale错误消息所需的语言环境。RabbitMQ只使用过en_US;幸运的是,这是默认设置。url也可以作为对象使用{协议:'amqp',主机名:'localhost',端口:5672,用户名:'guest',密码:'guest',区域设置:'en_US',frameMax:0,heartbeat:0,vhost:'/',}//关闭连接connection.close()回调connection.close([function(err){...}])_//断言队列是否存在assertQueue_//检查队列是否存在checkQueue//删除队列(如果队列不存在则关闭通道)deleteQueueRPCservervaramqp=require('amqplib/callback_api');//建立连接amqp.connect({protocol:'amqp(协议包括amqp和amqps)',hostname:'rabbitserveraddress',port:端口号,username:'用户名',password:'密码'},function(err,conn){conn.createChannel(function(err,ch){varq='rpc\_queue';ch.assertQueue(q,{durable:false});ch.prefetch(1);console.log('\[x\]正在等待RPC请求');ch.consume(q,functionreply(msg){varn=parseInt(msg.content.toString());console.log("\[.\]fib(%d)",n);varr=fibonacci(n);ch.sendToQueue(msg.properties.replyTo,newBuffer(r.toString()),{correlationId:msg.properties.correlationId});ch.ack(味精);});});});functionfibonacci(n){if(n==0||n==1){returnn;}else{fibonacci(n\-1)+fibonacci(n-2);}}客户端代码varamqp=require('amqplib/callback_api');varargs=process.argv.slice(2);控制台日志(参数);if(args.length==0){console.log("Usage:rpc\_client.jsnum");process.exit(1);}amqp.connect({protocol:'amqp(协议包括amqp和amqps)',hostname:'rabbit服务器地址',port:端口号,username:'用户名',password:'密码'},function(err,conn){conn.createChannel(function(err,ch){ch.assertQueue('',{exclusive:true},function(err,q){//随机数的唯一值varcorr=generateUuid();varnum=parseInt(args\[0\]);console.log('\[x\]请求fib(%d)',num);ch.consume(q.queue,function(msg){if(msg.properties.correlationId==corr){console.log('\[.\]得到%s',msg.content.toString());setTimeout(function(){conn.close();process.exit(0)},500);}},{noAck:true});//replyTo:q.queuesetch.sendToQueue('rpc\_queue',newBuffer(num.toString()),{correlationId:corr,replyTo:q.queue});});});});functiongenerateUuid(){returnMath.random().toString()+Math.random().toString()+Math.random().toString();}
