Search
📒

9-2. Message Broker 활용

먼저 Spring Boot에서 RabbitMQ를 활용하기 위한 라이브러리를 살펴봅시다.
Initialzr 기준으로 Spring for RabbitMQ 의존성을 검색하시면 됩니다. 직접 gradle에 의존성을 추가한다면,
implementation 'org.springframework.boot:spring-boot-starter-amqp'
Java
복사
으로 작성하면 됩니다. 여기서 amqp 란, RabbitMQ를 비롯한 Message Broker들이 메시지를 주고받기 위한 통신 규약으로, Advanced Message Queueing Protocol의 약자입니다. HTTP와 마찬가지로 Application Layer 상에 정의된 통신 규약의 일종입니다.

Worker Queue 구성해보기

위에 언급한것처럼, Worker Queue를 구성하려면, 하나의 Exchange에 하나의 Queue를 작성하면 됩니다. 간단한 테스트를 위해서는, RabbitMQ에서 제공하는 기본 Exchange를 사용해도 됩니다.
@Configuration public class ProducerConfig { @Bean public Queue queue(){ return new Queue("boot.amqp.worker-queue", true, false, true); } }
Java
복사
Producer
Producer 서버에서 @Configuration 을 이용해 Queue를 @Bean 의 형태로 제공하면, Spring 내부에서 제공된 계정정보를 가지고 RabbitMQ에 Queue를 선언합니다.
@Service @RabbitListener(queues = "boot.amqp.worker-queue") public class ConsumerService { private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class); @RabbitHandler public void receive(String message){ logger.info(" [x] Received '" + message + "'"); } }
Java
복사
Consumer
Consumer 서버에서는 @Configuration 을 구성할 필요 없이 @RabbitListener@RabbitHandler 어노테이션을 Bean 객체에서 사용하면, 만들어진 Queue에 연결이 됩니다. 주의할점은, 이렇게 Queue에 연결하는 것은 이미 만들어진 Queue가 존재해야 한다는 점입니다. 그래서 지금 상태에서는 Producer가 켜지지 않은 상태에서 Consumer가 켜질경우, Queue에 연결하지 못하여 에러가 발생합니다. 방지하고 싶다면, Consumer에도 동일한 설정의 Bean 객체를 만들면 됩니다.
여기에 Queue에 메시지를 적재하기 위한 Controller를 추가하고 테스트 해봅시다.
public void send() { StringBuilder builder = new StringBuilder("Hello"); if (dots.incrementAndGet() == 4) { dots.set(1); } builder.append(".".repeat(Math.max(0, dots.get()))); builder.append(count.incrementAndGet()); String message = builder.toString(); rabbitTemplate.convertAndSend(rabbitQueue.getName(), message); logger.info(" [x] Sent '" + message + "'"); }
Java
복사
ProducerService.java
public ProducerController( @Autowired ProducerService producerService ) { this.producerService = producerService; } @GetMapping("/") public void sendMessage(){ producerService.send(); }
Java
복사
ProducerController.java

Publish Subscribe 구성해보기

Publish Subscribe의 경우도 큰 차이는 없습니다. Worker Queue와의 차이점은, 모든 어플리케이션이 메시지를 받도록 구성하기 위해, Exchange를 정의를 해야한다는 점입니다. 또한 Exchange에 메시지를 듣고있는, 각 어플리케이션이 별도의 익명 Queue를 선언하게 됩니다.
@Configuration public class PublisherConfig { @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("boot.fanout.exchange"); } }
Java
복사
PublisherConfig.java
Publisher 쪽에서는 Exchange만 정의하면 됩니다. Queue에 직접적으로 메시지를 작성하지 않기 때문에, Queue를 정의할 필요가 없습니다. 여기서 FanoutExchange는, 연결된 모든 Queue에 차별없이 메시지를 전달하게 됩니다. 다른 Exchange의 경우, Binding을 진행할때의 조건에 따라 특정 메시지만 받도록 설정이 가능합니다.
public void publishMessage(){ StringBuilder builder = new StringBuilder("Hello"); if (dots.incrementAndGet() == 4) { dots.set(1); } builder.append(".".repeat(Math.max(0, dots.get()))); builder.append(count.incrementAndGet()); String message = builder.toString(); rabbitTemplate.convertAndSend( fanoutExchange.getName(), "", message ); }
Java
복사
PublisherService.java
Publish를 진행할때는 Queue 대신 Exchange의 이름을 제공하고, 다른 Exchange에서 활용하게 되는 routing key에 더미데이터를 추가해둡니다. 앞서 설명한 다른 Exchange들에서 사용하는 값입니다.
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("boot.fanout.exchange"); } @Bean public Queue autoGenQueue(){ return new AnonymousQueue(); } @Bean public Binding binding( Queue queue, FanoutExchange fanoutExchange ){ return BindingBuilder .bind(queue) .to(fanoutExchange); }
Java
복사
SubscriberConfig.java
Publisher와 동일하게 FanoutExchange를 설정하고, 해당 Exchange에 연결할 Queue와, 실제 연결을 진행할 Binding까지 정의합니다.
@Service @RabbitListener(queues = "#{autoGenQueue.name}") public class SubscriberService { private static final Logger logger = LoggerFactory.getLogger(SubscriberService.class); @RabbitHandler public void receiveMessage(String messageRaw){ logger.info("received: {}", messageRaw); } }
Java
복사
SubscriberService.java
@RabbitListener 에서 Queue를 받아오게 되면 구성이 끝납니다. 마지막으로 테스트를 진행해봅시다.