Redis에서 Pub/Sub(Publish/Subscribe) 기능을 사용하는 방법은 비교적 간단합니다. 아래는 기본적인 사용 방법과 예제입니다.
🧠 개념 요약
Redis Pub/Sub는 메시지 브로커 역할을 수행하여, 한 클라이언트가 메시지를 발행(Publish)하면, 해당 채널을 구독(Subscribe)하고 있는 다른 클라이언트들이 실시간으로 그 메시지를 받게 됩니다.
- Publisher: 채널에 메시지를 발행하는 클라이언트
- Subscriber: 특정 채널을 구독하고 메시지를 받는 클라이언트
🛠 명령어
PUBLISH <channel> <message>: 메시지를 발행SUBSCRIBE <channel> [<channel> ...]: 채널 구독UNSUBSCRIBE [<channel> ...]: 채널 구독 취소
💻 예제 (CLI 기준)
터미널 1 – Subscriber
redis-cli
> SUBSCRIBE news
터미널 2 – Publisher
redis-cli
> PUBLISH news "Hello from Redis!"
Subscriber는 다음과 같은 응답을 받습니다:
1) "message"
2) "news"
3) "Hello from Redis!"
🧑💻 Python 예제 (redis-py 사용)
import redis
import threading
r = redis.Redis(host='localhost', port=6379, db=0)
def listen():
pubsub = r.pubsub()
pubsub.subscribe('news')
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received: {message['data'].decode('utf-8')}")
# Subscriber thread
threading.Thread(target=listen).start()
# Publisher
r.publish('news', 'Breaking News!')
⚠️ 주의사항
- Pub/Sub 메시지는 영속적이지 않음: 수신자가 구독 중이 아닐 경우 메시지는 유실됩니다.
- 분산 처리에 적합하지 않음: 메시지를 보장하거나 처리 상태를 추적해야 한다면 Redis Streams나 Kafka와 같은 큐 시스템을 고려하세요.
✅ 활용 예시
- 실시간 채팅 시스템
- 실시간 알림 시스템
- 간단한 이벤트 전파
JAVA
Java에서 Redis의 Pub/Sub 기능을 사용하기 위해 보통 Jedis 클라이언트 라이브러리를 많이 활용합니다. 아래는 Jedis를 사용한 간단한 Redis Pub/Sub 예제입니다.
📦 1. Maven 의존성 추가
pom.xml에 다음 의존성을 추가합니다:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.0</version>
</dependency>
📡 2. Subscriber 구현
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class RedisSubscriber {
public static void main(String[] args) {
Jedis jedis = new Jedis("localhost", 6379);
JedisPubSub listener = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println("Received on channel " + channel + ": " + message);
}
};
System.out.println("Subscribing to channel 'news'...");
jedis.subscribe(listener, "news");
jedis.close();
}
}
📨 3. Publisher 구현
import redis.clients.jedis.Jedis;
public class RedisPublisher {
public static void main(String[] args) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
String channel = "news";
String message = "Hello from Java!";
jedis.publish(channel, message);
System.out.println("Published: " + message);
}
}
}
🧪 실행 방법
- 먼저
RedisSubscriber를 실행하여 채널을 구독합니다. - 이후
RedisPublisher를 실행하여 메시지를 발행하면, 구독자가 메시지를 수신하게 됩니다.
🛠 추가 팁
JedisPubSub에는onSubscribe,onUnsubscribe,onPMessage등 다양한 콜백 메서드가 있습니다.- 여러 채널을 동시에 구독하거나 패턴 매칭(
psubscribe)도 가능합니다. - 메시지 보존이 필요하다면
Redis Streams를 사용하는 것이 좋습니다.
redis의 특정 필드만 pub/sub으로 쓰려면?
Redis의 기본적인 Pub/Sub 모델은 “채널” 단위로 작동하며, Redis 키(key)나 필드(field) 수준에서 직접적으로 Pub/Sub을 사용하는 기능은 없습니다. 하지만 특정 필드에 대한 Pub/Sub처럼 동작하도록 구현하는 방법은 몇 가지가 있습니다.
🔧 1. 채널명을 필드처럼 설계하기 (권장 방식)
✔ 개요
필드 개념을 Redis Pub/Sub에서 구현하려면 채널명을 키:필드 형식으로 설계하세요.
✔ 예시
// Redis key: user:123
// 필드: status, 채널명은 user:123:status 로 정의
jedis.publish("user:123:status", "online");
이렇게 하면 실제로는 필드를 따로 추적하지 않지만, 구독자가 필요한 필드만 골라 구독할 수 있습니다:
jedis.subscribe(listener, "user:123:status");
🔄 2. Hash 필드 변경과 Pub/Sub을 연동하려면
✔ 상황
Redis HASH 타입 데이터 (HSET, HGET)를 사용하고 있을 때, 특정 필드 변경 시마다 알림을 보내고 싶을 수 있습니다.
✔ 방법
직접 HSET 후 Pub/Sub을 트리거해야 합니다 (Redis는 자동 알림을 하지 않음).
// 1. Hash 필드 변경
jedis.hset("user:123", "status", "offline");
// 2. Pub/Sub 알림 수동 발행
jedis.publish("user:123:status", "offline");
⚠ 주의
Redis는 HASH나 SET 등 데이터 구조의 변화에 대해 자동으로 메시지를 발행하지 않기 때문에, 반드시 명시적으로 PUBLISH 호출이 필요합니다.
🧠 3. Keyspace Notifications (선택적 고급 기능)
✔ 기능
Redis는 Keyspace Notifications라는 기능을 통해 키 변경 이벤트를 Pub/Sub 형태로 알릴 수 있습니다.
✔ 단점
- 키 단위만 지원하며 필드 단위는 아님
- 별도 설정 필요 (
notify-keyspace-events설정)
# 설정 예시
CONFIG SET notify-keyspace-events Kh
✔ 예시 코드
// 키 변경 감지 구독 (ex. user:123)
jedis.subscribe(listener, "__keyspace@0__:user:123");
✅ 결론
| 방법 | 필드 단위 지원 | 메시지 보존 | 비고 |
|---|---|---|---|
| 채널명에 필드 포함 | ✅ 가능 | ❌ | 가장 일반적이며 유연함 |
| HSET + 수동 Publish | ✅ 가능 | ❌ | 데이터 + 알림 분리 |
| Keyspace Notification | ❌ 불가능 | ❌ | 키 수준 알림만, 서버 설정 필요 |
Spring Boot + Redis 와 Redis Streams 구현
📘 1. Spring Boot + Redis Pub/Sub (필드 수준 구현)
✅ 아키텍처
- Redis Pub/Sub 사용
- 특정 필드 구독을 위해
채널명을 key:field형태로 설계 @Component와MessageListenerAdapter사용
🔧 Maven 의존성 (spring-boot-starter-data-redis)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
🛠 Redis Config
@Configuration
public class RedisConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("user:123:status"));
return container;
}
@Bean
public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "onMessage");
}
}
📥 Subscriber Component
@Component
public class RedisSubscriber {
public void onMessage(String message, String channel) {
System.out.printf("Channel: %s | Received: %s%n", channel, message);
}
}
📤 Publisher Service
@Service
@RequiredArgsConstructor
public class RedisPublisher {
private final StringRedisTemplate redisTemplate;
public void publish(String field, String message) {
String channel = "user:123:" + field;
redisTemplate.convertAndSend(channel, message);
}
}
📙 2. Spring Boot + Redis Streams
✅ 아키텍처
- 메시지 보존 및 소비 확인이 필요한 경우 Redis Streams 사용
XADD,XREAD,XREADGROUP사용
🧩 Redis Stream Producer
@Service
@RequiredArgsConstructor
public class RedisStreamPublisher {
private final StringRedisTemplate redisTemplate;
public void sendToStream(String streamKey, Map<String, String> message) {
redisTemplate.opsForStream().add(StreamRecords.mapBacked(message).withStreamKey(streamKey));
}
}
// 예시 호출
publisher.sendToStream("user:123:status", Map.of("value", "online"));
🧲 Redis Stream Consumer with Consumer Group
@Component
@RequiredArgsConstructor
public class RedisStreamSubscriber {
private final StringRedisTemplate redisTemplate;
@PostConstruct
public void init() {
String stream = "user:123:status";
String group = "statusGroup";
String consumer = "consumer-1";
try {
redisTemplate.opsForStream().createGroup(stream, group);
} catch (Exception e) {
// 그룹이 이미 있으면 무시
}
Executors.newSingleThreadExecutor().submit(() -> {
while (true) {
List<MapRecord<String, Object, Object>> messages = redisTemplate
.opsForStream()
.read(Consumer.from(group, consumer),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(stream, ReadOffset.lastConsumed()));
if (messages != null) {
for (MapRecord<String, Object, Object> message : messages) {
System.out.println("Received Stream Message: " + message.getValue());
redisTemplate.opsForStream().acknowledge(stream, group, message.getId());
}
}
}
});
}
}
🔑 주요 차이점 요약
| 항목 | Redis Pub/Sub | Redis Streams |
|---|---|---|
| 메시지 영속성 | ❌ 없음 | ✅ 있음 |
| 메시지 누락 가능성 | 높음 | 낮음 |
| 소비자 그룹 지원 | ❌ 없음 | ✅ 있음 |
| 메시지 수신 확인 (ack) | ❌ 불가 | ✅ 가능 |
| 적합한 용도 | 실시간 브로드캐스트 | 내결함성 있는 이벤트 소비 |
👉 언제 무엇을 써야 할까?
- 단순한 실시간 알림만 필요하면 →
Pub/Sub - 메시지 유실 방지, 확인 처리, 재처리, 컨슈머 그룹이 필요하다면 →
Redis Streams