本文最后更新于:a few seconds ago
                
              
            
            
              前言 注意:本篇内容以官方文档为主线加个人理解和采坑 
安装RabbitMQ 在windows10上安装 注意:安装RabbitMQ要先安装Erlang编程语言。需要翻墙下载,不然很慢。
https://www.erlang.org/downloads 
 
安装完需要设置系统变量。
可以从github 上下载: 
https://github.com/rabbitmq/rabbitmq-server/releases 
 
或者点击直接下载 注意:这个链接在您到来之后可能不是最新版!https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-3.8.3.exe 
使用 chocolatey(Windows下的软件包管理器) 要安装RabbitMQ,请在命令行或PowerShell中运行以下命令:
 
在MacOS上下载 确保 Homebrew 为最新的:
 
然后下载rabbitmq
 
也可以下载通用UNIX的二进制文件 
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.3/rabbitmq-server-generic-unix-3.8.3.tar.xz 
 
使用docker镜像  
配置RabbitMQ 注意:接下来以windows10为基本环境  如果您的环境为win10,下载完成记得将RabbitMQ的/sbin目录加入系统环境。 以便在任何地方控制您的MQ. 运行以下命令查看是否安装成功
 
遇到的问题 我的RabbitMQ一启动就失败,系统日志显示了以下错误。
RabbitMQ: Erlang machine stopped instantly (distribution name conflict?). The service is not restarted, ignoring OnFail option.
 
我尝试了多种解决方式。但是都没有用,最后把erlang和RabbitMQ卸载掉,都换成最新版本解决了问题。 注意:
RabbitMQ无法安装到具有ASCII字符的路径,也就是路径不能有中文 
需要用管理员身份安装Erlang环境。 
安装路径不要有空格。(未实践!) 官方对错误的解答:Windows怪癖  
 
安装可视化插件 先查看RabbitMQ的全部插件,运行以下命令:
 
输出内容如下:
Listing plugins with pattern ".*" ...  Configured: E = explicitly enabled; e = implicitly enabled  | Status: [failed to contact rabbit@waterbang - status not shown]  |/ .... [  ] rabbitmq_federation_management    3.8.3 [  ] rabbitmq_jms_topic_exchange       3.8.3 [  ] rabbitmq_management               3.8.3 // 这个是我们要安装的管理插件 [  ] rabbitmq_management_agent         3.8.3 [  ] rabbitmq_mqtt                     3.8.3 [  ] rabbitmq_peer_discovery_aws       3.8.3 [  ] rabbitmq_peer_discovery_common    3.8.3 [  ] rabbitmq_peer_discovery_consul    3.8.3 [  ] rabbitmq_peer_discovery_etcd      3.8.3 [  ] rabbitmq_peer_discovery_k8s       3.8.3 ...
 
运行以下命令进行安装:
rabbitmq-plugins enable rabbitmq_management
 
输出以下内容为安装成功
Enabling plugins on node rabbit@waterbang: rabbitmq_management The following plugins have been configured:   rabbitmq_management   rabbitmq_management_agent   rabbitmq_web_dispatch Applying plugin configuration to rabbit@waterbang... The following plugins have been enabled:   rabbitmq_management   rabbitmq_management_agent   rabbitmq_web_dispatch set 3 plugins. Offline change; changes will take effect at broker restart.
 
如何在node中传递消息 我们来跟着官方的入门例子来进行第一步,将生产者连接到RabbitMQ,发送一条消息后退出。 下面是我们要实现的基本模型。中间蓝色的队列是消息缓冲区。
安装amqp.node amqplib是一个为Node.JS制作AMQP 0-9-1 客户端的库,它支持Callback和Promise两种风格。 接下来在您的工作目录运行:
 
生产者 安装完成后开始写代码。创建生产者send.js。先连接道RabbitMQ服务器, 这是RabbitMQ给我们的一个最重要的接口,代码如下:
var amqp = require('amqplib'); amqp.connect('amqp://localhost:5672').then((conn) => {  // 创建一个通道   console.log(conn); }, (err) => {   throw Error('connection fail:===>' + err); })
 
它返回给我们一个ChannelModel。接下来我们用它来创建一个通道,并且用assertQueue(queue,[options])声明一个队列。 它接收两个参数:queue是一个字符串;如果您提供一个空字符串或其他虚假值(包括null和undefined),则服务器将为您创建一个随机名称。options是一个对象,可以为空或null,如果是最后一个参数,则可以完全省略。 选项中的相关字段是:
exclusive:如果为true,则将队列的作用域限定为连接(默认为false) 
durable:如果为true,则队列将在代理重新启动后幸存下来,并对exclusive和的作用取模autoDelete。如果未提供,则默认为true,这与其他方法不同 
autoDelete:如果为true,则当使用者数量降至零(默认值为false)时,将删除队列。 
arguments:附加参数,通常是某种特定于代理的扩展的参数,例如,高可用性,TTL。想要了解得更加详细点我  ,代码如下: 
 
var amqp = require('amqplib'); amqp.connect('amqp://localhost:5672').then((conn) => {  // 连接到RabbitMQ服务器   return conn.createChannel().then((ch) => { // 创建一个通道     console.log(ch);     let q = 'Queue'; // 队列名称     let msg = 'How are you!'; // 发送的消息     let ok = ch.assertQueue(q, { durable: false }); // 创建队列   }).finally(() => {     conn.close();   }); }, (err) => {   throw Error('connection fail:===>' + err); })
 
它返回给我们一个Channel,接下来我们来发送消息到队列. 具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 var amqp = require('amqplib'); amqp.connect('amqp://localhost:5672').then((conn) => {  // 连接到RabbitMQ   return conn.createChannel().then((ch) => { // 创建一个通道     let q = 'Queue';     let msg = 'How are you!';     let ok = ch.assertQueue(q, { durable: false }); // 声明队列     return ok.then((_qok) => {        for (let i = 0; i < 100; i++) {             ch.sendToQueue(q, Buffer.from(`${msg} 第${i}条消息`));         }       console.log(" [x] Sent '%s'", msg);       return ch.close(); // 关闭通道     })   }).finally(() => {     conn.close();   }); }, (err) => {   throw Error('connection fail:===>' + err); })
 
API详解sendToQueue  Buffer.from() :创建一个新缓冲区,其中填充了指定的字符串
 
消费者 接下我们来跟着上面一样写出消费者的代码,我们将使消费者保持运行状态以监听消息并打印出来。 我们一样要创建一个队列,防止在没有运行生产者的时候运行消费者。 具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 var amqp = require('amqplib'); amqp.connect('amqp://localhost:5672').then((conn) => { // 连接到RabbitMQ   process.once('SIGINT', () => { conn.close(); }); // (beforeExit)在退出之前,关闭通道。   return conn.createChannel().then((ch) => { // 创建通道     let queue = 'Queue';     let ok = ch.assertQueue(queue, {durable: false});  // 创建队列,(关闭持久化)     ok = ok.then(function(_qok) {       return ch.consume(queue, (msg) => {  // 取出消息(消费)         console.log(" [x] 消费了 '%s'", msg.content.toString());       }, {noAck: true});     });     return ok.then((_consumeOk) => {       console.log(' [*] Waiting for messages. To exit press CTRL+C');     });   }); }).catch(console.warn);
 
noAck: true,如果为true,表示当消费者收到消息不会通知RabbitMQ,消费者收到了消息就会立即从队列中删除。
消费API -> consume 
 
这样就实现了基本的消息发送和接收。
任务队列 使用任务队列的优点之一是能够轻松并行化工作。如果我们有很多工作,我们可以增加更多的消费者,这样就可以轻松扩展。 我们先来修改一下上面send.js的代码,让它能在命令行接收消息。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 var amqp = require('amqplib'); amqp.connect('amqp://localhost:5672').then((conn) => {  // 创建一个通道   return conn.createChannel().then((ch) => {     let q = 'task_queue';     let msg = process.argv.slice(2).join('') || 'helloWord!';     let ok = ch.assertQueue(q, { durable: true });     return ok.then((_qok) => {         ch.sendToQueue(q, Buffer.from(msg), {           persistent: true         });       console.log(" [x] Sent '%s'", msg);       return ch.close();     })   }).finally(() => {     conn.close();   }); }, (err) => {   throw Error('connection fail:===>' + err); })
 
persistent: true如果是true,则消息将在代理重新启动后继续存在,前提是它在一个队列中,该队列也在重新启动后继续存在.
 
接下来修改一下上面的receive.js,让它在处理消息的时候伪造一秒钟的停顿。 它将从队列中弹出消息并执行任务,我们将其命名为worker.js,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 var amqp = require('amqplib'); amqp.connect('amqp://localhost:5672').then((conn) => {   process.once('SIGINT', () => { conn.close(); }); //beforeExit 执行close。   return conn.createChannel().then((ch) => {     let queue = 'task_queue'; // 保证使用该队列的时候先声明该队列     let ok = ch.assertQueue(queue, {durable: true});     ok = ok.then(function(_qok) {       return ch.consume(queue, (msg) => {         let secs = msg.content.toString().split('.').length - 1; //几个点就延迟几秒          console.log(" [x] Received %s", msg.content.toString());          setTimeout(function() {            console.log(" [x] Done");          }, secs * 1000);       }, {noAck: true});     });     return ok.then((_consumeOk) => {       console.log(' [*] Waiting for messages. To exit press CTRL+C');     });   }); }).catch(console.warn);
 
循环调度 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。 平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。 具体效果如下:
消息确认 如果其中一个使用者开始一项漫长的任务并仅部分完成而死掉,会发生什么情况。 使用我们当前的代码,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除。 在这种情况下,如果您杀死一个消费者,我们将丢失正在处理的消息。我们还将丢失所有发送给该特定消费者但尚未处理的消息。 但是我们不想丢失任何任务。如果一个消费者死亡,我们希望将任务交付给另一个消费者。 接下来我们来修改一下代码,开启{noAck: false}选项,一旦我们完成了一项任务,消费者会发送一个确认消息。 修改worker.js,具体如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 var amqp = require('amqplib'); amqp.connect('amqp://localhost:5672').then((conn) => {   process.once('SIGINT', () => { conn.close(); }); //beforeExit 执行close。   return conn.createChannel().then((ch) => {     let queue = 'task_queue'; // 保证使用该队列的时候先声明该队列     let ok = ch.assertQueue(queue, {durable: true});     ok = ok.then(function(_qok) {       return ch.consume(queue, (msg) => {         let secs = msg.content.toString().split('.').length - 1;          console.log(" [x] Received %s", msg.content.toString());          setTimeout(function() {            console.log(" [x] Done");            ch.ack(msg);  // ++  // 确认给定的消息          }, secs * 1000);       }, {noAck: false}); // ==      });     return ok.then((_consumeOk) => {       console.log(' [*] Waiting for messages. To exit press CTRL+C');     });   }); }).catch(console.warn);
 
这样就能防止消费者挂掉后,由它处理的任务失败掉,我们来故意挂掉消费者1号,具体效果如下:
消费者挂掉后,未确认的消息被重发了。
消息持久化 我们已经实现了如何确保消费者死亡,任务也不会丢失。但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。 如果您要告知RabbitMQ,确保消息不会消失,需要做两件事:我们需要将队列和消息都标记上持久性。 首先,我们需要确保RabbitMQ永远不会丢失队列。为此,我们需要将其声明为持久的:
channel.assertQueue('hello', {durable: true});
 
注意:RabbitMQ不允许您使用不同的参数重新定义现有队列,并且将向尝试执行此操作的任何程序返回错误。
 
接下来将消息标记为持久消息:
channel.sendToQueue(queue,Buffer.from(msg),{ persistent:true });
 
消息持久化并不会真正的写入磁盘,它只是保存到缓存当中,持久性并不强,但对于简单队列而言,已经绰绰有余了。 如果还需要更强的保存能力,可以使用发布者确认 
公平分配 为了不让RabbitMQ盲目的将消息的第n条消息发送给第n个使用者。这样在有的任务重,有的任务轻的时候, 可能会出现一位工人将一直忙碌而另一位工人将几乎不做任何工作。 发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。它不会查看消费者的未确认消息数。 为了解决这个问题我们需要做以下配置:
 
在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给尚不繁忙的下一个工作人员。
发布订阅 上面我们实现了把每个任务恰好交付给一个工人,接下来我们来实现,将消息传递给多个工人。 我们来跟着官方的例子,构建一个简单的日志记录系统。就是将已发布的日志广播到所有接收者。 它由两个程序组成:第一个程序将发出日志消息,第二个程序将接收并打印它们。
交换(Exchanges) 它是Rabbit的消息交换模型。核心思想是生产者从不将任何消息直接发送到队列。 一方面,它接收来自生产者的消息,另一方面,将它们推入队列。 但是它如何知道应该将消息加入队列,还是丢弃呢?规则由交换类型定义。 有四种交换类型可用:direct,topic,headers,fanout。fanout交换类型正是我们需要的,它只是将接收到的所有消息广播到它知道的所有队列中。
ch.assertExchange('logs','fanout',{durable:false })
 
Channel的assertExchange用来断言交换的存在,与队列一样。 参数类型为:assertExchange(exchange, type, [options])
 
如果要知道您的服务器上的交换可以运行:
rabbitmqctl list_exchanges
 
默认交换 我们以前使用的都是默认交换
channel.sendToQueue('hello',Buffer.from('Hello World!'));
 
在这里,我们使用默认或无名称交换:消息以指定为第一个参数的名称路由到队列(如果存在)。 接下来我们指定发布到logs交换中:
channel.publish('logs', '', Buffer.from('Hello World!'));
 
publish功能和sendToQueue差不多,第二个参数的空字符串表示我们不想将消息发送到任何特定的队列。
绑定 我们已经创建了一个fanout交换和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换和队列之间的关系称为绑定。
channel.bindQueue(queue_name, 'logs', '');
 
bindQueue :声明从交换机到队列的路由路径
 
这此我们将消息发送到logs交换器,而不是无名的默认交换器。 发送时我们需要提供一个路由密钥,但是对于fanout交换,它的值将被忽略。这是emit_log.js脚本的代码 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 const amqp = require('amqplib'); amqp.connect('amqp://localhost').then((conn) => {   return conn.createChannel().then((ch) => {     const ex = 'logs';     let ok = ch.assertExchange(ex, 'fanout', {durable: false}) // 声明交换     let message = process.argv.slice(2).join(' ') ||       'info: Hello World!';     return ok.then(() => {       ch.publish(ex, '', Buffer.from(message));       console.log(" [x] Sent '%s'", message);       return ch.close();     });   }).finally(() => { conn.close(); }); }).catch(console.warn);
 
从上面我们可以看到,建立连接之后,我们声明了交换。 如果没有队列绑定到对应的交换那么消息将丢失。receive_log.js代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 const amqp = require('amqplib'); amqp.connect('amqp://localhost').then( (conn) => {   process.once('SIGINT', () => { conn.close(); });   return conn.createChannel().then((ch) => {     let ok = ch.assertExchange('logs', 'fanout', { durable: false });     ok = ok.then(() => {       return ch.assertQueue('', { exclusive: true });     });     ok = ok.then((qok) => {       return ch.bindQueue(qok.queue, 'logs', '').then(function () {  // 绑定队列         return qok.queue;       });     });     ok = ok.then((queue) => {       return ch.consume(queue, logMessage, { noAck: true });     });     return ok.then(() => {       console.log(' [*] Waiting for logs. To exit press CTRL+C');     });     function logMessage(msg) {       console.log(" [x] '%s'", msg.content.toString());     }   }); }).catch(console.warn);
 
运行结果如下:
over 助秋风雨来何速,惊破秋窗秋梦绿。 「代别离·秋窗风雨夕」 曹雪芹