rabbitmq

rabbitmq

rabbitmq와 같은 messaging queue를 사용하는이유는 다음과 같을것이다.

  1. 실시간으로 데이터를 빠르게 처리해야한다.
  2. 메시지 유실이 없어야한다.
  3. 대용량 실시간 traffic에 대한 확장(cluster)에 유연 해야한다.

메신저 서비스와 같은 카카오톡, 라인 그밖의 채팅시스템에서는 무조건적으로 채택해야할 기술중 하나라고 생각된다.
AMQP 프로토콜을 사용하여 진행한 간단한 테스트를 정리해본다.

setup broker

테스트 환경이므로 빠르게 rabbitmq를 docker를 활용하여 띄어본다.

1
$ docker run -d -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=0000 --hostname my-rabbit --name rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management

docker hub rabbitmq official는 https://hub.docker.com/_/rabbitmq/ 에서 확인할수있다.
필자는 config를 commandline 대신 management dashboard를 활용할것이기에 management version으로 실행하였다.
localhost:8080 접속하여 container 환경변수로 정의된 user 와 password 를 입력한후 다음의 설정을 진행한다.

  • client 접근계정 생성
  • virtual host 생성
  • 생성된 계정에 생성된 virtual host access 권한 설정

user: jaehunpark
password: 0000
virtual host: mq

producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const amqp = require('amqplib/callback_api');
const exchange = 'group';
const exchangeOpts = {
durable: true, // broker restart시 해당 exchange 삭제 여부 ( false면 broker 재시작시 exchage 삭제 ) // default true
autoDelete: false // exchange에 속한 모든 queue의 갯수가 0일시 exchange 삭제 여부. queueOpts의 autoDelete에 의존함 // default false
}
const publishOpts = {
persistent: true // broker restart시 message 보존 여부
}
const routingKey = 'one.two.three.four.end';
const msg = 'Hello World!';
// amqp://usename:password@host/vhost_name
amqp.connect('amqp://jaehunpark:0000@localhost:5672/mq', (err, conn) => {
conn.createChannel((err, ch) => {
ch.assertExchange(exchange, 'topic', exchangeOpts );
ch.publish(exchange, routingKey, new Buffer(msg), publishOpts ); // unique queue id를 가진 client에게 direct로 보낼시 exchange를 '' 로세팅, unique routingKey가 queue id
console.log(" [x] Sent %s:'%s'", routingKey, msg);
});
setTimeout(() => { conn.close(); process.exit(0) }, 500);
});

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
const amqp = require('amqplib/callback_api');
const exchange = 'group';
const exchangeOpts = {
durable: true, // broker restart시 해당 exchange 삭제 여부 ( false면 broker 재시작시 exchage 삭제 ) // default true
autoDelete: false // exchange에 속한 모든 queue의 갯수가 0일시 exchange 삭제 여부. queueOpts의 autoDelete에 의존함 // default false
}
const queue = 'uniqueQueueId1';
const queueOpts = {
durable: true, // broker restart시 해당 queue를 삭제 여부 ( false면 broker 재시작시 queue 삭제 ) // default true
autoDelete: false // queue가 disconnect 될시 queue를 삭제 여부 ( queue에 있는 message 유실됨. persistent X ) // default false
}
// [ topic ]
// exchange를 topic으로 구성시 모든 매칭은 . 로 구분
// start.one.two.three => 매칭
// one.two.three.end => 매칭
// one.two.middle.three.four => 매칭
// one.two.center.three.four => 비매칭
// one.center.two => 매칭
// => #은 복수의 구분자를 만족
// => *는 단일 구분자 만족
// [ direct ]
// exchange를 direct로 구성시 패턴 매칭 불가. 정확한 queue name을 가져야함
// [ fanout ]
// exchange를 fanout로 구성시 queue 정의필요 X 해당 exchange에 속한 모든 unique queue id 에게 메시지를 보냄.
const queues = ['start.#','#.end','#.middle.#','*.center.*'];
const consumeOpts = {
noAck: false // 받은 message를 받았다고 broker에게 알림 여부 ( false면 수동으로 ch.ack(msg) 보내줘야함 ) // default false
};
// amqp://usename:password@host/vhost_name
amqp.connect('amqp://jaehunpark:0000@localhost:5672/mq', (err, conn) => {
conn.createChannel((err, ch) => {
ch.assertExchange(exchange, 'topic', exchangeOpts);
// ch.prefetch(10);
ch.assertQueue(queue, queueOpts, (err, q) => {
console.log(' [*] Waiting for logs. To exit press CTRL+C');
queues.forEach((key) => {
ch.bindQueue(q.queue, exchange, key);
});
ch.consume(q.queue, (msg) => {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
if (!consumeOpts.noAck){
// ch.ack(msg); 정의하지않을시 기본적으로 전체메시지를 전부다시 받아옴. 가장 마지막 메시지들을 가지고올수있는 옵션정의는 다음과같음
// ch.assertQueue 정의하기전에 ch.prefetch(10); 정의할것
ch.ack(msg); // broker에게 message 받았다고 알려줌
};
}, consumeOpts );
});
});
});

rabbitmq에서 사용되는 메시징 처리 방식에는 fanout, direct, topic 방식이 존재한다.
해당예제는 topic( 패턴매칭 방식 )으로 작성하였다.

  • fanout: 알려진 모든 Queue에 메시지 전달
  • direct: 지정된 routingKey를 가진 Queue에만 메시지 전달 함
  • topic: 지정된 패턴 바인딩 형태에 일치하는 Queue에만 메시지 전달. #(여러단어), *(한단어)를 통한 문자열 패턴 매칭
  • header: 헤더에 포함된 key=value의 일치조건에 따라서 메시지 전달

또한 consumer는 unique한 queue를 가진다
client들이 중복된 queue를 가질경우 broker는 자동으로 메시지를 분배하여 메시지를 publish 한다.( Round-robin 으로 처리 )

참고

이미 생성된 exchange나 queue에 대해서 생성된 설정정보를 반드시 따라야하며 생성된 설정정보를 벗어난 설정으로 연결시 ( 특히 durable, persistent, autoDelete 등 ) 에러를 뱉으니 주의해야한다

참고