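RabbitMQ Operations Guide
Consumer
- Create a factory (host, port, virtualhost, username, password)
- Create a connection
- Create a channel
- Call methods on the channel
Consumer method: channel.basicConsume(queue, autoAck, callback)
The consumer sets up the MQ topology with: channel.exchangeDeclare; channel.queueDeclare; channel.queueBind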
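    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // QUEUE_NAME is assumed to be a String constant defined elsewhere,
    // naming a queue that already exists on the broker.
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.128");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("admin");
    factory.setPassword("admin");

    Connection conn = factory.newConnection();
    Channel channel = conn.createChannel();

    // Callback invoked once for every message delivered to this consumer
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message : '" + msg + "'");
            System.out.println("consumerTag : " + consumerTag);
            System.out.println("deliveryTag : " + envelope.getDeliveryTag());
        }
    };

    // Second argument true = autoAck: messages are acknowledged on delivery
    channel.basicConsume(QUEUE_NAME, true, consumer);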
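Producer
Producer method: channel.basicPublish(exchange, routingKey, props, body)
The producer must close the channel and the connection once it has finished publishing.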
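    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;

    // EXCHANGE_NAME is assumed to be a String constant defined elsewhere,
    // naming an exchange that already exists on the broker.
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.1.128");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("admin");
    factory.setPassword("admin");

    Connection conn = factory.newConnection();
    Channel channel = conn.createChannel();

    String msg = "Hello world, Rabbit MQ";
    // Arguments: exchange, routing key, message properties (none), body.
    // The body is encoded as UTF-8 to match the consumer's decoding.
    channel.basicPublish(EXCHANGE_NAME, "gupao.best", null, msg.getBytes(StandardCharsets.UTF_8));

    // Close the channel first, then the connection
    channel.close();
    conn.close();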
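The producer code calls channel.close() and conn.close() at the end, so neither runs if basicPublish throws. One way around that is try-with-resources, which closes resources in reverse order of declaration (channel first, then connection) even on failure. The sketch below only demonstrates that ordering with a hypothetical stand-in class (Resource, CloseOrderDemo and publishAndClose are made up for illustration). In recent versions of the RabbitMQ Java client, Connection and Channel can be declared in the try header in the same way, assuming a client version where both are AutoCloseable.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseOrderDemo {

    /** Hypothetical stand-in for a Connection or Channel; records when it is closed. */
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Simulates "open connection, open channel, publish" and returns the event log. */
    static List<String> publishAndClose(boolean failDuringPublish) {
        List<String> log = new ArrayList<>();
        // Declared connection-then-channel, so they are closed channel-then-connection.
        try (Resource conn = new Resource("connection", log);
             Resource channel = new Resource("channel", log)) {
            log.add("publish");
            if (failDuringPublish) {
                throw new IllegalStateException("simulated publish failure");
            }
        } catch (IllegalStateException e) {
            // Both resources are already closed by the time this handler runs.
            log.add("caught " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(publishAndClose(false));
        // prints [publish, close channel, close connection]
        System.out.println(publishAndClose(true));
        // prints [publish, close channel, close connection, caught simulated publish failure]
    }
}
```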
Deleting a queue
    channel.queueDelete(QUEUE_NAME);
    channel.exchangeDelete(EXCHANGE_NAME);
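Dead-letter queue

The producer publishes a message to a queue, and the message sits there without being consumed. The queue is configured with an exchange (and, through it, a queue) that receives expired messages, and the message carries an expiration time. Once the message expires, it is forwarded to that exchange or queue.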
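A minimal sketch of how such a queue might be configured, assuming the standard RabbitMQ queue arguments x-dead-letter-exchange, x-dead-letter-routing-key and x-message-ttl. The helper DeadLetterArgs and the names DLX_EXCHANGE, dlx.key and WORK_QUEUE are hypothetical. Note that this sets a queue-wide TTL; the per-message expiration described above is set on the message properties instead and is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper: builds the arguments map for a queue that dead-letters expired messages. */
final class DeadLetterArgs {

    private DeadLetterArgs() {}

    /**
     * @param deadLetterExchange   exchange that receives expired messages
     * @param deadLetterRoutingKey routing key used when re-publishing; null keeps the original key
     * @param ttlMillis            time-to-live in milliseconds for every message in the queue
     */
    static Map<String, Object> build(String deadLetterExchange, String deadLetterRoutingKey, int ttlMillis) {
        Objects.requireNonNull(deadLetterExchange, "deadLetterExchange");
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttlMillis must be >= 0");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        args.put("x-message-ttl", ttlMillis);
        return args;
    }

    public static void main(String[] unused) {
        Map<String, Object> args = build("DLX_EXCHANGE", "dlx.key", 10_000);
        System.out.println(args);
        // With the RabbitMQ Java client, the map would be passed as the last
        // parameter of queueDeclare (queue, durable, exclusive, autoDelete, arguments):
        // channel.queueDeclare("WORK_QUEUE", false, false, false, args);
    }
}
```

The dead-letter exchange itself, and a queue bound to it, still have to be declared with channel.exchangeDeclare, channel.queueDeclare and channel.queueBind; otherwise expired messages have nowhere to go.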
Enabling the management plugin
rabbitmq-plugins enable rabbitmq_management