경쟁적 소비 모델
import java.util.Scanner
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
data class Event(val message: String)
// Broker (Publisher & Queue 관리자)
object Broker {
private val eventQueue: BlockingQueue<Event> = LinkedBlockingQueue<Event>()
private val subscriberIdCounter = AtomicInteger(0)
// Publisher가 메시지 발행 (put)
fun publish(event : Event) {
eventQueue.put(event)
println("[Broker] Published: ${event.message}")
}
// Subscriber가 메시지 수령 (take)
fun consume() : Event = eventQueue.take()
fun getNewSubscriberId() : Int = subscriberIdCounter.incrementAndGet()
}
// Subscriber (메시지 소비자)
class Subscriber(private val id: Int, private val broker: Broker) : Runnable {
override fun run() {
println("[Subscriber-$id] started on ${Thread.currentThread().name}")
try {
while (!Thread.currentThread().isInterrupted) {
val event = broker.consume()
println("[Subscriber-$id] received: ${event.message}")
}
} catch (_: InterruptedException) {
println("[Subscriber-$id] interrupted.")
Thread.currentThread().interrupt()
}
println("[Subscriber-$id] terminated.")
}
}
이 코드는 하나의 메시지를 여러 Subscriber 중 단 한 명만 소비하는 경쟁적 소비 모델 기반 코드 예시다.
모든 Subscriber가 하나의 공용 Queue를 공유하며, take()를 먼저 수행한 쓰레드가 메시지를 가져간다.
이 구조는 부하 분산 (Load Balancing) 목적에 유용하다.
예를 들어, 100개의 요청 메시지가 있다면 5개의 Subscriber가 이를 나눠서 병렬 처리할 수 있다.
유사한 시스템으로는 RabbitMQ, AWS SQS, Kafka Consumer Group이 존재한다.
고성능과 확장성(Scalability)를 보장하지만, 다른 Subscriber는 해당 메시지를 받지 못한다.
Broadcast
- 기존 코드 : Message Broker가 생성한 메시지를 메시지 큐에 저장하면 모든 Subscriber가 메시지를 소비하기 위해 경쟁하는 상태 (경쟁적 소비 모델, Queue-Based Competition)
- 요구사항 : Message Broker가 생성한 메시지의 복사본을 모든 Subscriber가 수령하는 방식으로 코드 변경. (Broadcast)
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
data class Message(val id: Int, val content: String)
object MessageBroker {
private val messageMap = ConcurrentHashMap<Int, Message>()
private val subscriberIdCounter = AtomicInteger(0)
private val messageIdCounter = AtomicInteger(1)
fun putMessage(content: String): Int {
val newId = messageIdCounter.getAndIncrement()
messageMap[newId] = Message(newId, content)
return newId
}
fun getMessage(id: Int): Message? = messageMap[id]
fun getLastMessageId(): Int = messageIdCounter.get() - 1
fun getNewSubscriberId() : Int = subscriberIdCounter.incrementAndGet()
}
class MessageSubscriber(private val id: Int, private val broker: MessageBroker) : Runnable {
private var lastProcessedId: Int = 0
override fun run() {
println("[Subscriber-$id] started.")
while(!Thread.currentThread().isInterrupted) {
try {
val lastMessageId = broker.getLastMessageId()
for (i in (lastProcessedId+1) .. lastMessageId) {
val message = broker.getMessage(i)
if (message != null) {
println("[Subscriber-$id] Message received from Message-$i: ${message.content}")
lastProcessedId = i
} else {
Thread.sleep(10)
}
}
if (lastMessageId == lastProcessedId) Thread.sleep(100)
} catch (_ : InterruptedException) {
Thread.currentThread().interrupt()
break
}
}
println("[Subscriber-$id] finished.")
}
}
Broadcast Pub/Sub은 모든 Subscriber가 모든 메시지를 수신하는 구조다.
Publisher가 메시지를 발행하면 Broker가 이를 모든 Subscriber에게 복제하여 전송한다. ("Create Once, Publish Everywhere(COPE)")
모든 Subscriber가 같은 데이터를 받아야 하는 상황(예: 실시간 알림, 채팅방, 센서 브로드캐스트)에 적합하다.
유사 시스템으로는 Redis Pub/Sub, MQTT Topic 이 존재한다.
모든 수신자에게 동일한 정보를 보장하지만, Subscriber 수가 많아질 수록 부하가 증가한다.
※ 간단하게 쓸 때는 Redis Pub/Sub, 대용량은 Kafka로 자리매김함.
'Backend' 카테고리의 다른 글
| [Backend] 서버-클라이언트 연결 (ServerSocket & Socket) (0) | 2025.10.16 |
|---|---|
| [Backend] Kotlin N:N Chat Application (Blocking Socket) (0) | 2025.10.14 |
| [Backend] volatile 키워드, 언제 쓰고 왜 쓰는가? (2) | 2025.10.04 |
| [Backend] TCP/IP 4계층 & 1024 이하 Known ports (0) | 2025.09.26 |
| [Backend] Concurrent Collections (동시성 컬렉션) (1) | 2025.09.25 |