Pink Spider/Apache Kafka 사용 시 코드로 오류 처리 방법

Created Wed, 26 Mar 2025 13:38:03 +0900 Modified Mon, 08 Dec 2025 08:41:47 +0900
1359 Words 6 min

Spring Boot에서 Kafka를 사용할 때 오류를 코드로 해결하는 방법을 설명해볼게요.
Kafka 프로듀서와 컨슈머에서 Fallback 메소드, 재시도 (Retry), 예외 처리 등을 적용하는 방법을 알아봅니다.


1. Kafka 프로듀서에서 오류 처리 (Fallback, Retry)

Kafka 프로듀서는 메시지를 브로커로 보내는 과정에서 오류가 발생할 수 있습니다.
예를 들어, 네트워크 문제, 브로커 다운, 메시지 크기 초과 등의 문제가 있을 수 있는데요, 이를 처리하는 방법입니다.

(1) Fallback 메소드 적용 (Resilience4j 사용)

Resilience4j@Retry@Recover를 활용하면
Kafka 전송이 실패했을 때 자동으로 재시도를 하고, 최종적으로 실패하면 대체 로직을 수행할 수 있습니다.

📌 구현 예제

import io.github.resilience4j.retry.annotation.Recover;
import io.github.resilience4j.retry.annotation.Retry;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducerService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerService.class);
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Retry(name = "kafkaRetry", fallbackMethod = "sendMessageFallback")
    public void sendMessage(String topic, String key, String message) {
        logger.info("Sending message: {} to topic: {}", message, topic);
        kafkaTemplate.send(new ProducerRecord<>(topic, key, message))
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        throw new RuntimeException("Kafka send failed", ex);
                    }
                    logger.info("Message sent successfully to {}", result.getRecordMetadata());
                });
    }

    @Recover
    public void sendMessageFallback(Exception ex, String topic, String key, String message) {
        logger.error("Kafka message send failed. Fallback method invoked. Error: {}", ex.getMessage());
        // 여기서 대체 로직 실행 (예: 메시지를 DB나 다른 저장소에 저장)
    }
}

📌 주요 포인트

  • @Retry(name = "kafkaRetry"): Kafka 전송이 실패하면 자동으로 재시도
  • @Recover: 재시도 후에도 실패하면 Fallback 메소드 실행
    → 이곳에서 DB 저장, 재처리 큐(RabbitMQ 등)로 메시지 전달 가능

(2) Kafka Send 실패 시 Callback으로 예외 처리

KafkaTemplate에서 send() 메소드의 addCallback을 활용하여 실패 시 처리하는 방법입니다.

📌 구현 예제

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class KafkaProducerServiceWithCallback {
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServiceWithCallback.class);
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerServiceWithCallback(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessageWithCallback(String topic, String key, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);

        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Message [{}] delivered to topic [{}] with offset [{}]",
                        message, topic, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("Failed to send message [{}] to topic [{}]", message, topic, ex);
                // 실패 시 대체 로직 실행 (예: DB 저장, 알림 전송 등)
            }
        });
    }
}

📌 주요 포인트

  • 성공 시 onSuccess(), 실패 시 onFailure()에서 예외 처리 가능
  • 실패 시 DB 저장, Slack 알림 전송, 또는 재처리 큐로 이동 가능

2. Kafka 컨슈머에서 오류 처리 (Fallback, Retry)

Kafka 컨슈머가 메시지를 받을 때 네트워크 문제, 오프셋 오류, 데이터 파싱 오류 등이 발생할 수 있습니다.
이를 해결하는 방법을 알아볼게요.


(1) @Retryable을 사용한 자동 재시도

컨슈머에서 특정 오류 발생 시 자동으로 재시도할 수 있습니다.

📌 구현 예제

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.annotation.Recover;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class KafkaConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    @Retryable(value = RuntimeException.class, maxAttempts = 3)
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void consume(String message) {
        logger.info("Received message: {}", message);
        if (message.contains("error")) {
            throw new RuntimeException("Processing error!");
        }
    }

    @Recover
    public void recover(RuntimeException ex, String message) {
        logger.error("Message processing failed after retries. Moving to fallback logic. Message: {}", message);
        // 여기서 실패한 메시지를 저장 (예: DB, Dead Letter Queue)
    }
}

📌 주요 포인트

  • @Retryable(maxAttempts = 3): 최대 3번 재시도 후 실패하면 @Recover 메소드 실행
  • 실패한 메시지를 DB나 Dead Letter Queue (DLQ) 로 이동 가능

(2) Kafka Dead Letter Queue (DLQ) 활용

Kafka에서는 DLQ (Dead Letter Queue) 를 활용하여
처리 실패한 메시지를 별도의 토픽으로 보낼 수 있습니다.

📌 설정 예제 (application.yml)

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      properties:
        spring.kafka.listener.type: batch
        spring.kafka.listener.ack-mode: manual_immediate
        spring.kafka.listener.dead-letter-policy:
          enable: true
          dead-letter-topic: "dead-letter-topic"

이렇게 하면 실패한 메시지는 "dead-letter-topic"으로 자동 전송됩니다.


3. Circuit Breaker를 활용한 보호 (Resilience4j)

Kafka 브로커가 다운될 경우 Circuit Breaker를 적용하여 서비스 장애를 방지할 수 있습니다.

📌 구현 예제

import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerWithCircuitBreaker {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerWithCircuitBreaker(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @CircuitBreaker(name = "kafkaBreaker", fallbackMethod = "fallback")
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    public void fallback(String topic, String message, Exception ex) {
        System.out.println("Kafka is unavailable. Storing message in DB.");
        // DB 저장 로직 추가 가능
    }
}

📌 주요 포인트

  • Kafka 장애 발생 시 Circuit Breaker가 작동하여 fallback() 실행
  • 브로커가 복구되면 자동으로 다시 메시지 전송 가능

🔎 정리

  • 프로듀서 오류 처리@Retry, addCallback(), CircuitBreaker
  • 컨슈머 오류 처리@Retryable, Dead Letter Queue
  • Fallback 처리@Recover, DLQ, Database 저장

이렇게 하면 Kafka 오류를 코드로 유연하게 처리할 수 있습니다! 🚀