Pub/Sub Pattern
Publish/Subscribe (Pub/Sub) 패턴은 메시지 패턴으로, 메시지 전달자인 Publisher를 수령자인 Subsriber와 분리시킨다.
이를 통해 비동기, 다대다 의사소통을 가능케하며, 메시지 전달을 담당하는 중앙 집중형 메시지 브로커를 갖고 있다.

핵심 구성요소
- Publisher : (어떤 Subscriber가 수령할지 상관하지 않는) 메시지 발행 주체
- Subscriber : 하나 이상의 주제를 구독하여 이에 대한 메시지를 수령한다.
- Topic : 메시지 연결 통로(channel) 혹은 분야(category)
- Message Broker : 메시지 흐름을 관리하는 중간 요소로, Publisher로부터 메시지를 수령하여 Subscriber에게 전달한다.
동작 방식
- Publisher가 message broker의 특정 topic에 메시지를 전달한다.
- Subsriber가 message broker의 하나 이상의 topic을 구독한다.
- Message broker가 메시지를 수령하면 이에 대한 복사본을 구독한 Subsriber의 수 만큼 생성 후 전달한다.
- 이를 통해 Publisher와 Subsriber는 비동기적으로 의사소통한다.
Pub/Sub 패턴이 위 방식으로만 구현되는 것은 아니다. 고객 요구사항에 따라 다양한 방식으로 전달 형태와 방식을 구현할 수 있다.
특징
- Pub/Sub 분리 : 서로에 영향을 주는 일 없이 자유롭게 요소를 추가하고 제거할 수 있어 시스템 유연성이 향상된다.
- 비동기성 : Publisher는 Subscriber가 메시지를 수령받길 기다릴 필요가 없어 반응 속도와 성능이 향상된다.
- 신뢰성 : 메시지는 전달에 성공할 때까지 broker에 저장되기 때문에, Subsriber가 잠깐 사용불가능해도 데이터를 잃지 않는다.
주된 사용처
- SNS 등 실시간 업데이트 및 통지
- 사용자 결제, 이메일 등 이벤트 기반(Event-driven) 아키텍처
- 로깅 분담
2025년 가장 많이 사용되는 Message Broker 서비스는 Pulsar(Kafka 흡수), Redis Pub/Sub, Amazon SQS가 있다.
구현
package org.example.concurrent
/*
1. Subscriber를 멀티쓰래드로 구현. 10개 정도를 실행 한다.
각각의 Subscriber는 고유의 ID를 가진다
2. Subscriber 가 Broker 에게 요청하여 데이터를 가져 가는 형태로 구현한다
3. Broker 를 Singleton 으로 구현
4. 0 이 아닌 숫자를 입력하면 메시지를 개수만큼 발행.
5. 발행 된 메시지는 SubScriber 가 Broker 에게서 받아간다.
SubScriber 는 받은 메시지와 SubScriber ID 를 출력한다
6. 0 이 아닌 숫자를 입력할경우 메세지 발행 후 다시 숫자를 입력 받음(무한 반복)
7. 0 를 입력하면 종료
*/
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
data class Event(val message: String)
// 3. Singleton Broker
object Broker {
private val messageQueue = LinkedBlockingQueue<Event>()
private val subscriberIdCounter = AtomicInteger(0)
// 4. Message Publishing..
fun putEvent(event : Event) {
messageQueue.add(event)
}
// 2, 5. The Subscriber calls Broker and takes message
fun getEvent() : Event {
return messageQueue.take()
}
fun getNewSubscriberId() : Int {
return subscriberIdCounter.incrementAndGet()
}
}
// 1, 5. Multi-threading Subscribers
class Subscriber(private val id: Int, private val broker: Broker) : Runnable {
override fun run() {
println("[Subscriber-$id] started.")
while(!Thread.currentThread().isInterrupted) {
try {
val event = broker.getEvent()
println("[Subscriber-$id] received message: ${event.message}")
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
break
}
}
println("[Subscriber-$id] terminated.")
}
}
fun main() {
val numberOfSubscribers = 10
val threads = mutableListOf<Thread>()
// 1. Multi-threading Subscribers
repeat (numberOfSubscribers) {
val subscriberId = Broker.getNewSubscriberId()
val subscriberTask = Subscriber(subscriberId, Broker)
val thread = Thread(subscriberTask, "Subscriber-$subscriberId")
threads.add(thread)
thread.start()
}
println("System started.")
print(" > ")
val reader = java.util.Scanner(System.`in`)
while(true) {
val count = try {
reader.nextInt()
} catch (e: Exception) {
println("Invalid input. Please enter a valid number")
reader.nextLine()
continue
}
if (count == 0) {
// 7. If 0, exit.
threads.forEach { it.interrupt() }
threads.forEach { it.join() }
println("All subscribers termintated. System exit.")
break
} else if (count > 0) {
// 4. Else, publish message
for (i in 1..count) {
Broker.putEvent(Event("Message $i/$count"))
}
println("Published $count messages to the Broker.")
}
}
}
구현 분석
1. LinkedBlockingQueue를 사용한 효율적 대기
만약 ConcurrentLinkedQueue로 위 코드를 구현한다면 sleep()을 호출하여 주기적으로 깨어나 큐를 확인하는 비효율적인 Busy Waiting이 발생한다.
이를 LinkedBlockingQueue의 blocking 기능을 사용하여 해결하고, CPU 자원 낭비를 방지했다.
- (Broker) Event 발행 시, Singleton 인스턴스 Broker에 queue.put(message)를 호출하여 메시지를 안전하게 저장한다.
- (Subscriber) Broker 호출 시, Broker의 queue.take()를 호출하면 큐가 비어있을 경우, Subscriber thread는 CPU 자원을 소모하지 않고 효율적으로 잠시 멈춘다(Block). 새로운 메시지가 큐에 들어온다면 큐는 대기 중인 thread를 자동으로 깨워 작업을 재개한다.
- 이 큐는 내부적으로 동기화 메커니즘을 갖고 있어, 여러 thread가 동시에 메시지를 읽고, 써도 데이터 손상이 없다.
2. 경쟁적 소비 모델
이 모델은 하나의 메시지 큐를 다수의 소비자 thread가 공유하며, 각 메시지는 오직 하나의 소비자에게만 전달된다.
대량 주문 처리, 이메일 발송 작업 등 작업 부하를 여러 워커(Worker)에 분산시켜 처리량을 극대화할 때 사용된다.
참고 자료
- What Is Pub/Sub? A Guide to the Publish-Subscribe Pattern | Toptal®
- Publish / Subscribe 패턴 알아보기 | tistory-zaccoding
Powered By. ChatGPT & Gemini
'Backend' 카테고리의 다른 글
| [Backend] TCP/IP 4계층 & 1024 이하 Known ports (0) | 2025.09.26 |
|---|---|
| [Backend] Concurrent Collections (동시성 컬렉션) (1) | 2025.09.25 |
| [Backend] ThreadLocal과 Atomic (1) | 2025.09.22 |
| [Backend] 경쟁 상태와 락 (Race Condition & Lock) (1) | 2025.09.19 |
| [Backend] Process & Thread (2) | 2025.09.17 |