Pink Spider/Redis에서 pub/sub, 그리고 Redis Stream

Created Wed, 21 May 2025 14:20:45 +0900 Modified Mon, 08 Dec 2025 08:41:47 +0900
1991 Words 9 min

Redis에서 Pub/Sub(Publish/Subscribe) 기능을 사용하는 방법은 비교적 간단합니다. 아래는 기본적인 사용 방법과 예제입니다.


🧠 개념 요약

Redis Pub/Sub는 메시지 브로커 역할을 수행하여, 한 클라이언트가 메시지를 발행(Publish)하면, 해당 채널을 구독(Subscribe)하고 있는 다른 클라이언트들이 실시간으로 그 메시지를 받게 됩니다.

  • Publisher: 채널에 메시지를 발행하는 클라이언트
  • Subscriber: 특정 채널을 구독하고 메시지를 받는 클라이언트

🛠 명령어

  • PUBLISH <channel> <message>: 메시지를 발행
  • SUBSCRIBE <channel> [<channel> ...]: 채널 구독
  • UNSUBSCRIBE [<channel> ...]: 채널 구독 취소

💻 예제 (CLI 기준)

터미널 1 – Subscriber

redis-cli
> SUBSCRIBE news

터미널 2 – Publisher

redis-cli
> PUBLISH news "Hello from Redis!"

Subscriber는 다음과 같은 응답을 받습니다:

1) "message"
2) "news"
3) "Hello from Redis!"

🧑‍💻 Python 예제 (redis-py 사용)

import redis
import threading

r = redis.Redis(host='localhost', port=6379, db=0)

def listen():
    pubsub = r.pubsub()
    pubsub.subscribe('news')
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"Received: {message['data'].decode('utf-8')}")

# Subscriber thread
threading.Thread(target=listen).start()

# Publisher
r.publish('news', 'Breaking News!')

⚠️ 주의사항

  • Pub/Sub 메시지는 영속적이지 않음: 수신자가 구독 중이 아닐 경우 메시지는 유실됩니다.
  • 분산 처리에 적합하지 않음: 메시지를 보장하거나 처리 상태를 추적해야 한다면 Redis Streams나 Kafka와 같은 큐 시스템을 고려하세요.

✅ 활용 예시

  • 실시간 채팅 시스템
  • 실시간 알림 시스템
  • 간단한 이벤트 전파

JAVA

Java에서 Redis의 Pub/Sub 기능을 사용하기 위해 보통 Jedis 클라이언트 라이브러리를 많이 활용합니다. 아래는 Jedis를 사용한 간단한 Redis Pub/Sub 예제입니다.


📦 1. Maven 의존성 추가

pom.xml에 다음 의존성을 추가합니다:

<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>5.1.0</version>
</dependency>

📡 2. Subscriber 구현

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class RedisSubscriber {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);

        JedisPubSub listener = new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                System.out.println("Received on channel " + channel + ": " + message);
            }
        };

        System.out.println("Subscribing to channel 'news'...");
        jedis.subscribe(listener, "news");
        jedis.close();
    }
}

📨 3. Publisher 구현

import redis.clients.jedis.Jedis;

public class RedisPublisher {

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String channel = "news";
            String message = "Hello from Java!";
            jedis.publish(channel, message);
            System.out.println("Published: " + message);
        }
    }
}

🧪 실행 방법

  1. 먼저 RedisSubscriber를 실행하여 채널을 구독합니다.
  2. 이후 RedisPublisher를 실행하여 메시지를 발행하면, 구독자가 메시지를 수신하게 됩니다.

🛠 추가 팁

  • JedisPubSub에는 onSubscribe, onUnsubscribe, onPMessage 등 다양한 콜백 메서드가 있습니다.
  • 여러 채널을 동시에 구독하거나 패턴 매칭(psubscribe)도 가능합니다.
  • 메시지 보존이 필요하다면 Redis Streams를 사용하는 것이 좋습니다.

redis의 특정 필드만 pub/sub으로 쓰려면?

Redis의 기본적인 Pub/Sub 모델은 “채널” 단위로 작동하며, Redis 키(key)나 필드(field) 수준에서 직접적으로 Pub/Sub을 사용하는 기능은 없습니다. 하지만 특정 필드에 대한 Pub/Sub처럼 동작하도록 구현하는 방법은 몇 가지가 있습니다.


🔧 1. 채널명을 필드처럼 설계하기 (권장 방식)

✔ 개요

필드 개념을 Redis Pub/Sub에서 구현하려면 채널명을 키:필드 형식으로 설계하세요.

✔ 예시

// Redis key: user:123
// 필드: status, 채널명은 user:123:status 로 정의

jedis.publish("user:123:status", "online");

이렇게 하면 실제로는 필드를 따로 추적하지 않지만, 구독자가 필요한 필드만 골라 구독할 수 있습니다:

jedis.subscribe(listener, "user:123:status");

🔄 2. Hash 필드 변경과 Pub/Sub을 연동하려면

✔ 상황

Redis HASH 타입 데이터 (HSET, HGET)를 사용하고 있을 때, 특정 필드 변경 시마다 알림을 보내고 싶을 수 있습니다.

✔ 방법

직접 HSET 후 Pub/Sub을 트리거해야 합니다 (Redis는 자동 알림을 하지 않음).

// 1. Hash 필드 변경
jedis.hset("user:123", "status", "offline");

// 2. Pub/Sub 알림 수동 발행
jedis.publish("user:123:status", "offline");

⚠ 주의

Redis는 HASHSET 등 데이터 구조의 변화에 대해 자동으로 메시지를 발행하지 않기 때문에, 반드시 명시적으로 PUBLISH 호출이 필요합니다.


🧠 3. Keyspace Notifications (선택적 고급 기능)

✔ 기능

Redis는 Keyspace Notifications라는 기능을 통해 키 변경 이벤트를 Pub/Sub 형태로 알릴 수 있습니다.

✔ 단점

  • 키 단위만 지원하며 필드 단위는 아님
  • 별도 설정 필요 (notify-keyspace-events 설정)
# 설정 예시
CONFIG SET notify-keyspace-events Kh

✔ 예시 코드

// 키 변경 감지 구독 (ex. user:123)
jedis.subscribe(listener, "__keyspace@0__:user:123");

✅ 결론

방법 필드 단위 지원 메시지 보존 비고
채널명에 필드 포함 ✅ 가능 가장 일반적이며 유연함
HSET + 수동 Publish ✅ 가능 데이터 + 알림 분리
Keyspace Notification ❌ 불가능 키 수준 알림만, 서버 설정 필요

Spring Boot + Redis 와 Redis Streams 구현


📘 1. Spring Boot + Redis Pub/Sub (필드 수준 구현)

✅ 아키텍처

  • Redis Pub/Sub 사용
  • 특정 필드 구독을 위해 채널명을 key:field 형태로 설계
  • @ComponentMessageListenerAdapter 사용

🔧 Maven 의존성 (spring-boot-starter-data-redis)

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

🛠 Redis Config

@Configuration
public class RedisConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("user:123:status"));
        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
        return new MessageListenerAdapter(subscriber, "onMessage");
    }
}

📥 Subscriber Component

@Component
public class RedisSubscriber {
    public void onMessage(String message, String channel) {
        System.out.printf("Channel: %s | Received: %s%n", channel, message);
    }
}

📤 Publisher Service

@Service
@RequiredArgsConstructor
public class RedisPublisher {
    private final StringRedisTemplate redisTemplate;

    public void publish(String field, String message) {
        String channel = "user:123:" + field;
        redisTemplate.convertAndSend(channel, message);
    }
}

📙 2. Spring Boot + Redis Streams

✅ 아키텍처

  • 메시지 보존 및 소비 확인이 필요한 경우 Redis Streams 사용
  • XADD, XREAD, XREADGROUP 사용

🧩 Redis Stream Producer

@Service
@RequiredArgsConstructor
public class RedisStreamPublisher {

    private final StringRedisTemplate redisTemplate;

    public void sendToStream(String streamKey, Map<String, String> message) {
        redisTemplate.opsForStream().add(StreamRecords.mapBacked(message).withStreamKey(streamKey));
    }
}
// 예시 호출
publisher.sendToStream("user:123:status", Map.of("value", "online"));

🧲 Redis Stream Consumer with Consumer Group

@Component
@RequiredArgsConstructor
public class RedisStreamSubscriber {

    private final StringRedisTemplate redisTemplate;

    @PostConstruct
    public void init() {
        String stream = "user:123:status";
        String group = "statusGroup";
        String consumer = "consumer-1";

        try {
            redisTemplate.opsForStream().createGroup(stream, group);
        } catch (Exception e) {
            // 그룹이 이미 있으면 무시
        }

        Executors.newSingleThreadExecutor().submit(() -> {
            while (true) {
                List<MapRecord<String, Object, Object>> messages = redisTemplate
                    .opsForStream()
                    .read(Consumer.from(group, consumer),
                          StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                          StreamOffset.create(stream, ReadOffset.lastConsumed()));

                if (messages != null) {
                    for (MapRecord<String, Object, Object> message : messages) {
                        System.out.println("Received Stream Message: " + message.getValue());
                        redisTemplate.opsForStream().acknowledge(stream, group, message.getId());
                    }
                }
            }
        });
    }
}

🔑 주요 차이점 요약

항목 Redis Pub/Sub Redis Streams
메시지 영속성 ❌ 없음 ✅ 있음
메시지 누락 가능성 높음 낮음
소비자 그룹 지원 ❌ 없음 ✅ 있음
메시지 수신 확인 (ack) ❌ 불가 ✅ 가능
적합한 용도 실시간 브로드캐스트 내결함성 있는 이벤트 소비

👉 언제 무엇을 써야 할까?

  • 단순한 실시간 알림만 필요하면 → Pub/Sub
  • 메시지 유실 방지, 확인 처리, 재처리, 컨슈머 그룹이 필요하다면 → Redis Streams