kaifuu的gravatar头像
kaifuu 2017-09-07 18:06:02
RabbitMQ安装及DEMO

配置

1.安装完以后erlang需要手动设置ERLANG_HOME 的系统变量。
输入:set ERLANG_HOME=C:\Program Files\erl8.0

2.激活Rabbit MQ's Management Plugin
使用Rabbit MQ 管理插件,可以更好的可视化方式查看Rabbit MQ 服务器实例的状态,你可以在命令行中使用下面的命令激活。
输入:rabbitmq-plugins.bat  enable  rabbitmq_management
同时,我们也使用rabbitmqctl控制台命令(位于 rabbitmq_server-3.6.3\sbin>)来创建用户,密码,绑定权限等。

2.1. 创建管理用户

输入:rabbitmqctl.bat add_user zhangweizhong weizhong1988

2.2. 设置管理员

输入:rabbitmqctl.bat set_user_tags zhangweizhong administrator

2.3. 设置权限

输入:rabbitmqctl.bat set_permissions -p / zhangweizhong ".*" ".*" ".*"

2.4. 其他命令
查询用户: rabbitmqctl.bat list_users
查询vhosts: rabbitmqctl.bat list_vhosts
启动RabbitMQ服务: net stop RabbitMQ && net start RabbitMQ
以上这些,账号、vhost、权限、作用域等基本就设置完了
 

Rabbit MQ管理后台

3.1. 使用浏览器打开http://localhost:15672 访问Rabbit Mq的管理控制台,使用刚才创建的账号登陆系统即可.Rabbit MQ 管理后台,可以更好的可视化方式查看RabbitMQ服务器实例的状态

3.2. 创建vhosts:创建vhosts,  在admin页面,点击右侧Virtual Hosts 将刚创建的OrderQueue分配给相关用户

3.3. RabbitMQ是用Erlang,对于主要的编程语言都有驱动或者客户端。我们这里要用的是Java,所以先要获得Java客户端。。下面是Java客户端的maven依赖的配置。
<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.0.4</version>
</dependency>
像RabbitMQ这样的消息代理可用来模拟不同的场景,例如点对点的消息分发或者订阅/推送。我们的程序足够简单,有两个基本的组件,一个生产者用于产生消息,还有一个消费者用来使用产生的消息。
在这个例子里,生产者会产生大量的消息,每个消息带有一个序列号,另一个线程中的消费者会使用这些消息。

代码

抽象类EndPoint:
我们首先写一个类,将产生产者和消费者统一为 EndPoint类型的队列。不管是生产者还是消费者, 连接队列的代码都是一样的,这样可以通用一些。
package co.syntx.examples.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Represents a connection with a queue
 * @author syntx
 *
 */
public abstract class EndPoint{
 
    protected Channel channel;
    protected Connection connection;
    protected String endPointName;
 
    public EndPoint(String endpointName) throws IOException{
         this.endPointName = endpointName;
  
         //Create a connection factory
         ConnectionFactory factory = new ConnectionFactory();
    
         //hostname of your rabbitmq server
         factory.setHost("localhost");
  
         //getting a connection
         connection = factory.newConnection();
    
         //creating a channel
         channel = connection.createChannel();
    
         //declaring a queue for this channel. If queue does not exist,
         //it will be created on the server.
         channel.queueDeclare(endpointName, false, false, false, null);
    }
 
 
    /**
     * 关闭channel和connection。并非必须,因为隐含是自动调用的。
     * @throws IOException
     */
     public void close() throws IOException{
         this.channel.close();
         this.connection.close();
     }
}
生产者:
生产者类的任务是向队列里写一条消息。我们使用Apache Commons Lang把可序列化的Java对象转换成 byte 数组。commons lang的maven依赖如下:
<dependency>
 <groupId>commons-lang</groupId>
 <artifactId>commons-lang</artifactId>
 <version>2.6</version>
</dependency>
package co.syntx.examples.rabbitmq;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;


/**
 * The producer endpoint that writes to the queue.
 * @author syntx
 *
 */
public class Producer extends EndPoint{
 
 public Producer(String endPointName) throws IOException{
  super(endPointName);
 }

 public void sendMessage(Serializable object) throws IOException {
     channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
 } 
}
消费者:
消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。
package co.syntx.examples.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;


/**
 * 读取队列的程序端,实现了Runnable接口。
 * @author syntx
 *
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer{
 
 public QueueConsumer(String endPointName) throws IOException{
  super(endPointName);  
 }
 
 public void run() {
  try {
   //start consuming messages. Auto acknowledge messages.
   channel.basicConsume(endPointName, true,this);
  } catch (IOException e) {
   e.printStackTrace();
  }
 }

 /**
  * Called when consumer is registered.
  */
 public void handleConsumeOk(String consumerTag) {
  System.out.println("Consumer "+consumerTag +" registered");  
 }

 /**
  * Called when new message is available.
  */
 public void handleDelivery(String consumerTag, Envelope env,
   BasicProperties props, byte[] body) throws IOException {
  Map map = (HashMap)SerializationUtils.deserialize(body);
     System.out.println("Message Number "+ map.get("message number") + " received.");
  
 }

 public void handleCancel(String consumerTag) {}
 public void handleCancelOk(String consumerTag) {}
 public void handleRecoverOk(String consumerTag) {}
 public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}
Putting it together:
在下面的测试类中,先运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走。
package co.syntx.examples.rabbitmq;

import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;

public class Main {
 public Main() throws Exception{
  
  QueueConsumer consumer = new QueueConsumer("queue");
  Thread consumerThread = new Thread(consumer);
  consumerThread.start();
  
  Producer producer = new Producer("queue");
  
  for (int i = 0; i < 100000; i++) {
   HashMap message = new HashMap();
   message.put("message number", i);
   producer.sendMessage(message);
   System.out.println("Message Number "+ i +" sent.");
  }
 }
 
 /**
  * @param args
  * @throws SQLException
  * @throws IOException
  */
 public static void main(String[] args) throws Exception{
   new Main();
 }
}


打赏
最近浏览
最代码官方  LV168 2017年9月8日
顶部 客服 微信二维码 底部
>扫描二维码关注最代码为好友扫描二维码关注最代码为好友