본문 바로가기

개발 공부

RabbitMQ 작업대기열(Work Queue) Java 2

저번시간에는 Queue가 데이터를 한번씩 나눠서 consumer 에게 전달을 하는 코드를 짜봤다.

하지만 이렇게 짜게 된다면 문제가 발생하는데 예를들어

데이터가 무거운거 한번 가벼운거 한번이 번갈아 가면서 Queue에 들어오게 되면 한개의 consumer는 너무 바빠지고, 하나의 consumer는 놀게되는 이슈가 발생한다 이를 해결하기 위해서 tcp 3 way handshake를 사용하는데,

비동기 처리 방식에서 사용되는 연결 설정 과정이다.

간단하게 설명하면 Queue가 consumer에게 지금 데이터를 받을 수 있는 상황인지 먼저 노크를 한번 해주는 과정이라고 생각 하면 된다.

이때 consumer 가 Queue에게 데이터를 받을 수 있다 라고 신호를 보내주는게 Ack를 보낸다고 이해하고 넘어가자

그럼 Worker class를 수정 해 보겠다.

  • 한번에 2개 이상의 메세지를 받지 않도록 설정
//RabbitMQ가 consumer 에게 한 번에 둘 이상의 메세지를 제공하지 않음.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
  • Ack를 전송(메세지의 손실 여부를 판단)
try {
    doWork();
} finally {
    System.out.println(" [x] Done");
    //ack를 전송(메세지의 손실 여부를 판단)
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
  • tcp 통신에서의 Ack를 설정
//tcp 통신에서의 ack 설정.
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
}
  • 전체 Worker Class
package recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {
    //Queue name 선언
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {

        //서버와 연결 만들기
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //대기열 선언
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println("[*] Waiting for messages, To exit press CTRL+C" +
                "[*] 메세지를 기다리는 중입니다. 종료하려면 CTRL+C를 누르십시오.");

        //기존 메세지 = 균등하게 적용 -> 현제 메세지 = RabbitMQ가 consumer 에게 한 번에 둘 이상의 메세지를 제공하지 않음.
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        //데이터를 받을때까지 버퍼링/ 받게되면 byte 데이터를 String으로 변환.
        //가상 딜레이 적용
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork();
            } finally {
                System.out.println(" [x] Done");
                //ack를 전송(메세지의 손실 여부를 판단)
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        //tcp 통신에서의 ack 설정.
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }

    //가상 딜레이 설정
    private static void doWork() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}
  • 4회 실행 결과

수정한 Worker는 한번을 받았고, Recv는 3번을 받았다.

'개발 공부' 카테고리의 다른 글

JUnit 기본  (0) 2023.06.19
RabbitMQ 작업대기열(Work Queue) Java  (0) 2023.06.13
RabbitMQ Recv Java  (0) 2023.06.13
RabbitMQ Sendding Java  (0) 2023.06.13
RabbitMQ 사용해 보기 Java  (0) 2023.06.13