
Concurrent Collections
Multi-thread 환경에서 일반적인 컬렉션 (ex. ArrayList, HashMap)을 사용하면 경쟁 상태(Race Condition)와 데이터 손상(Data Corruption)이 발생할 수 있다. 이를 해결하기 위해 Concurrent Collections을 사용한다.
Concurrent Map (동시성 맵) : ConcurrentHashMap
ConcurrentHashMap은 multi-thread 및 coroutine에서 안전하게 읽기/쓰기 작업을 수행할 수 있는 HashTable 기반의 Map 구현체이다. 이 컬렉션의 메서드들은 모두 원자성을 보장하여 동기화 문제를 예방한다.
따라서, 여러 thread/process가 동시에 하나의 공유 자원에 접근 및 조작을 시도하는 경합(Contention)이 자주 발생하는 공유 데이터 구조에 이상적이다. (ex. 고성능 캐싱 시스템, 애플리케이션 런타임 상태 및 설정 정보)
핵심 원리 : 부분적 락(Partial Locking)
- 일반 맵 (Collections.synchronizedMap()) : 컬렉션 전체에 락을 걸어 한 번에 하나의 thread만 접근이 허용된다. (낮은 동시성)
- ConcurrentHashMap : 데이터의 특정 버킷(bucket)에만 락을 걸어, 서로 다른 버킷에 대한 읽기/쓰기 작업이 병렬로 이뤄진다. 이는 락의 범위를 최소화하여 매우 높은 수준의 동시성을 제공한다.
사용 예시 (Kotlin Coroutine)
compute() 메서드는 키에 대한 검사(Check) 및 업데이트(Act)를 원자적으로 처리하여 안전하게 카운터를 증가시킨다.
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.*
val concurrentMap = ConcurrentHashMap<String, Int>()
suspend fun safeUpdateCounter(key: String, count: Int) {
repeat(count) {
// Atomic Operation, compute()
concurrentMap.compute(key) { _, value ->
(value ?: 0) + 1
}
}
}
fun main(): kotlin.Unit = runBlocking {
val key = "item_count"
concurrentMap[key] = 0
// 100 Coroutines created
val jobs = List(100) {
// Dispatchers.Default : Coroutine created in Shared Background Thread Pool
launch(Dispatchers.Default) {
safeUpdateCounter(key, 1_000_000)
}
}
jobs.joinAll()
println("Final Count : ${concurrentMap[key]}")
}
- safeUpdateCounter() = 안전한 원자적 업데이트
- 원자성 보장 : 값을 읽고, 새로운 값을 계산하고, 맵에 다시 쓰는 전체 과정이 락 없이(Lock-Free) 단일 작업으로 처리된다.
- 경쟁 상태 방지 : map.get(key)+1 로직은 읽기, 증가, 쓰기로 나뉘어 경쟁 상태를 유발하지만, compute()는 이를 하드웨어 수준의 CAS 메커니즘으로 안전하게 묶는다.
- launch(Dispatchers.Default) = 병렬 처리 위임
- launch (=Coroutine Bulder) : 새로운 coroutine은 생성/실행한다. 생성된 coroutine은 즉시 실행될 수 있도록 스케줄러에 등록된다.
- Dispatchers.Default : Coroutine이 실행될 thread를 지정한다. Default Dispatcher는 CPU 코어 수준에 비례하는 공유 thread pool을 사용한다.
결과적으로, 총 100 개의 launch(coroutine)이 생성되며, CPU 코어 수만큼의 thread를 공유하여 safeUpdateCounter() 작업을 동시에 병렬로 실행한다.
Concurrent List (동시성 리스트) : CopyOnWriteArrayList
읽기 작업이 압도적으로 많고 쓰기 작업이 적을 때 가장 효율적인 리스트이다.
데이터를 수정할 때마다 내부 배열의 복사본을 새로 만들며, 이 때문에 쓰기 작업이 자주 발생할 경우 속도가 느려진다.
따라서, 이벤트 리스너 및 핸들러 관리, 런타임 환경 변수 등 수정할 일은 거의 없으며 주로 순회가 일어나는 용도로 사용하는 것이 적합하다.
핵심 원리 : 쓰기 시 복사 (Copy-On-Write)
- 읽기 작업 : 락 없이 원본 배열에서 병렬로 이루어지기 때문에 매우 빠르다.
- 쓰기 작업 : 데이터를 수정해야 할 때마다 리스트 내부 배열의 복사본을 새로 만들어 수정한다. 수정이 완료되면 새로운 복사본으로 원본을 대체한다. 이 과정 전체에 락이 걸리지만, 읽기 작업은 락의 영향을 받지 않는다.
사용 예시 (Kotlin Coroutine)
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.random.Random
import kotlin.system.measureTimeMillis
val cowList = CopyOnWriteArrayList<Int>()
// Write Task : Lock, Create copy, Add element
suspend fun writeTask (count: Int) {
repeat(count) {
// When writing, Create and Append new copy of current List
cowList.add(Random.nextInt(1, 100))
}
}
// Read Task : Without Lock, Enter original array and iterate the whole elements
// Never be blocked during the write task
suspend fun readTask(iterations: Int) {
var sum = 0L
repeat(iterations) {
for(item in cowList) {
sum += item
}
}
println("Read Task Complete. Final Sum Check: $sum")
}
fun main() = runBlocking {
val initialCount = 100_000
(1..initialCount).forEach { cowList.add(it) }
val numWriters = 5
val writeOps = 1000
val numReaders = 20
val readIterations = 5
val totalTime = measureTimeMillis {
val writeJobs = List(numWriters) {
launch(Dispatchers.Default) {
writeTask(writeOps)
}
}
val readJobs = List(numReaders) {
launch(Dispatchers.Default) {
readTask(readIterations)
}
}
writeJobs.joinAll()
readJobs.joinAll()
}
val expectedWriteCount = numWriters * writeOps
val finalSize = cowList.size
println("\n------Results------")
println("Total Execution Time: $totalTime ms")
println("Initial List Size: $initialCount")
println("[Write Task] Expected Total Write Ops : $expectedWriteCount")
println("[Write Task] Actual Final Count: $finalSize (Initial Size + total Write Ops)")
println("[Read Task] Read Tasks: $numReaders coroutines have done ${numReaders * readIterations} iterations without locks")
}
/*
Read Task Complete. Final Sum Check: 25000343736
Read Task Complete. Final Sum Check: 25000328503
Read Task Complete. Final Sum Check: 25000346034
Read Task Complete. Final Sum Check: 25000383700
Read Task Complete. Final Sum Check: 25000418818
Read Task Complete. Final Sum Check: 25000452942
Read Task Complete. Final Sum Check: 25000453387
Read Task Complete. Final Sum Check: 25000475429
Read Task Complete. Final Sum Check: 25000481905
Read Task Complete. Final Sum Check: 25000490700
Read Task Complete. Final Sum Check: 25000490864
Read Task Complete. Final Sum Check: 25000407622
Read Task Complete. Final Sum Check: 25000476110
Read Task Complete. Final Sum Check: 25000447818
Read Task Complete. Final Sum Check: 25000511465
Read Task Complete. Final Sum Check: 25000511098
Read Task Complete. Final Sum Check: 25000516901
Read Task Complete. Final Sum Check: 25000519527
Read Task Complete. Final Sum Check: 25000521755
Read Task Complete. Final Sum Check: 25000517656
------Results------
Total Execution Time: 202 ms
Initial List Size: 100000
[Write Task] Expected Total Write Ops : 5000
[Write Task] Actual Final Count: 105000 (Initial Size + total Write Ops)
[Read Task] Read Tasks: 20 coroutines have done 100 iterations without locks
*/
- 쓰기 작업 오버헤드
- 5개의 coroutine이 총 10,000번의 cowList.add()를 시도한다.
- add가 호출될 때마다 내부적으로 리스트 전체의 복사본이 생성되므로, 쓰기 작업 자체는 느리다.
- 읽기 작업의 무차단성 (Non-Blocking)
- 20개의 읽기 coroutine은 쓰기 작업이 진행되는 동안 for-loop을 통해 데이터를 순회한다.
- 이 읽기 과정은 쓰기 락의 영향을 받지 않는다.
- 일관성 (Consistency)
- 읽기 작업은 add가 완료되어 배열이 교체된 후에도 순회 시작 지점의 데이터를 끝까지 읽는 약한 일관성을 보장한다.
- 때문에 실시간성이 덜 중요하고 읽기 부하가 높은 환경에 완벽하다.
한계
동시성 컬렉션은 개별 메서드 호출{ get(), put(), size(), add() 등}에 대해서만 쓰레드 안정성(thread-safe)을 보장한다.
두 개 이상의 메서드를 묶는 복합 연산은 여전히 원자성을 보장하지 않아 경쟁 상태에 취약하다.
| 유형 | 한계점 | 설명 |
| ConcurrentHashMap | 집합 연산의 일관성 부족 | size(), isEmpty(), values() 등의 메서드는 스냅샷이 아니므로, 호출 도중에 다른 thread가 데이터를 변경하면 결과가 최신 상태가 아닐 수 있다. |
| CopyOnWriteArrayList | 쓰기 작업의 성능 저하 | 요소를 변경할 때마다 전체 배열을 복사하기 때문에 쓰기 작업이 빈번하면 일반 ArrayList 보다 심각하게 느려진다. 또한, 복사 비용으로 인해 메모리 사용량도 증가한다. |
| 공통 | 복합 연산의 비원자성 | 컬렉션의 두 개 이상의 메서드 호출을 묶는 복합 연산은 여전히 원자성을 보장하지 않기 때문에 Mutex나 synchronized 블록을 사용해야 한다. ex. $\text[if(map.containsKey(key)) map.put(key, value))$ |
잘못된 사용 방법
아래 코드는 ConcurrentHashMap을 사용했음에도 불구하고, 여전히 경쟁 상태(Race Condition)에 취약하다.
val map = ConcurrentHashMap<String, Int>()
map["data"] = 100
fun updateData() {
val key = "data"
// 1. 값 확인 (check)
if (map[key] == 100) {
Thread.sleep(1) // 여기서 다른 thread가 map["data"]의 값을 200으로 변경할 수 있다.
// 2. 값 변경 (act)
map[key] = 101 // 다른 thread의 변경을 덮어쓸 위험이 있음 (비원자적)
}
}
한 마디로, 컬렉션의 값 확인과 값 변경의 두 가지 작업이 원자적으로(Atomic, 한 번에) 처리되지 않아 그 사이에 다른 thread / coroutine이 끼어들어 데이터를 손상시키는 경우다.
해결 방법
1. Atomic Operations (컬렉션 메서드 활용)
replcae() 는 ConcurrentHashMap에서 복합 연산의 문제를 해결하기 위한 원자적 메서드다. "이전 값이 A라면 새 값 B로 변경하라"는 검사-변경을 단일 작업으로 처리한다. 이는 multi-thread 환경에서 "Check-Then-Act" 패턴을 안전히 구현하는 가장 효율적인 방법이다.
// 원자적으로 100일 때만 101로 교체 (Compare-and-Set 역할)
map.replace("data", 100, 101)
2. Mutex/synchronized (복합 로직 보호)
Kotlin coroutines 환경에서는 Mutex를, 전통적인 thread 환경에서는 synchronized 블록을 사용하여 전체 로직을 보호해야 한다. 그 중, Mutex는 특정 코드 블록에 한 번에 하나의 coroutine만 접근하도록 보장하는 락(lock) 메커니즘이다.
import kotlinx.coroutines.sync.*
val updateMutex = Mutex()
suspend fun updateDataSafe() {
// 아래 블록은 한 번에 하나의 coroutine만 진입 가능
updateMutex.withLock {
val key = "data"
if (map[key] == 100) {
map[key] - 101
}
}
}
참고 자료 (Coroutine 위주)
- Concurrent Collections (The Java™ Tutorials > Essential Java Classes > Concurrency)
- 안전한 스레드를 위한 CopyOnWriteArrayList | 꿈꾸는 지구별 개발자, Phang
- ConcurrentHashMap, CopyOnWriteArrayList에 대하여 :: slowandsteady코딩
Powered By. ChatGPT & Gemini
'Backend' 카테고리의 다른 글
| [Backend] volatile 키워드, 언제 쓰고 왜 쓰는가? (2) | 2025.10.04 |
|---|---|
| [Backend] TCP/IP 4계층 & 1024 이하 Known ports (0) | 2025.09.26 |
| [Backend] Pub/Sub Pattern (1) | 2025.09.24 |
| [Backend] ThreadLocal과 Atomic (1) | 2025.09.22 |
| [Backend] 경쟁 상태와 락 (Race Condition & Lock) (1) | 2025.09.19 |