RabbitMQ快速入门手册

/ RABBITMQ / 0 条评论 / 371浏览 / 自动同步于GITHUB

1. 安装

使用docker方式,拉取镜像uetty/rabbitmq

具体Dockerfile的命令 -> Dockerfile

额外的TIP: 由于Linux机器上会有最大打开文件个数限制,Rabbitmq又依赖文件操作,所以应将Linux的所有用户打开文件限制调高到64000,Rabbitmq程序所属的用户的打开文件限制调高到64000,这一块见官网(文件限制

2. 虚拟主机

Rabbitmq是多租户系统,存在虚拟主机(把它想象成为数据库)的概念,用户对它的连接、队列、绑定、路由等操作是基于虚拟主机上的。通过rabbitmqctl add_vhost vhostname命令来增加虚拟主机,rabbitmqctl delete_vhost vhostname命令来删除虚拟主机,rabbitmqctl list_vhosts [name tracing]命令来显示虚拟主机列表。

设置虚拟主机并发客户端总数:rabbitmqctl set_vhost_limits -p test '{"max-connections": 256}'

设置虚拟主机最大队列数:rabbitmqctl set_vhost_limits -p test '{"max-queues": 1024}'

显示虚拟主机参数:rabbitmqctl list_vhost_limits -p test

3. 启动与用户操作及权限

启动命令:service rabbitmq-server start

Rabbit提供了用户名密码的方式认证和X509证书的方式认证,,这里只记用户名密码的方式

初始用户:刚创建的实例,有默认的用户guest密码guest,但只能在本机登陆(不鼓励将其配置成能远程登陆,其他用户默认是可以远程访问的)

用户设置(官网地址

添加用户:rabbitmqctl add_user janeway changeit

删除用户:rabbitmqctl delete_user janeway

变更密码:rabbitmqctl change_password janeway newpass

验证用户名密码:rabbitmqctl authenticate_user janeway verifyit

显示用户列表:rabbitmqctl list_users

用户权限

设置用户TAG:rabbitmqctl set_user_tags janeway administrator

删除用户TAG:rabbitmqctl set_user_tags janeway

TIP: 用户TAG与全局权限绑定(包括登陆可视化管理界面的权限等),具体见官网(TAG权限表

Tag Capabilities
(None) No access to the management plugin
management Anything the user could do via messaging protocols plus:List virtual hosts to which they can log in via AMQPView all queues, exchanges and bindings in "their" virtual hostsView and close their own channels and connectionsView "global" statistics covering all their virtual hosts, including activity by other users within them
policymaker Everything "management" can plus:View, create and delete policies and parameters for virtual hosts to which they can log in via AMQP
monitoring Everything "management" can plus:List all virtual hosts, including ones they could not access using messaging protocolsView other users's connections and channelsView node-level data such as memory use and clusteringView truly global statistics for all virtual hosts
administrator Everything "policymaker" and "monitoring" can plus:Create and delete virtual hostsView, create and delete usersView, create and delete permissionsClose other users's connections

设置用户在虚拟主机上的权限:rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*" 后面三个参数分别表示配置权限、写权限、读权限,引号内的内容用正则表达式匹配队列键名(不指定[--vhost/-p]参数,默认主机为'/' ,下同)

清除用户在虚拟主机上的权限: rabbitmqctl clear_permissions -p vhostname username

显示某个用户在虚拟主机上的权限: rabbitmqctl list_user_permissions uetty

显示某个虚拟主机上的用户权限:rabbitmqctl list_permissions -p vhostname

4. 端口介绍

5671端口:AMQP协议使用,且TLS加密

5672端口:AMQP协议使用,且不加密

15672端口:HTTP协议使用,或者开启可视化管理页面后,页面地址所在端口

61613端口:STOMP协议使用,且不加密

61614端口:STOMP协议使用,且TLS加密

1883端口:MQTT协议使用,且不加密

8883端口:MQTT协议使用,且TLS加密

15674端口:Websocket协议上的STOMP协议使用

15675端口:Websocket协议上的MQTT协议使用

TIP: 较常用的是5672端口(一般选择AMQP协议,有加密需求的化是5671端口)和15672端口

5. 交换机、队列与消息的路由

在Rabbitmq中,生产者是将消息推送到交换机里,交换机负责将消息路由到队列中,消费者再从队列中消费消息。这里涉及到了两个概念,交换机队列,在此基础上引申出了三个操作:交换机的声明队列的声明交换机和队列绑定的声明,这三个操作的完成是基于交换机名队列名路由键这三个属性的。

1) 通过channel.exchangeDeclare("exchangeName", BuiltinExchangeType.类型);声明交换机
2) 通过String queueName = channel.queueDeclare().getQueue();声明队列,并返回自动生成的队列的名称
   也可以通过channel.queueDeclare(queueName, durable, exclusive, autoDelete, arguments);声明命名的队列
   第二种声明方式的参数解释:durable为是否持久化到磁盘,它将使服务器重启后数据仍然存在;exclusive为是否排他队列,具有基于连接的排他性;autoDelete是否自动删除,在没有订阅者的情况下是否自动删除
3) 通过channel.queueBind(queueName, "exchangeName", "routingKey");绑定队列和交换机

TIP: 队列本身还包含了许多属性,例如:名称、交换代理重启后是否仍保存、自动删除、队列长度等参数,在声明队列的时候可以给定。

交换机种类主要有四种,以下介绍这几种交换机:

  1. 直连交换机(direct)
在直连交换机的三个声明操作中,交换机名、队列名都是不能为空字符串,路由键可以为空字符串但已被使用。发送到直连交换机的消息直接根据路由键进行完全匹配,将消息路由到所有以该路由键与该交换机绑定的队列中。

1) 生产者通过channel.basicPublish("exchangeName", routingKey, null, message.getBytes("UTF-8"));提交数据到Rabbitmq

2) 消费者通过channel.basicConsume(queueName, true, (consumerTag, delivery) -> {}, consumerTag -> { });从Rabbitmq消费消息
  1. 扇形交换机(fanout)
该交换机的设计思想是一个消息广播,在扇形交换机的三个声明操作中,路由键名可以为空字符串(事实上它会被忽略)。发送到扇形交换机的消息,会被路由到所有绑定在该交换机上的队列中(与直连交换机不同的是不再根据路由键进行筛选)。

1) 生产者通过channel.basicPublish("exchangeName", "", null, message.getBytes("UTF-8"));提交数据到Rabbitmq

2) 消费者通过channel.basicConsume(queueName, true, (consumerTag, delivery) -> {}, consumerTag -> { });从Rabbitmq消费消息
  1. 主题交换机(topic)
在主题交换机的三个声明中,路由键的命名是受限制的,必须由点号分割的多个单词或星号或井号组成,*用于匹配一个单词,井号用于匹配零到多个单词,发送到主题交换机的消息,会根据路由键匹配来路由到队列中。

1) 生产者通过channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));发送数据到Rabbitmq

2) 消费者通过channel.basicConsume(queueName, true, (consumerTag, delivery) -> {}, consumerTag -> { });从Rabbitmq消费消息
  1. 首部交换机(match 和 headers)
在首部交换机中,路由键名可以是空字符串(事实上它会被忽略),它的路由规则转而由首部属性决定,其中最重要的一个属性是x-match,该属性有两个取值(any/all),any表示发布消息时携带的键值对有一对能匹配上队列定义的其中一个就能,all表示所有的键值对需完全匹配。另外以x-开头的属性不被视作用于路由匹配的属性。
该模式下,交换机和队列绑定需要增加headers值,可通过如下设置:
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-match", "any");
arguments.put("arg1", "a1");
arguments.put("latitude", 51.5252949);
arguments.put("dataType", "json");
channel.queueBind(queueName, EXCHANGE_NAME, "", arguments);

使用:
1) 生产者通过如下代码发送消息到Rabbitmq
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude",  51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, "",
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build(),
               message.getBytes("UTF-8"));
               
2) 消费者通过channel.basicConsume(queueName, true, (consumerTag, delivery) -> {}, consumerTag -> { });从Rabbitmq消费消息

默认交换机规则是一个空字符串路由键的直连交换机定义官网,该交换机的路由键与队列键名相同,这使其看起来像是客户端直接提交消息到了队列中。所有新建的队列都天生绑定了这个默认交换机,它的存在也方便了一些简单需求的应用直接使用Rabbitmq。

6. ACK、TRANSACTION、RECOVERY和QOS

ACK、TRANSACTION和RECOVERY

​ 消费者消费队列消息时,过程中可能由于网络原因或业务原因,出现连接断开或者消费者未接收到消息或者消费者不能成功处理消息,这个时候可能会导致数据的丢失。基于这一点,Rabbitmq提供了,TRANSACRION机制、ACK机制和RECOVERY机制。

​ RECOVERY机制:Rabbitmq JAVA客户端提供连接恢复的功能,在启用该功能时,当客户端由于网络问题断开,会自动恢复客户端,包括恢复连接、恢复连接监听器、重新打开信道及监听器、恢复QOS设置、重新声明交换机、重新声明队列、恢复绑定和消费者,可以通过factory.setAutomaticRecoveryEnabled(true)来打开RECOVERY机制。

​ ACK机制:Rabbitmq提供了交付确认机制,可以设定自动确认交付和手动确认交付。消费者消费消息时的channel.basicConsume方法,第二个参数autoAcktrue时表示自动确认交付,为false时表示手动确认交付。自动确认交付模式下,消费者从队列消费消息时立即确认交付,这时队列中立即完全删除该消息。手动确认交付模式下,客户端需告诉服务端是否确认交付,共有三种交付状态:ACK、NACK、REJECT,当客户端发送ACK时表示客户端处理成功,这时Rabbitmq才会完全删除,ACK的调用为channel.basicAck(deliveryTag, false)。当客户端发送NACK和REJECT时均表示交付失败,区别时NACK能批量操作,REJECT的调用为channel.basicReject(deliveryTag, false),NACK的调用为channel.basicNack(deliveryTag, true, true),NACK和REJECT的最后一个参数均表示交付失败的是否重新进入队列,ACK和NACK的第二个参数均表示是否包含所有未回复的交付(即是否批量回复)。

​ TRANSACTION机制:Rabbitmq在AMQP协议下提供了事务机制,客户端使用channel.txSelect()方法开启事务,使用channel.txCommit()方法提交事务,使用channel.txRollback()方法回滚事务。该机制是重量级的,并且不是必要的,根据官网的说法,它将导致吞吐量降低250倍,因此官方后面添加了也能保证数据不丢失的ACK机制。

QOS

默认情况下,消费者一次会尽可能多的消费消息(根据客户端内存),设置QOS可以指定一次最高的消费量,可以使用channel.basicQos(prefetchCount)方法来设置。

7. 可视化界面

rabbitmq-plugins enable rabbitmq_management命令用于开启可视化管理界面,之后添加有该权限的用户即可访问15672端口使用

8. JAVA访问测试

POM依赖

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.6.0</version>
</dependency>

简单HelloWorld

基于默认交换机,且不设QOS

生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
    private final static String QUEUE_NAME = "hello",
			ROUTING_KEY = QUEUE_NAME;
	private static ConnectionFactory factory;
	private static Connection conn;
	private static synchronized Channel createConnectChannel() {
		if (factory == null) {
			factory = new ConnectionFactory();
			factory.setHost("118.25.54.197");
			factory.setVirtualHost("test");
			factory.setUsername(System.getProperty("username"));
			factory.setPassword(System.getProperty("password"));
		}
		if (conn == null) {
			try {
				conn = factory.newConnection();
			} catch(Exception e) {
				e.printStackTrace();
			}
		}
		try {
			Channel channel = conn.createChannel();
			return channel;
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		throw new RuntimeException("create failed");
	}
	private static void closeChannel(Channel channel) {
		if (channel == null || !channel.isOpen()) return;
		synchronized (channel) {
			if (channel.isOpen()) {
				try {
					channel.close();
				} catch (IOException e) {
					e.printStackTrace();
				} catch (TimeoutException e) {
					e.printStackTrace();
				}
			}
		}
	}
	private static synchronized void closeConnection(Channel channel) {
		closeChannel(channel);
		if (conn != null && conn.isOpen()) {
			synchronized (conn) {
				if (conn != null && conn.isOpen()) {
					try {
						conn.close();
						conn = null;
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
			}
		}
	}
    public static void main(String[] argv) throws Exception {
        Channel channel = createConnectChannel();
		
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		System.out.println(declare);
		String message = "Hello World...";
		channel.basicPublish("", ROUTING_KEY, null, message.getBytes());
		
		System.out.println(" [x] Sent '" + message + "'");
		closeConnection(channel);
    }
}

消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {
    private final static String QUEUE_NAME = "hello";
    ...
    public static void main(String[] argv) throws Exception {
        Channel channel = createConnectChannel();
		
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		CountDownLatch cdl = new CountDownLatch(1);
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
            cdl.countDown();
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        
        cdl.await();
        closeConnection(channel);
    }
}

多个消费者

基于设QOS且存在ACK的情况

生产者

public class RabbitQueueProducer {
	...
	public static void main(String[] args) throws IOException {
		Channel channel = createConnectChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		
		Scanner sc = new Scanner(System.in);
		String nextLine = sc.nextLine();
		sc.close();
		if (!nextLine.contains(".")) nextLine += ".";

		String message = String.join("", nextLine);
		
		channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
		System.out.println(" [x] Sent '" + message + "'");
		closeConnection(channel);
	}
}

消费者

public class RabbitQueueConsumer {
	...
	private static SimpleDateFormat sdf = new SimpleDateFormat("HH🇲🇲ss.SSS");
	private static void doWork(String task) {
		for (char ch : task.toCharArray()) {
			if (ch == '.') {
				try {
					Thread.sleep(1000l);
				} catch (InterruptedException e) {
					e.printStackTrace();
					Thread.currentThread().interrupt();
				}
			} 
		}
	}
	
	public static void main(String[] args) throws IOException, InterruptedException {
		Channel channel = createConnectChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		Thread.sleep(4000l);
		channel.basicQos(1);
		CountDownLatch cdl = new CountDownLatch(1);
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            System.out.println(" [!] Delivery TAG '" + deliveryTag + "'");
            System.out.println(" [x] Received '" + message + "'");
            try {
            	doWork(message);
            } catch (Exception e) {
            	System.out.println(" [x] Done");
            	channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            }
            boolean ack = Math.random() > 0.5;
            System.out.println(" [a] ACK '" + ack + "' " + sdf.format(new Date()));
            if (ack) {
            	channel.basicAck(deliveryTag, false);
            } else {
            	channel.basicNack(deliveryTag, false, true);
            }
            cdl.countDown();
        };
        System.out.println(" [*] Waiting for messages  " + sdf.format(new Date()));
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        	System.out.println(" [x] Cancel '" + consumerTag + "'");
        });
        cdl.await();
        closeConnection(channel);
	}
}

Fanout

生产者

public class RabbitFanoutProducer {	
	private final static String EXCHANGE_NAME = "fanout";
	private final static String ROUTING_KEY = "";
	...
	public static void main(String[] args) throws IOException {
		Channel channel = createConnectChannel();
		
		
		Scanner sc = new Scanner(System.in);
		System.out.println(" [!] Enter Message");
		String nextLine = sc.nextLine();
		sc.close();
		if (!nextLine.contains(".")) nextLine += ".";

		String message = String.join("", nextLine);
		
		channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
		
		System.out.println(" [x] Sent '" + message + "'");
		closeConnection(channel);
	}
}

消费者

public class RabbitFanoutConsumer {
	private final static String EXCHANGE_NAME = "fanout";
	private final static String ROUTING_KEY = "";
	...
	public static void main(String[] args) throws IOException, InterruptedException {
		Channel channel = createConnectChannel();
		
		
		Scanner sc = new Scanner(System.in);
		System.out.println(" [!] Enter Queue Name");
		String queueName = sc.nextLine();
		sc.close();
		
		channel.queueDeclare(queueName, false, false, false, null);
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
		channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);
		
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		CountDownLatch cdl = new CountDownLatch(3);
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            
            System.out.println(" [x] Received '" + message + "'");
            cdl.countDown();
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

        
        cdl.await();
        closeConnection(channel);
	}
}