justgo_developer

RabbitMQ 본문

IT/기타

RabbitMQ

다날92 2023. 10. 5. 19:23
728x90
반응형

RabbitMQ

목차

  1. RabbitMQ 개념
  2. RabbitMQ 사용 이유
  3. RabbitMQ 적용 예제
  4. Kafka vs RabbitMQ 차이

상세

1. RabbitMQ 개념과 원리

RabbitMQ는 오픈 소스 메시지 브로커 소프트웨어로서,
AMQP(Advanced Message Queueing Protocol) 프로토콜을 구현한 메시지 브로커입니다.

AMQP란?
: Advanced Message Queueing Protocol의 약자로 메세지 큐 오픈소스에 기반한 표준 프로토콜입니다.
Producer가 Middleware Broker에 메세지를 발행하고
Consumer가 Broker의 Queue를 통해 메세지를 구독합니다.

AMQP는 Producer, Consumer, Broker, Exchange, Binding, Queue로 구성되어 있습니다.

  • Producer : Producer는 Exchange에 메세지를 발행합니다.(Client)
  • Consumer : Consumer는 Broker 안 Queue를 통해 메세지를 구독합니다.
  • Broker : Exchange에서 받은 메세지를 Binding규칙에 의해 Queue로 메세지 전달합니다.
  • Binding : Exchange와 Queue 간 라우팅 규칙과 관계를 정의합니다.
  • Queue : 메세지를 저장합니다.
  • Exchange : Exchange는 Producer로부터 수신한 메세지를 Bingding 규칙에 따라 적절한 Queue로 메세지를 라우팅합니다.
    • Direct Exchange : 메세지는 Binding Key가 Routing Key와 정확히 일치하는 Queue로 전달
    • Fanout Exchange : 모든 Queue에게 메세지 전달
    • Topic Exchange : 정해진 Binding 패턴이 일치하는 Queue로 전달
    • Headers Exchange : 헤더의 key-value로 정의된 일치조건에 따라 Queue로 전달

2. RabbitMq 사용 이유

  1. 비동기 처리 가능
  2. 빠른 개발 가능
  3. 사용하기 쉽다
728x90

3. RabbitMQ 적용 예제

일단, RabbitMQ를 설치하고 RabbitMQ 서버에 접속합니다.
그 이후 RabbitMQ 서버에서 exchange, queue, binding을 설정합니다.

  • Durability : 브로커가 재시작 될 때 남아 있는지 여부(durable : 재시작 시 유지 , transient : 재시작 시 삭제)
  • Auto delete : 마지막 Queue 연결이 해제되면 삭제

RabbitMQ exchange 세팅

RabbitMQ queue 세팅

RabbitMQ binding 세팅

rabbitmq를 사용하기 위해서 아래의 dependency 추가

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

#Consumer 예제

application.yml에 파일에는 rabbitmq host, port, username, password를 작성한다.

server:
  port: 9999

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: test
    password: test

Consumer 부분에는 @RabbitListener를 이용해 Queue명을 설정하면 간단하게 메세지를 수신할수 있다.

package com.server.people92.rabbitmq.consumer;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RabbitmqListener {

    private static final String RABBITMQ_TEST_QUEUE = "rabbitmq.test.queue";

    @RabbitListener(queues = RABBITMQ_TEST_QUEUE)
    public void receiveRabbitmqMessage(String message){

        log.info("==== rabbitmq result => {}", message);
    }
}

#Producer 예제

application.yml에는 Consumer와 일치하지만 서버를 동시에 올리기 위해 서버 포트만 consumer와 다르게 설정한다.

server:
  port: 9710

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: test
    password: test

RabbitMQ 서버에서 생성한 exchange, queue, biding 명을 빈으로 등록한다.

package com.server.people92.rabbitmq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;

public class RabbitmqConfig {

    private static final String TEST_EXCHANGE_NAME = "rabbitmq.test.direct";
    private static final String TEST_QUEUE_NAME = "rabbitmq.test.queue";
    private static final String TEST_BIDING_KEY = "rabbitmq.test.key";

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TEST_EXCHANGE_NAME);
    }
    @Bean
    Queue queue() {
        return new Queue(TEST_QUEUE_NAME);
    }
    @Bean
    Binding binding (Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(TEST_BIDING_KEY);
    }
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

}

테스트를 위해 Controller를 호출하여 RabbitMQ로 메세지를 발행하도록 구현하였다.
RabbitTemplate을 이용해 메세지를 RabbitMQ로 발행한다.

package com.server.people92.rabbitmq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class RabbitmqProducer {

    private static final String TEST_EXCHANGE_NAME = "rabbitmq.test.direct";
    private static final String TEST_BIDING_KEY = "rabbitmq.test.key";


    private RabbitTemplate rabbitTemplate;

    public RabbitmqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @GetMapping("/producer/{message}")
    public String RabbitmqProducer(@PathVariable String message){

        rabbitTemplate.convertAndSend(TEST_EXCHANGE_NAME,TEST_BIDING_KEY, message);

        return "SUCCESS";
    }
}

Consumer에선 boot 실행시 RabbitMQ에 연결된 로그가 나오지만, producer에선 나오지 않았다.
처음엔 연결이 잘못된건가 해서 찾아보다가, 실제 RabbitMQ에 메세지를 발행하니 연결된 로그가 발생하였다.
Producer는 실제 RabbitMQ에 부트 실행시 연결되는게 아니라 첫 메세지 발행 후 연결되는 것 같다.

producer spring boot 실행 로그

2021-05-02 15:44:01.327  INFO 70592 --- [           main] c.s.p.r.producer.ProducerApplication     : Starting ProducerApplication using Java 1.8.0_281 on MinJungui-MacBookAir.local with PID 70592 (/Users/minjungyun/Desktop/Coding/producer/target/classes started by minjungyun in /Users/minjungyun/Desktop/Coding/producer)
2021-05-02 15:44:01.329  INFO 70592 --- [           main] c.s.p.r.producer.ProducerApplication     : No active profile set, falling back to default profiles: default
2021-05-02 15:44:02.370  INFO 70592 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9710 (http)
2021-05-02 15:44:02.377  INFO 70592 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-05-02 15:44:02.378  INFO 70592 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.45]
2021-05-02 15:44:02.444  INFO 70592 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-05-02 15:44:02.445  INFO 70592 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1075 ms
2021-05-02 15:44:03.178  INFO 70592 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-05-02 15:44:03.410  INFO 70592 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9710 (http) with context path ''
2021-05-02 15:44:03.430  INFO 70592 --- [           main] c.s.p.r.producer.ProducerApplication     : Started ProducerApplication in 2.498 seconds (JVM running for 3.116)

consumer spring boot 실행 로그

2021-05-02 15:46:45.687  INFO 71348 --- [           main] c.s.p.r.c.RabbitmqConsumerApplication    : Starting RabbitmqConsumerApplication using Java 1.8.0_281 on MinJungui-MacBookAir.local with PID 71348 (/Users/minjungyun/Desktop/Coding/consumer/target/classes started by minjungyun in /Users/minjungyun/Desktop/Coding/consumer)
2021-05-02 15:46:45.689  INFO 71348 --- [           main] c.s.p.r.c.RabbitmqConsumerApplication    : No active profile set, falling back to default profiles: default
2021-05-02 15:46:46.622  INFO 71348 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9999 (http)
2021-05-02 15:46:46.627  INFO 71348 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2021-05-02 15:46:46.627  INFO 71348 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.45]
2021-05-02 15:46:46.676  INFO 71348 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2021-05-02 15:46:46.676  INFO 71348 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 944 ms
2021-05-02 15:46:46.943  INFO 71348 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-05-02 15:46:47.440  INFO 71348 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 9999 (http) with context path ''
2021-05-02 15:46:47.442  INFO 71348 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-05-02 15:46:47.474  INFO 71348 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#4992613f:0/SimpleConnection@45e22def [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56055]
2021-05-02 15:46:47.586  INFO 71348 --- [           main] c.s.p.r.c.RabbitmqConsumerApplication    : Started RabbitmqConsumerApplication in 2.336 seconds (JVM running for 2.869)

producer 메세지 발행 후 로그

2021-05-02 15:48:39.696  INFO 70592 --- [nio-9710-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-05-02 15:48:39.698  INFO 70592 --- [nio-9710-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-05-02 15:48:39.723  INFO 70592 --- [nio-9710-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 25 ms
2021-05-02 15:48:39.800  INFO 70592 --- [nio-9710-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2021-05-02 15:48:39.839  INFO 70592 --- [nio-9710-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2b97cc1f:0/SimpleConnection@40a851c8 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56081]

728x90
반응형

'IT > 기타' 카테고리의 다른 글

Bearer Authorization  (0) 2023.10.10
Gson 이용 시 Unicode 변환 해결 방법  (0) 2023.10.10
Xml 파싱  (1) 2023.10.05