RabbitMQ初体验

最近看redis的时候碰到书上讲MQ,这里也想大概记录下目前基本的理解

var rabbit = amqp.createConnection();

// rabbit ready 后创建了ex 和 charge 这个q, bind后啥都没做
rabbit.on('ready', function () {
rabbit.exchange('credit_charge', { autoDelete: false }, function (ex) {
rabbit.queue('charge', { autoDelete: false }, function (q) {
q.bind('credit_charge', q.name);
q.close();
startServer(ex);
});
});
});

// 这里注册了路由, 创建了 随机名字的q,并且给charge这个q 发送了一个消息 订阅自己这个随机名字q
// worker 里会创建 charge 这个q,并订阅charge这个q,也就是会受到这个detail的message
// worker 会向这个随机名字的q 返回信息 message done,所以下面这个订阅也会输出
function startServer(ex) {
app.get('/credit_charge', function (req, res) {
rabbit.queue('', { exclusive: true, autoDelete: true }, function (q) {
q.bind('credit_charge', q.name);
ex.publish('charge', { card: 'details' }, { replyTo: q.name });
q.subscribe(function (message) {
console.log(message);
q.destroy();
q.close();
res.send('Charged! Thanks!');
});
});
});
var server = app.listen(8002);

io = io.listen(server);
io.on('connection', function (socket) {
// socket.io 连接后创建q
rabbit.queue(socket.id, { exclusive: true, autoDelete: true },
function (q) {
q.bind('credit_charge', q.name);
// 这里订阅了本身q的信息并发事件给前端
q.subscribe(function (message, headers, delivery) {
socket.emit(headers.emitEvent);
});
socket.on('charge', function (data) {
// 收到charge事件后 发消息给才charge 这个q
// worker里会返回 message done
ex.publish('charge', { card: 'details' }, {
replyTo: q.name, headers:
{ emitEvent: 'charged' }
});
});
socket.on('disconnect', function () {
q.destroy();
q.close();
});
});
});
};

基本用法

  1. 连接MQ拿到rabbit这个connection
  2. 创建exchange, exchange里面可以包含多个queue
  3. 创建queue用bind和ex进行绑定
  4. 绑定后的 ex pulish任何的消息,订阅方都是可以收到的。 如果没有订阅就会存在队列里,等有人订阅的时候自然会顺序发出消息

理解的时候让我想起了rxjs。基本和stream是一致的。pub/sub 模式
lazy的特点让消息可以滞留在stream里等待有人订阅